From a0229b220c24b677cc7d152fdc6c75787dc57596 Mon Sep 17 00:00:00 2001 From: RICHARDNAN <1466684392@qq.com> Date: Wed, 3 Sep 2025 15:40:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A50.2.4npu=E7=9A=84scripts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 0.sh | 44 ++++ install.sh | 2 +- install_for_npu.sh | 122 +++++++++ merge_tensors/merge_safetensor_gguf.py | 2 +- scripts/test_curl.sh | 62 +++++ scripts/test_generator.py | 163 ++++++++++++ scripts/test_generator.yaml | 41 +++ scripts/test_runner.py | 332 +++++++++++++++++++++++++ scripts/test_runner.yaml | 27 ++ serve_test.sh | 63 +++++ setup.py | 60 ++++- 11 files changed, 907 insertions(+), 11 deletions(-) create mode 100644 0.sh create mode 100644 install_for_npu.sh create mode 100644 scripts/test_curl.sh create mode 100644 scripts/test_generator.py create mode 100644 scripts/test_generator.yaml create mode 100644 scripts/test_runner.py create mode 100644 scripts/test_runner.yaml create mode 100644 serve_test.sh diff --git a/0.sh b/0.sh new file mode 100644 index 0000000..ee4b458 --- /dev/null +++ b/0.sh @@ -0,0 +1,44 @@ +#!/bin/bash +#set -ex + +# export area +export ASDOPS_LOG_TO_FILE=0 +export ASDOPS_LOG_TO_STDOUT=0 +export ASDOPS_LOG_LEVEL=ERROR +export ATB_LOG_TO_FILE=0 +export ATB_LOG_TO_STDOUT=0 +export ATB_LOG_LEVEL=ERROR +export USE_MERGE=0 +# export PROF_DECODE=1 +#export PYTORCH_NPU_ALLOC_CONF=expandable_segments:False +export INF_NAN_MODE_FORCE_DISABLE=1 +export CAPTURE_PLUGIN_PATH=/home/x30058903/pack/npu_graph +export BLAS_NUM_THREADS=1 +export TASK_QUEUE_ENABLE=0 +# source area +source /usr/local/Ascend/ascend-toolkit/set_env.sh +source /usr/local/Ascend/nnal/atb/set_env.sh + +# global vars +LOG_DIR="../logs" +LOG_NAME="test_cpuinfer49" +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +mkdir -p ${LOG_DIR} +LOG_FILE="${LOG_DIR}/${LOG_NAME}_${TIMESTAMP}.log" +# /home/x00656918/600-500.txt +# /home/x00656918/600-500.txt +# torchrun and model area +torchrun 
\ + --master-port 41532 \ + --nproc_per_node 1 \ + -m ktransformers.local_chat \ + --cpu_infer 65 \ + --model_path /home/DeepSeek-V3-q4km-w8a8/ \ + --gguf_path /home/DeepSeek-V3-q4km-w8a8/ \ + --max_new_tokens 200 \ + --use_cuda_graph True \ + --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-800IA2-npu.yaml \ +2>&1 | tee "${LOG_FILE}" + + # --gguf_path /mnt/DeepSeek-R1-BF16/ \ + diff --git a/install.sh b/install.sh index 573826e..2710eb5 100644 --- a/install.sh +++ b/install.sh @@ -29,7 +29,7 @@ pip install -r requirements-local_chat.txt pip install -r ktransformers/server/requirements.txt echo "Installing ktransformers" -KTRANSFORMERS_FORCE_BUILD=TRUE pip install -v . --no-build-isolation +KTRANSFORMERS_FORCE_BUILD=TRUE USE_BALANCE_SERVE=1 pip install -vvv . --no-build-isolation if [[ "$DEV_BACKEND" == "cuda" ]]; then echo "Installing custom_flashinfer for CUDA backend" diff --git a/install_for_npu.sh b/install_for_npu.sh new file mode 100644 index 0000000..98cf3b9 --- /dev/null +++ b/install_for_npu.sh @@ -0,0 +1,122 @@ +#!/bin/bash +set -e +source /usr/local/Ascend/ascend-toolkit/set_env.sh + +INITIALIZED="FALSE" +ROOT_DIR=$(pwd) + +if [ -f ".INITIALIZED" ]; then + INITIALIZED="TRUE" +fi + +case "$INITIALIZED" in + TRUE) + echo "Detect file .INITIALIZED, will not init again." + ;; + FALSE) + echo "Not detect file .INITIALIZED, do init." 
+ # Detect architecture + ARCH=$(uname -m) + IS_ARM=false + if [[ "$ARCH" == "armv7l" || "$ARCH" == "aarch64" ]]; then + IS_ARM=true + fi + + # ARM-specific operations + if $IS_ARM; then + echo "Processing ARM architecture specific tasks" + + # Copy ARM specific files + # cp ./for_arm/CMakeLists.txt ./csrc/ktransformers_ext/CMakeLists.txt + # cp ./for_arm/iqk_mul_mat.inc ./third_party/llamafile/iqk_mul_mat.inc + # cp ./for_arm/sgemm.cpp ./third_party/llamafile/sgemm.cpp + # cp ./for_arm/tinyblas_cpu_sgemm.inc ./third_party/llamafile/tinyblas_cpu_sgemm.inc + cp ./for_arm/requirements-local_chat.txt ./requirements-local_chat.txt + cp ./for_arm/setup.py ./setup.py + fi + + # init third_party + # clone third_party or unzip third_party file + third_party_file="" + third_party="$ROOT_DIR/third_party" + cd "$third_party" + + for i in "$@" + do + case $i in + --third-party-file=*|-f=*) + third_party_file="${i#*=}" + shift + ;; + *) + echo "Unknown operation: $i" + exit 1 + ;; + esac + done + + if [ -n "$third_party_file" ]; then + if [[ "$third_party_file" != /* ]]; then + third_party_file="$ROOT_DIR/$third_party_file" + fi + + if [ ! -f "$third_party_file" ]; then + echo "Error: file not found on '$third_party_file'" + exit 1 + fi + + case "${third_party_file}" in + *.tar.gz|*.tgz) + tar -xzf "$third_party_file" + ;; + *.zip) + unzip "$third_party_file" + ;; + *) + echo "Error: unsupported file format '$third_party_file'" + exit 1 + ;; + esac + echo "Finish decompress ${third_party_file}" + else + # todo update + git clone https://github.com/kvcache-ai/custom_flashinfer.git -b fix-precision-mla-merge-main && cd custom_flashinfer && git checkout fd94393f + git submodule init && git submodule update && cd 3rdparty + cd composable_kernels && git checkout 5055b3bd && cd .. + cd cutlass && git checkout cc3c29a8 && cd .. + cd googletest && git checkout 5a37b517 && cd .. + cd mscclpp && git checkout v0.5.1 && cd .. + cd nvbench && git checkout 555d628e && cd .. 
+ cd spdlog && git checkout v1.x && cd .. + cd "$third_party" + + git clone https://github.com/ggerganov/llama.cpp.git -b master && cd llama.cpp && git checkout b3173 + git submodule init && git submodule update + cd kompute && git checkout 4565194e && cd .. + cd "$third_party" + + git clone https://github.com/jupp0r/prometheus-cpp -b master && cd prometheus-cpp && git checkout f13cdd05 + git submodule init && git submodule update && cd 3rdparty + cd civetweb && git checkout v1.16 && cd .. + cd googletest && git checkout release-1.11.0 && cd .. + cd "$third_party" + + git clone https://github.com/pybind/pybind11.git -b master && cd pybind11 && git checkout bb05e081 && cd .. + git clone https://github.com/gabime/spdlog.git -b v1.x && cd spdlog && git checkout v1.15.2 && cd .. + git clone https://github.com/Cyan4973/xxHash.git -b dev && cd xxHash && git checkout 953a09ab && cd .. + + echo "Finish clone and checkout third_party" + fi + + cd "$ROOT_DIR" + touch ./.INITIALIZED + ;; + *) + echo "Error" + exit 1 + ;; +esac + +cd "$ROOT_DIR" +sed -i 's/\r$//' ./install.sh +bash ./install.sh diff --git a/merge_tensors/merge_safetensor_gguf.py b/merge_tensors/merge_safetensor_gguf.py index f299ab9..efeab3b 100644 --- a/merge_tensors/merge_safetensor_gguf.py +++ b/merge_tensors/merge_safetensor_gguf.py @@ -6,7 +6,7 @@ import sys # sys.path.insert(0, "/home/azure/ktransformers") import argparse import torch -from ktransformers.util.custom_loader import GGUFLoader, translate_name_to_gguf +from ktransformers.util.custom_gguf import GGUFLoader, translate_name_to_gguf from safetensors import safe_open from safetensors.torch import save_file import re diff --git a/scripts/test_curl.sh b/scripts/test_curl.sh new file mode 100644 index 0000000..de8c4e6 --- /dev/null +++ b/scripts/test_curl.sh @@ -0,0 +1,62 @@ +#curl -X 'POST' 'http://localhost:10068/v1/chat/completions' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ +# "messages": [ +# { +# "content": "Give 
me a clear and full explanation about the history of ThanksGiving.", +# "role": "user" +# } +# ], +# "model": "DeepSeek-Coder-V2-Instruct", +# "stream": false +#}' & +# +#sleep 1 + +curl -X 'POST' 'http://localhost:10068/v1/chat/completions' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ + "messages": [ + { + "content": "Who are you?", + "role": "user" + } + ], + "model": "DeepSeek-Coder-V2-Instruct", + "stream": false +}' & + +sleep 1 + +curl -X 'POST' 'http://localhost:10068/v1/chat/completions' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ + "messages": [ + { + "content": "Where is Beijing?", + "role": "user" + } + ], + "model": "DeepSeek-Coder-V2-Instruct", + "stream": false +}' & + +sleep 1 + +curl -X 'POST' 'http://localhost:10068/v1/chat/completions' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ + "messages": [ + { + "content": "What you do?", + "role": "user" + } + ], + "model": "DeepSeek-Coder-V2-Instruct", + "stream": false +}' & + +sleep 1 + +curl -X 'POST' 'http://localhost:10068/v1/chat/completions' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ + "messages": [ + { + "content": "Where is Beijing?", + "role": "user" + } + ], + "model": "DeepSeek-Coder-V2-Instruct", + "stream": false +}' & diff --git a/scripts/test_generator.py b/scripts/test_generator.py new file mode 100644 index 0000000..dfa1350 --- /dev/null +++ b/scripts/test_generator.py @@ -0,0 +1,163 @@ +import os +import re +import socket +import random +from copy import deepcopy +from typing import Sequence + +import yaml + + +def get_sh_body(): + sh = r'''#!/bin/bash +set -ex + +# export area +{export_area} + +# source area +{source_area} + +# global vars +LOG_DIR="../logs" +{log_file_name} +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +mkdir -p ${LOG_DIR} +LOG_FILE="${LOG_DIR}/${LOG_NAME}_${TIMESTAMP}.log" + +# torchrun and model area +torchrun \ + --master-port {master_port} \ +{torchrun_area} 
+{model_area} +2>&1 | tee "${LOG_FILE}"''' + return sh + + +def add_static_pattern_seq(sh: str, data: Sequence, pattern_name: str, replace_line: str): + lines = [replace_line.format(val) for val in data] + replace_context = "\n".join(lines) + return sh.replace(pattern_name, replace_context) + + +def add_static_pattern_dict(sh: str, data: dict, pattern_name: str, replace_line: str): + lines = [replace_line.format(key, val) for key, val in data.items()] + replace_context = "\n".join(lines) + return sh.replace(pattern_name, replace_context) + + +def add_export(sh, data: dict): + return add_static_pattern_dict(sh, data, "{export_area}", "export {0}={1}") + + +def add_source(sh, data: Sequence): + return add_static_pattern_seq(sh, data, "{source_area}", "source {0}") + + +def add_torchrun(sh, data): + long_replace = add_static_pattern_dict("long", data["long"], "long", " --{0} {1} \\") + short_replace = add_static_pattern_dict("short", data["short"], "short", " -{0} {1} \\") + return sh.replace("{torchrun_area}", long_replace + "\n" + short_replace) + + +def add_model(sh, data: dict): + return add_static_pattern_dict(sh, data, "{model_area}", " --{0} {1} \\") + + +def get_valid_file_name_sequence(s: str) -> str: + return "".join(char for char in s if char.isalnum()) + + +def get_valid_file_name_lines(hyper): + keys_required = ("cpu_infer", ) + if not all(key in hyper for key in keys_required): + raise ValueError(f"{', '.join(keys_required)} should be in hyperparams of generator.py to generate file name.") + ret = [get_valid_file_name_sequence(f"{key}{hyper[key]}") for key in keys_required] + return "test_" + "_".join(ret) + + +def add_log_file(sh, hyper): + hyperparams_lines = get_valid_file_name_lines(hyper) + return sh.replace("{log_file_name}", "LOG_NAME=\"" + hyperparams_lines + "\"") + + +def is_available_port(port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("127.0.0.1", port)) + return True + except socket.error as e: + if 
e.errno == 98: + return False + raise e + +def add_available_port(sh, start=1024, end=65535): + while True: + port = random.randint(start, end) + if is_available_port(port): + return sh.replace("{master_port}", f"{port}") + + +def get_all_hyper_keys(hypers): + keys = set() + for hyper in hypers: + for key, val in hyper.items(): + keys.add(key) + return keys + + +def replace_hyper(sh: str, hyper, all_hyper_keys): + lines = [] + hyper_not_used = set(hyper.keys()) + for line in sh.split("\n"): + match = re.search(r"\$([a-zA-Z0-9_]+)", line) + if not match: + lines.append(line) + continue + + key = match.group(1) + if key in hyper: + hyper_not_used.remove(key) + lines.append(line.replace(f"${key}", str(hyper[key]))) + elif key not in all_hyper_keys: + print(f"[WARNING] `{key}` not in hyperparams, will skip generating the line.") + + if hyper_not_used: + print(f"[WARNING] The following hyperparams are not used: {','.join(hyper_not_used)}") + return "\n".join(lines) + + +def load_yaml_file(config_path): + if not os.path.exists(config_path): + raise FileNotFoundError("Failed to find config file on {}".format(config_path)) + + with open(config_path) as f: + return yaml.safe_load(f) + +def save_sh_file(sh, fname): + with open(fname, "w") as f: + f.write(sh) + os.chmod(fname, 0o550) + print("Generate file: ", fname) + + +if __name__ == "__main__": + root_dir = os.path.abspath(os.path.dirname(__file__)) + "/" + y = load_yaml_file(root_dir + "test_generator.yaml") + hypers = y["hyperparams"] + all_hyper_keys = get_all_hyper_keys(hypers) + test_cnt = len(hypers) + + sh = get_sh_body() + sh = add_export(sh, y["export_area"]) + sh = add_source(sh, y["source_area"]) + sh = add_torchrun(sh, y["torchrun_area"]) + sh = add_model(sh, y["model_area"]) + + for hyper in hypers: + sh_ = deepcopy(sh) + sh_ = add_available_port(sh_) + sh_ = add_log_file(sh_, hyper) + sh_ = replace_hyper(sh_, hyper, all_hyper_keys) + fname = root_dir + get_valid_file_name_lines(hyper) + ".sh" + 
save_sh_file(sh_, fname) diff --git a/scripts/test_generator.yaml b/scripts/test_generator.yaml new file mode 100644 index 0000000..0e5dd50 --- /dev/null +++ b/scripts/test_generator.yaml @@ -0,0 +1,41 @@ +global_vars: + q8: &q8 /home/dataset/int8 + q4: &q4 /home/dataset/int4 + w8a8: &w8a8 /home/dataset/w8a8 + ascend_toolkit: &toolkit /usr/local/Ascend/ascend-toolkit/set_env.sh + atb: &atb /home/ascend/ascend-transformer-boost/output/atb/set_env.sh + optimize_config_path: &opt_cfg ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-800IA2-npu.yaml + +hyperparams: + - cpu_infer: 49 + w8a8_safetensor_path: *w8a8 + + - cpu_infer: 37 + +export_area: + ASDOPS_LOG_TO_FILE: 0 + ASDOPS_LOG_TO_STDOUT: 0 + ASDOPS_LOG_LEVEL: ERROR + ATB_LOG_TO_FILE: 0 + ATB_LOG_TO_STDOUT: 0 + ATB_LOG_LEVEL: ERROR + USE_MERGE: 0 + +source_area: + - *toolkit + - *atb + +torchrun_area: + long: + nproc_per_node: 1 + short: + m: ktransformers.local_chat + +model_area: + cpu_infer: $cpu_infer + model_path: $model_path + gguf_path: *q8 + q4_gguf_path: *q4 + w8a8_safetensor_path: $w8a8_safetensor_path + max_new_tokens: 50 + optimize_config_path: *opt_cfg diff --git a/scripts/test_runner.py b/scripts/test_runner.py new file mode 100644 index 0000000..2bd05b2 --- /dev/null +++ b/scripts/test_runner.py @@ -0,0 +1,332 @@ +import os +import select +import subprocess +import time +from functools import partial +from types import SimpleNamespace +from typing import Sequence, Tuple, Dict, Union + +import psutil +import yaml + + +def load_yaml_file(config_path): + if not os.path.exists(config_path): + raise FileNotFoundError('[Runner] Failed to find config file on {}'.format(config_path)) + + with open(config_path) as f: + return yaml.safe_load(f) + + +def store_output_stat(line: str, store: dict[str, list[str]]) -> bool: + line_head = line[:30] + for key in store.keys(): + if key in line_head: + store[key].append(line) + return True + return False + + +def get_file_line_cnt(line): + if 
os.path.isfile(line): + with open(line, 'r') as f: + non_empty_lines = sum(1 for line in f if line.strip()) + else: + non_empty_lines = 1 + print(f'[Runner] Use default line count: 1') + return non_empty_lines + + +def print_script_running(proc, script, args, script_id) -> tuple[bool, str | dict] | None: + is_print_lines = args.is_print_lines + print_interval = args.print_interval + test_input = script.get('test_input', 'Hello, world.') + timeout = script.get('timeout', 20000) # sec + + line_cnt = get_file_line_cnt(test_input) + max_stat_count = line_cnt * len(args.stat_keys) + print(f'[Runner] The script will exit after the stat info in the {max_stat_count} line is collected.') + stat_count = 0 + success_collect = False + test_output = {key: [] for key in args.stat_keys} + + failure_start_time = 0 + failure_wait_time = 3 + failure_ret_info = '' + + buffer = '' + prompt_detected = False + last_status_time = start_time = time.time() + + os.set_blocking(proc.stdout.fileno(), False) + + read_chunk = partial(os.read, proc.stdout.fileno(), 4096) + + def add_os_linesep(text): + if not text.endswith(os.linesep): + text += os.linesep + return text + + def work_on_detected(): + nonlocal prompt_detected + print(f'[Runner] Chat detected') + proc.stdin.write(add_os_linesep(test_input)) + proc.stdin.flush() + prompt_detected = True + + def update_buffer(): + try: + nonlocal buffer + while True: + data = read_chunk().decode('utf-8', errors='replace') + if not data: + break + buffer += data + except BlockingIOError: + pass # Buffer is empty + + def process_lines(lines): + nonlocal failure_start_time, failure_ret_info, stat_count, is_print_lines + for line in lines: + if is_print_lines: + print(line) + + if 'Traceback' in line and failure_ret_info == '': + failure_start_time = cur_time + failure_ret_info = f'Detect traceback, exiting...' 
+ print(f'[Runner] Detect traceback') + print(line) + is_print_lines = True + + if 'NPU out of memory' in line: + failure_ret_info = f'Detect NPU OOM, exiting...' + + if store_output_stat(line, test_output): + stat_count += 1 + + if stat_count >= max_stat_count: + nonlocal success_collect + success_collect = True + + if not prompt_detected and 'Chat:' in line: + work_on_detected() + + def process_incomplete_line(line): + if not prompt_detected and 'Chat:' in line: + if is_print_lines: + print(line) + work_on_detected() + line = '' + return line + + while True: + cur_time = time.time() + cur_running_time = int(cur_time - start_time) + if cur_time - last_status_time > print_interval: + print(f'[Runner] Script still running (elapsed: {cur_running_time}s), task id: {script_id}') + last_status_time = cur_time + + if cur_time - start_time > timeout: + print(f'[Runner] Timeout, exiting...') + return False, f'Script execution timeout after {cur_running_time}s' + + if success_collect: + return True, test_output + + if failure_ret_info and cur_time - failure_start_time >= failure_wait_time: + return False, failure_ret_info + + rlist, _, _ = select.select([proc.stdout], [], [], 0.2) + + if rlist: + update_buffer() + + if buffer: + lines = buffer.split('\n') + + process_lines(lines[:-1]) + + buffer = process_incomplete_line(lines[-1]) + + +def kill_process(p): + try: + if p.status() == psutil.STATUS_ZOMBIE: + print(f'[Runner] Found zombie process {p.pid}, waiting parent to reap it') + os.waitpid(p.pid, os.WNOHANG) + else: + p.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + +def kill_subprocess(args): + pid = os.getpid() + self = psutil.Process(pid) + + for _ in range(10): + children = self.children(recursive=True) + if not children: + print('[Runner] Script exiting successfully.') + time.sleep(2) + return + + print(f'[Runner] Killing {len(children)} subprocesses...') + + for child in children: + kill_process(child) + + time.sleep(args.sleep_time) + + 
raise RuntimeError('[Runner] Subprocess exited unsuccessfully!') + + +def script_runner(script: dict[str, str], args, script_id) -> tuple[bool, Union[str, dict | Sequence]]: + max_retries = args.max_retries + script_path = script['path'] + if not os.path.isabs(script_path): + script_path = os.path.join(args.runner_path, script_path) + retries = 0 + ret_failure = [] + + while retries < max_retries: + print(f'[Runner] Running: {script_path}' + ('' if retries == 0 else f'(attempt {retries + 1}/{max_retries})')) + proc = subprocess.Popen( + ['/bin/bash', '--noprofile', '--norc', script_path], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + bufsize=4096, + text=True + ) + + stat, ret = print_script_running(proc, script, args, script_id) + + proc.stdin.close() + proc.stdout.close() + proc.wait() + + kill_subprocess(args) + + if stat: + return True, ret + + if not isinstance(ret, str): + raise AssertionError('ret must be a string when run failure.') + ret_failure.append(ret) + if 'Script execution timeout after' in ret: + print('[Runner] Timeout.') + elif 'Detect traceback, exiting...' in ret: + print('[Runner] Get traceback.') + elif 'Detect NPU OOM, exiting...' 
in ret: + print('[Runner] Get NPU OOM.') + else: + print(f'[Runner] Get error: {ret}') + + if args.stop_on_failure: + return False, ret_failure + + retries += 1 + if retries < max_retries: + continue + + return False, ret_failure + + return False, ('[RunnerError] max_retries must greater than 0.',) + + +def print_centered_summary(text, char='=', width=60): + if len(text) >= width - 10 - 2: + print(f'\n{char * 5} {text} {char * 5}\n', end='') + else: + padding = (width - 2 - len(text)) // 2 + left_padding = char * padding + right_padding = char * (width - len(text) - padding) + print(f'\n{left_padding} {text} {right_padding}\n', end='') + + +def print_success_output(output): + for script_name, vals in output.items(): + print_centered_summary(script_name, '-') + if len(vals) == 0: + return + len_val = len(next(iter(vals.values()))) + for idx in range(len_val): + for val in vals.values(): + print(val[idx], end='\n') + print() + + +def print_failure_output(output): + for script_name, vals in output.items(): + print_centered_summary(script_name, '-') + if len(vals) == 0: + return + for val in vals: + print(val, end='\n') + print() + + +def make_titles(scripts): + names_cnt = {} + global_cnt = 0 + titles = [] + for item in scripts: + name = item['name'] + names_cnt[name] = names_cnt.get(name, 0) + 1 + global_cnt += 1 + titles.append(f'{global_cnt} {name}' + ('' if names_cnt[name] == 1 else f' ({names_cnt[name]})')) + return titles + + +if __name__ == '__main__': + runner_path = os.path.abspath(os.path.dirname(__file__)) + '/' + root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) + '/' + + config = load_yaml_file(runner_path + 'test_runner.yaml') + settings = config['settings'] + scripts = config['scripts'] + stat_keys = config['stat_keys'] + titles = make_titles(scripts) + if len(scripts) == 0: + print('[Runner] No scripts defined.') + exit(1) + + args = SimpleNamespace() + args.stat_keys = stat_keys + args.max_retries = settings.get('max_retries', 
3) + args.sleep_time = settings.get('sleep_time', 30) # second + args.is_print_lines = settings.get('is_print_lines', True) + args.print_interval = settings.get('print_interval', 30) # second + args.stop_on_failure = settings.get('stop_on_failure', False) + args.runner_path = runner_path + args.root_path = root_path + + success_output = {} + failure_output = {} + try: + for i, script in enumerate(scripts): + stat, ret = script_runner(script, args, i + 1) + if stat: + success_output[titles[i]] = ret + else: + failure_output[titles[i]] = ret + if args.stop_on_failure and not stat: + print(f'[Runner] Running {titles[i]} failed. Exit early now because stop_on_failure is set to True.') + break + except Exception as e: + if '[Runner' in e.__str__(): + print(f'{e.__str__()} \n[Runner] Detect error, the collected information will be print.') + else: + raise e + + print_centered_summary('Summary Begin') + print_success_output(success_output) + print_failure_output(failure_output) + success_failure_stat = [] + if len(success_output) > 0: + success_failure_stat.append(f'{len(success_output)} scripts execute success') + if len(failure_output) > 0: + success_failure_stat.append(f'{len(failure_output)} scripts execute failure') + print(', '.join(success_failure_stat) + '.') + print_centered_summary('Summary End') diff --git a/scripts/test_runner.yaml b/scripts/test_runner.yaml new file mode 100644 index 0000000..c80ecc8 --- /dev/null +++ b/scripts/test_runner.yaml @@ -0,0 +1,27 @@ +global_args: + 8k1k: &8k1k /home/prompts/8k1k.txt + +settings: + max_retries: 3 + stop_on_failure: false + is_print_lines: true + sleep_time: 8 + +stat_keys: + - 'prompt eval count' + - 'prompt eval duration' + - 'prompt eval rate' + - 'eval count' # 'eval count' belong to 'prompt eval count', but it is shorter, + - 'eval duration' # put shorter before longer, so match longer failure than match shorter + - 'eval rate' + +scripts: + - name: 'test_cpuinfer37.sh' + path: 'test_cpuinfer37.sh' + 
test_input: *8k1k + timeout: 20000 + + - name: 'test_cpuinfer49.sh' + path: 'test_cpuinfer49.sh' + test_input: *8k1k + timeout: 20000 \ No newline at end of file diff --git a/serve_test.sh b/serve_test.sh new file mode 100644 index 0000000..367ebd0 --- /dev/null +++ b/serve_test.sh @@ -0,0 +1,63 @@ +#!/bin/bash +#set -ex + +# export area +export ASDOPS_LOG_TO_FILE=0 +export ASDOPS_LOG_TO_STDOUT=0 +export ASDOPS_LOG_LEVEL=ERROR +export ATB_LOG_TO_FILE=0 +export ATB_LOG_TO_STDOUT=0 +export ATB_LOG_LEVEL=ERROR +export USE_MERGE=0 +#export PROF_DECODE=1 +#export PYTORCH_NPU_ALLOC_CONF=expandable_segments:False +export INF_NAN_MODE_FORCE_DISABLE=1 +export CAPTURE_PLUGIN_PATH=/home/x30058903/pack/npu_graph +export BLAS_NUM_THREADS=1 +# export TASK_QUEUE_ENABLE=0 +# source area +source /usr/local/Ascend/ascend-toolkit/set_env.sh +source /usr/local/Ascend/nnal/atb/set_env.sh + +# global vars +LOG_DIR="../logs" +LOG_NAME="test_cpuinfer49" +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +mkdir -p ${LOG_DIR} +LOG_FILE="${LOG_DIR}/${LOG_NAME}_${TIMESTAMP}.log" + +# torchrun \ +# --master-port 41532 \ +# --nproc_per_node 1 \ +# -m ktransformers.server.main \ +# --cpu_infer 65 \ +# --model_path /home/mount/DeepSeek-R1-q4km-w8a8 \ +# --gguf_path /home/mount/DeepSeek-R1-q4km-w8a8 \ +# --max_new_tokens 200 \ +# --use_cuda_graph \ +# --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-800IA2-npu.yaml \ +# 2>&1 | tee "${LOG_FILE}" + +LOG_DIR="../logs" +WS=1 +CPUINFER=65 +LOG_NAME="combime_test" +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +mkdir -p $LOG_DIR +LOG_FILE="${LOG_DIR}/${LOG_NAME}_${TIMESTAMP}.log" +echo ${LOG_FILE} +torchrun --nproc_per_node $WS \ + --master_port 6685 -m ktransformers.server.main \ + --cpu_infer $CPUINFER \ + --batch_size 1 \ + --chunk_size 16384 \ + --model_path /home/mount/DeepSeek-R1-q4km-w8a8 \ + --gguf_path /home/mount/DeepSeek-R1-q4km-w8a8 \ + --optimize_config_path ./ktransformers/optimize/optimize_rules/DeepSeek-V3-Chat-800IA2-npu.yaml \ + 
--port 10014 \ + --force_think \ + --use_cuda_graph \ + --max_new_tokens 2048 2>&1 | tee "$LOG_FILE" + + # --gguf_path /mnt/DeepSeek-R1-BF16/ \ + diff --git a/setup.py b/setup.py index c91d9dc..212c158 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,13 @@ except ImportError: MUSA_HOME=None KTRANSFORMERS_BUILD_XPU = torch.xpu.is_available() + +try: + import torch_npu + KTRANSFORMERS_BUILD_NPU = torch_npu.npu.is_available() +except Exception: + KTRANSFORMERS_BUILD_NPU = False + # 检测 DEV_BACKEND 环境变量 dev_backend = os.environ.get("DEV_BACKEND", "").lower() if dev_backend == "xpu": @@ -179,6 +186,8 @@ class VersionInfo: else: print("Using native cpu instruct") if sys.platform.startswith("linux"): + if KTRANSFORMERS_BUILD_NPU: + return 'aarch64' with open('/proc/cpuinfo', 'r', encoding="utf-8") as cpu_f: cpuinfo = cpu_f.read() flags_line = [line for line in cpuinfo.split( @@ -237,6 +246,8 @@ class VersionInfo: backend_version = f"rocm{self.get_rocm_bare_metal_version(ROCM_HOME)}" elif torch.xpu.is_available(): backend_version = f"xpu" + elif KTRANSFORMERS_BUILD_NPU: + backend_version = f"npu{torch_npu.__version__}" else: raise ValueError("Unsupported backend: CUDA_HOME MUSA_HOME ROCM_HOME all not set and XPU is not available.") package_version = f"{flash_version}+{backend_version}torch{torch_version}{cpu_instruct}" @@ -509,6 +520,8 @@ class CMakeBuild(BuildExtension): cmake_args += ["-DKTRANSFORMERS_USE_ROCM=ON"] elif KTRANSFORMERS_BUILD_XPU: cmake_args += ["-DKTRANSFORMERS_USE_XPU=ON", "-DKTRANSFORMERS_USE_CUDA=OFF"] + elif KTRANSFORMERS_BUILD_NPU: + cmake_args += ["-DKTRANSFORMERS_USE_NPU=ON", "-DKTRANSFORMERS_USE_CUDA=OFF"] else: raise ValueError("Unsupported backend: CUDA_HOME, MUSA_HOME, and ROCM_HOME are not set and XPU is not available.") @@ -636,10 +649,12 @@ elif MUSA_HOME is not None: ) elif torch.xpu.is_available(): #XPUExtension is not available now. 
ops_module = None +elif KTRANSFORMERS_BUILD_NPU: + pass else: raise ValueError("Unsupported backend: CUDA_HOME ROCM_HOME MUSA_HOME are not set and XPU is not available.") -if not torch.xpu.is_available(): +if not torch.xpu.is_available() and not KTRANSFORMERS_BUILD_NPU: ext_modules = [ CMakeExtension("cpuinfer_ext", os.fspath(Path("").resolve() / "csrc" / "ktransformers_ext")), ops_module, ] @@ -660,15 +675,43 @@ if not torch.xpu.is_available(): ext_modules.append( CMakeExtension("balance_serve", os.fspath(Path("").resolve()/ "csrc"/ "balance_serve")) ) -else: + + setup( + name=VersionInfo.PACKAGE_NAME, + version=VersionInfo().get_package_version(), + install_requires=triton_dep, + cmdclass={"bdist_wheel":BuildWheelsCommand ,"build_ext": CMakeBuild}, + ext_modules=ext_modules + ) + + + +elif torch.xpu.is_available(): ext_modules = [ CMakeExtension("cpuinfer_ext", os.fspath(Path("").resolve() / "csrc" / "ktransformers_ext")), ] + setup( + name=VersionInfo.PACKAGE_NAME, + version=VersionInfo().get_package_version(), + install_requires=triton_dep, + cmdclass={"bdist_wheel":BuildWheelsCommand ,"build_ext": CMakeBuild}, + ext_modules=ext_modules + ) -setup( - name=VersionInfo.PACKAGE_NAME, - version=VersionInfo().get_package_version(), - install_requires=triton_dep, - cmdclass={"bdist_wheel":BuildWheelsCommand ,"build_ext": CMakeBuild}, - ext_modules=ext_modules -) +elif KTRANSFORMERS_BUILD_NPU: + ext_modules = [ + CMakeExtension("cpuinfer_ext", os.fspath(Path("").resolve() / "csrc" / "ktransformers_ext")), + ] + if with_balance: + print("using balance_serve") + ext_modules.append( + CMakeExtension("balance_serve", os.fspath(Path("").resolve()/ "csrc"/ "balance_serve")) + ) + + setup( + name=VersionInfo.PACKAGE_NAME, + version=VersionInfo().get_package_version(), + install_requires=triton_dep, + cmdclass={"bdist_wheel":BuildWheelsCommand ,"build_ext": CMakeBuild}, + ext_modules=ext_modules + )