This commit is contained in:
Jianwei Dong
2025-11-25 20:52:08 +08:00
committed by GitHub
parent 2cffdf7033
commit 51745a9ea1
14 changed files with 845 additions and 48 deletions

104
.github/workflows/kt-kernel-tests.yml vendored Normal file
View File

@@ -0,0 +1,104 @@
name: PR KT-Kernel Test
on:
pull_request:
branches:
- main
- develop
types: [synchronize, labeled]
workflow_dispatch:
concurrency:
group: pr-kt-kernel-test-${{ github.ref }}
cancel-in-progress: true
jobs:
# =============================================== check changes ====================================================
check-changes:
runs-on: ubuntu-latest
outputs:
kt_kernel: ${{ steps.filter.outputs.kt_kernel }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Fail if the PR does not have the 'run-ci' label
if: github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'run-ci')
run: |
echo "This pull request does not have the 'run-ci' label. Failing the workflow."
exit 1
- name: Fail if the PR is a draft
if: github.event_name == 'pull_request' && github.event.pull_request.draft == true
run: |
echo "This pull request is a draft. Failing the workflow."
exit 1
- name: Detect file changes
id: filter
uses: dorny/paths-filter@v3
with:
filters: |
kt_kernel:
- "kt-kernel/**"
- ".github/workflows/kt-kernel-tests.yml"
# =============================================== KT-Kernel tests ====================================================
per-commit-kt-kernel-cpu:
needs: [check-changes]
if: always() && !failure() && !cancelled() &&
(needs.check-changes.outputs.kt_kernel == 'true' || github.event_name == 'workflow_dispatch')
runs-on: kt-cpu
continue-on-error: false
steps:
- name: Cleanup
run: |
sudo rm -rf $GITHUB_WORKSPACE/* || true
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: recursive
- name: Install KT-Kernel
run: |
cd kt-kernel
bash install.sh build
- name: Run KT-Kernel CPU tests
timeout-minutes: 30
run: |
cd kt-kernel/test
python3 run_suite.py --hw cpu --suite default
# =============================================== finish ====================================================
pr-test-kt-kernel-finish:
needs: [check-changes, per-commit-kt-kernel-cpu]
if: always()
runs-on: ubuntu-latest
steps:
- name: Check all dependent job statuses
run: |
# Convert the 'needs' context to a JSON string
json_needs='${{ toJson(needs) }}'
# Get a list of all job names from the JSON keys
job_names=$(echo "$json_needs" | jq -r 'keys_unsorted[]')
for job in $job_names; do
# For each job, extract its result
result=$(echo "$json_needs" | jq -r --arg j "$job" '.[$j].result')
# Print the job name and its result
echo "$job: $result"
# Check for failure or cancellation and exit if found
if [[ "$result" == "failure" || "$result" == "cancelled" ]]; then
echo "The above jobs failed."
exit 1
fi
done
# If the loop completes, all jobs were successful
echo "All jobs completed successfully"
exit 0

View File

@@ -30,7 +30,11 @@ dependencies = [
"black>=25.9.0",
]
# No optional dev group needed for formatting; using custom git hooks instead of pre-commit
[project.optional-dependencies]
test = [
"pytest>=7.0.0",
"psutil>=5.9.0",
]
[project.urls]
Homepage = "https://github.com/kvcache-ai"

27
kt-kernel/pytest.ini Normal file
View File

@@ -0,0 +1,27 @@
[pytest]
# Test paths
testpaths = test/per_commit
# File and function naming conventions
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# Markers for hardware backends
markers =
cpu: CPU backend tests (Intel AMX/AVX512/AVX2)
cuda: CUDA backend tests (NVIDIA GPUs)
amd: AMD backend tests (ROCm)
slow: Slow-running tests (>60 seconds)
requires_model: Tests requiring model files
# Output options
addopts =
-v
--tb=short
--strict-markers
# Filter warnings
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning

View File

@@ -22,6 +22,8 @@ Convert weights to INT4/INT8 format optimized for AMX inference on CPU. These qu
- **FP16**: 16-bit floating point
- **BF16**: BFloat16 format
> **⚠️ Precision Warning:** Quantizing directly from FP8 to INT4/INT8 may cause significant accuracy degradation. For best results, use the original **BF16** model as the source for INT4/INT8 quantization.
## Basic Usage
### Quantize BF16 model to INT4
@@ -213,6 +215,37 @@ python scripts/convert_gpu_weights.py \
- `--dataset`: HuggingFace dataset for calibration
- `--dataset_split`: Dataset split to use
#### Memory Management (Avoiding OOM)
GPTQ quantization requires additional GPU memory for Hessian matrix computation beyond model weights. Use `--max_gpu_memory` to limit GPU memory usage and offload remaining layers to CPU:
```bash
python scripts/convert_gpu_weights.py \
--model_id /path/to/model \
--output_dir /path/to/output \
--quant_type W4A16 \
--max_gpu_memory "40GiB"
```
**Recommended settings:**
| GPU VRAM | Suggested `--max_gpu_memory` |
|----------|------------------------------|
| 24 GiB | 14-16 GiB |
| 48 GiB | 30-35 GiB |
| 80 GiB | 50-60 GiB |
Reserve 40-50% of GPU memory for GPTQ's Hessian matrix computation.
**Options:**
- `--max_gpu_memory`: Maximum GPU memory for model weights per device (e.g., '40GiB')
- `--max_cpu_memory`: Maximum CPU memory (default: 1000GiB when `--max_gpu_memory` is set)
**Important:** llmcompressor does not support disk offloading. Ensure your machine has enough GPU + CPU memory to load the entire model. If you still encounter OOM:
1. Reduce `--num_calibration_samples` (e.g., 256)
2. Reduce `--max_sequence_length` (e.g., 1024)
3. Use `--force_cpu` to run entirely on CPU (slower but avoids GPU OOM)
### Examples
#### Example 1: Quantize Qwen3-Next-80B for Hybrid Inference (W4A16)

View File

@@ -34,42 +34,63 @@ from datasets import load_dataset
def parse_args():
parser = argparse.ArgumentParser(description="Quantize MoE models with selective quantization")
# Required arguments
parser.add_argument("--model_id", type=str, required=True, help="Path to the input model directory")
parser.add_argument("--output_dir", type=str, required=True, help="Path to save the quantized model")
parser.add_argument(
"--model_id",
type=str,
required=True,
help="Path to the input model directory"
)
parser.add_argument(
"--output_dir",
type=str,
required=True,
help="Path to save the quantized model"
)
# Optional arguments
parser.add_argument(
"--quant_type",
type=str,
choices=["W4A16", "W8A16"],
default="W8A16",
help="Quantization type: W4A16 (GPTQ4) or W8A16 (GPTQ8). Default: W8A16",
help="Quantization type: W4A16 (GPTQ4) or W8A16 (GPTQ8). Default: W8A16"
)
parser.add_argument(
"--num_calibration_samples", type=int, default=512, help="Number of calibration samples. Default: 512"
"--num_calibration_samples",
type=int,
default=512,
help="Number of calibration samples. Default: 512"
)
parser.add_argument(
"--max_sequence_length", type=int, default=2048, help="Maximum sequence length for calibration. Default: 2048"
"--max_sequence_length",
type=int,
default=2048,
help="Maximum sequence length for calibration. Default: 2048"
)
parser.add_argument(
"--dampening_frac",
type=float,
default=0.1,
help="Dampening fraction to mitigate quantization noise. Default: 0.1",
help="Dampening fraction to mitigate quantization noise. Default: 0.1"
)
parser.add_argument(
"--dataset",
type=str,
default="HuggingFaceH4/ultrachat_200k",
help="Dataset for calibration. Default: HuggingFaceH4/ultrachat_200k",
help="Dataset for calibration. Default: HuggingFaceH4/ultrachat_200k"
)
parser.add_argument(
"--dataset_split", type=str, default="train_sft", help="Dataset split to use. Default: train_sft"
"--dataset_split",
type=str,
default="train_sft",
help="Dataset split to use. Default: train_sft"
)
parser.add_argument(
"--force_cpu", action="store_true", help="Force all computations to CPU (sets CUDA_VISIBLE_DEVICES='')"
"--force_cpu",
action="store_true",
help="Force all computations to CPU (sets CUDA_VISIBLE_DEVICES='')"
)
parser.add_argument(
"--ignore_patterns",
@@ -82,22 +103,44 @@ def parse_args():
r"re:.*\.shared_expert\..*$",
r"re:.*\.shared_experts\..*$",
r"re:.*\.mlp\.shared_expert_gate$",
r"re:.*\.linear_attn\..*$",
r"re:.*\.linear_attn\..*$"
],
help="Regex patterns for layers to ignore during quantization",
help="Regex patterns for layers to ignore during quantization"
)
parser.add_argument(
"--torch_dtype",
type=str,
choices=["bfloat16", "float16", "float32"],
default="bfloat16",
help="PyTorch dtype for model loading. Default: bfloat16",
help="PyTorch dtype for model loading. Default: bfloat16"
)
parser.add_argument(
"--trust_remote_code", action="store_true", help="Allow loading of remote code (required for some models)"
"--trust_remote_code",
action="store_true",
help="Allow loading of remote code (required for some models)"
)
parser.add_argument("--random_seed", type=int, default=42, help="Random seed for dataset shuffling. Default: 42")
parser.add_argument(
"--random_seed",
type=int,
default=42,
help="Random seed for dataset shuffling. Default: 42"
)
parser.add_argument(
"--max_gpu_memory",
type=str,
default=None,
help="Maximum GPU memory for model weights per device (e.g., '40GiB'). "
"GPTQ quantization requires additional GPU memory for Hessian matrix computation, "
"so reserve 40-50%% of total VRAM. For example, use '40GiB' on 80GB GPUs. "
"Remaining layers will be offloaded to CPU. Default: use all available"
)
parser.add_argument(
"--max_cpu_memory",
type=str,
default=None,
help="Maximum CPU memory to use (e.g., '100GiB'). Default: use all available"
)
return parser.parse_args()
@@ -124,7 +167,11 @@ def get_torch_dtype(dtype_str):
Returns:
torch.dtype: Corresponding PyTorch dtype
"""
dtype_map = {"bfloat16": torch.bfloat16, "float16": torch.float16, "float32": torch.float32}
dtype_map = {
"bfloat16": torch.bfloat16,
"float16": torch.float16,
"float32": torch.float32
}
return dtype_map[dtype_str]
@@ -144,18 +191,18 @@ def check_dense_layers_and_update_ignore(model_id, ignore_patterns, trust_remote
Updated ignore_patterns list with dense layer patterns added
"""
print("🔍 Checking model configuration for dense layers...")
try:
# Load model configuration
config = AutoConfig.from_pretrained(model_id, trust_remote_code=trust_remote_code)
# Check if the model has first_k_dense_replace parameter
first_k_dense_replace = getattr(config, "first_k_dense_replace", None)
first_k_dense_replace = getattr(config, 'first_k_dense_replace', None)
if first_k_dense_replace is not None and first_k_dense_replace > 0:
print(f"✅ Found dense layers configuration: first_k_dense_replace = {first_k_dense_replace}")
print(f" Adding first {first_k_dense_replace} layers to ignore list...")
# Create regex pattern for dense layers (layers 0 to first_k_dense_replace-1)
if first_k_dense_replace == 1:
dense_pattern = r"re:model\.layers\.0\.mlp\..*$"
@@ -163,18 +210,18 @@ def check_dense_layers_and_update_ignore(model_id, ignore_patterns, trust_remote
# For multiple layers, use range pattern
layer_range = f"[0-{first_k_dense_replace-1}]"
dense_pattern = f"re:model\\.layers\\.{layer_range}\\.mlp\\..*$"
# Add the dense layer pattern to ignore list
updated_ignore_patterns = ignore_patterns + [dense_pattern]
print(f" Dense layer pattern added: {dense_pattern}")
print(f" This will ignore MLP components in layers 0-{first_k_dense_replace-1}")
return updated_ignore_patterns
else:
print(" No dense layers detected (first_k_dense_replace not found or is 0)")
return ignore_patterns
except Exception as e:
print(f"⚠️ Warning: Could not check model config for dense layers: {e}")
print(" Proceeding with original ignore patterns...")
@@ -214,7 +261,11 @@ def load_and_prepare_dataset(dataset_name, dataset_split, num_samples, max_lengt
# Tokenize the data
def tokenize(sample):
return tokenizer(
sample["text"], padding=False, max_length=max_length, truncation=True, add_special_tokens=False
sample["text"],
padding=False,
max_length=max_length,
truncation=True,
add_special_tokens=False
)
ds = ds.map(tokenize, remove_columns=ds.column_names)
@@ -255,32 +306,97 @@ def main():
# 0) Check for dense layers and update ignore patterns
# Dense layers in the first few layers should not be quantized
updated_ignore_patterns = check_dense_layers_and_update_ignore(
args.model_id, args.ignore_patterns, args.trust_remote_code
args.model_id,
args.ignore_patterns,
args.trust_remote_code
)
# --------------------------------------------------------------------
# 1) Build a dummy model (no weights) to infer a device map
# This determines optimal device placement for each module
print("🔍 Inferring device map...")
with init_empty_weights():
dummy = AutoModelForCausalLM.from_pretrained(
args.model_id, torch_dtype=torch_dtype, trust_remote_code=args.trust_remote_code
)
device_map = infer_auto_device_map(dummy, no_split_module_classes=dummy._no_split_modules)
del dummy
# Force all modules to CPU for quantization
if args.force_cpu:
device_map = {name: "cpu" for name in device_map}
# In force_cpu mode, directly get module names without calling infer_auto_device_map
# to avoid GPU memory allocation
print("🔍 Building CPU-only device map...")
with init_empty_weights():
dummy = AutoModelForCausalLM.from_pretrained(
args.model_id,
torch_dtype=torch_dtype,
trust_remote_code=args.trust_remote_code
)
device_map = {name: "cpu" for name, _ in dummy.named_modules() if name}
del dummy
else:
print("🔍 Inferring device map...")
with init_empty_weights():
dummy = AutoModelForCausalLM.from_pretrained(
args.model_id,
torch_dtype=torch_dtype,
trust_remote_code=args.trust_remote_code
)
# Build max_memory dict if specified
max_memory = None
if args.max_gpu_memory or args.max_cpu_memory:
max_memory = {}
if args.max_gpu_memory:
# Apply to all available GPUs
num_gpus = torch.cuda.device_count()
for i in range(num_gpus):
max_memory[i] = args.max_gpu_memory
print(f" GPU memory limit: {args.max_gpu_memory} per device ({num_gpus} GPUs)")
# Always set CPU memory when max_memory is used
# Otherwise infer_auto_device_map may trigger disk offloading
if args.max_cpu_memory:
max_memory["cpu"] = args.max_cpu_memory
print(f" CPU memory limit: {args.max_cpu_memory}")
else:
# Use a very large value to allow using all available CPU memory
# This prevents disk offloading when user has enough RAM
max_memory["cpu"] = "1000GiB"
print(f" CPU memory limit: 1000GiB (default, to prevent disk offloading)")
device_map = infer_auto_device_map(
dummy,
no_split_module_classes=dummy._no_split_modules,
max_memory=max_memory
)
# Check if disk offloading was triggered (not supported by llmcompressor)
disk_modules = [k for k, v in device_map.items() if v == "disk"]
if disk_modules:
print(f"❌ Error: {len(disk_modules)} modules would be offloaded to disk.")
print(" llmcompressor does not support disk offloading.")
print(" Solutions:")
print(" 1. Increase --max_gpu_memory to use more GPU memory")
print(" 2. Add --max_cpu_memory with higher value (e.g., '200GiB')")
print(" 3. Ensure your machine has enough GPU + CPU memory")
raise RuntimeError("Disk offloading is not supported by llmcompressor. "
"Please ensure you have enough GPU + CPU memory.")
del dummy
# --------------------------------------------------------------------
# 2) Load the full model weights with device mapping
# Note: offload_folder=None disables disk offloading (not supported by llmcompressor)
print("📥 Loading model...")
model = AutoModelForCausalLM.from_pretrained(
args.model_id,
device_map=device_map,
torch_dtype=torch_dtype,
trust_remote_code=args.trust_remote_code,
)
try:
model = AutoModelForCausalLM.from_pretrained(
args.model_id,
device_map=device_map,
torch_dtype=torch_dtype,
trust_remote_code=args.trust_remote_code,
offload_folder=None, # Disable disk offloading (not supported by llmcompressor)
)
except Exception as e:
if "disk" in str(e).lower() or "offload" in str(e).lower():
print(f"❌ Error: Not enough GPU + CPU memory to load the model.")
print(" llmcompressor does not support disk offloading.")
print(" Solutions:")
print(" 1. Increase --max_gpu_memory to use more GPU memory")
print(" 2. Ensure you have enough CPU RAM for remaining layers")
print(" 3. Use a machine with more memory")
raise
raise
tokenizer = AutoTokenizer.from_pretrained(args.model_id)
@@ -293,7 +409,7 @@ def main():
args.num_calibration_samples,
args.max_sequence_length,
tokenizer,
args.random_seed,
args.random_seed
)
# --------------------------------------------------------------------
@@ -331,4 +447,4 @@ def main():
if __name__ == "__main__":
main()
main()

View File

@@ -0,0 +1,5 @@
"""KT-Kernel Test Suite
This test suite is adapted from SGLang's CI testing framework.
It provides hardware-aware test registration and execution with timeout control.
"""

View File

@@ -0,0 +1 @@
"""CI test registration and execution utilities."""

View File

@@ -0,0 +1,112 @@
import ast
import warnings
from dataclasses import dataclass
from enum import Enum, auto
from typing import List
class HWBackend(Enum):
CPU = auto()
CUDA = auto()
AMD = auto()
@dataclass
class CIRegistry:
backend: HWBackend
filename: str
est_time: float
suite: str
def register_cpu_ci(est_time: float, suite: str):
pass
def register_cuda_ci(est_time: float, suite: str):
pass
def register_amd_ci(est_time: float, suite: str):
pass
REGISTER_MAPPING = {
"register_cpu_ci": HWBackend.CPU,
"register_cuda_ci": HWBackend.CUDA,
"register_amd_ci": HWBackend.AMD,
}
class RegistryVisitor(ast.NodeVisitor):
def __init__(self, filename: str):
self.filename = filename
self.registries: list[CIRegistry] = []
def _collect_ci_registry(self, func_call: ast.Call):
if not isinstance(func_call.func, ast.Name):
return None
if func_call.func.id not in REGISTER_MAPPING:
return None
hw = REGISTER_MAPPING[func_call.func.id]
est_time, suite = None, None
for kw in func_call.keywords:
if kw.arg == "est_time":
if isinstance(kw.value, ast.Constant):
est_time = kw.value.value
elif kw.arg == "suite":
if isinstance(kw.value, ast.Constant):
suite = kw.value.value
for i, arg in enumerate(func_call.args):
if isinstance(arg, ast.Constant):
if i == 0:
est_time = arg.value
elif i == 1:
suite = arg.value
assert (
est_time is not None
), "esimation_time is required and should be a constant"
assert suite is not None, "suite is required and should be a constant"
return CIRegistry(
backend=hw, filename=self.filename, est_time=est_time, suite=suite
)
def visit_Module(self, node):
for stmt in node.body:
if not isinstance(stmt, ast.Expr) or not isinstance(stmt.value, ast.Call):
continue
cr = self._collect_ci_registry(stmt.value)
if cr is not None:
self.registries.append(cr)
self.generic_visit(node)
def ut_parse_one_file(filename: str) -> List[CIRegistry]:
with open(filename, "r") as f:
file_content = f.read()
tree = ast.parse(file_content, filename=filename)
visitor = RegistryVisitor(filename=filename)
visitor.visit(tree)
return visitor.registries
def collect_tests(files: list[str], sanity_check: bool = True) -> List[CIRegistry]:
ci_tests = []
for file in files:
registries = ut_parse_one_file(file)
if len(registries) == 0:
msg = f"No CI registry found in {file}"
if sanity_check:
raise ValueError(msg)
else:
warnings.warn(msg)
continue
ci_tests.extend(registries)
return ci_tests

View File

@@ -0,0 +1,171 @@
import os
import subprocess
import threading
import time
from dataclasses import dataclass
from typing import Callable, List, Optional
import psutil, signal, sys
def kill_process_tree(parent_pid, include_parent: bool = True, skip_pid: int = None):
"""Kill the process and all its child processes."""
# Remove sigchld handler to avoid spammy logs.
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
if parent_pid is None:
parent_pid = os.getpid()
include_parent = False
try:
itself = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = itself.children(recursive=True)
for child in children:
if child.pid == skip_pid:
continue
try:
child.kill()
except psutil.NoSuchProcess:
pass
if include_parent:
try:
if parent_pid == os.getpid():
itself.kill()
sys.exit(0)
itself.kill()
# Sometime processes cannot be killed with SIGKILL (e.g, PID=1 launched by kubernetes),
# so we send an additional signal to kill them.
itself.send_signal(signal.SIGQUIT)
except psutil.NoSuchProcess:
pass
@dataclass
class TestFile:
name: str
estimated_time: float = 60
def run_with_timeout(
func: Callable,
args: tuple = (),
kwargs: Optional[dict] = None,
timeout: float = None,
):
"""Run a function with timeout."""
ret_value = []
def _target_func():
ret_value.append(func(*args, **(kwargs or {})))
t = threading.Thread(target=_target_func)
t.start()
t.join(timeout=timeout)
if t.is_alive():
raise TimeoutError()
if not ret_value:
raise RuntimeError()
return ret_value[0]
def run_unittest_files(
files: List[TestFile], timeout_per_file: float, continue_on_error: bool = False
):
"""
Run a list of test files.
Args:
files: List of TestFile objects to run
timeout_per_file: Timeout in seconds for each test file
continue_on_error: If True, continue running remaining tests even if one fails.
If False, stop at first failure (default behavior for PR tests).
"""
tic = time.perf_counter()
success = True
passed_tests = []
failed_tests = []
for i, file in enumerate(files):
filename, estimated_time = file.name, file.estimated_time
process = None
def run_one_file(filename):
nonlocal process
filename = os.path.join(os.getcwd(), filename)
print(
f".\n.\nBegin ({i}/{len(files) - 1}):\npython3 {filename}\n.\n.\n",
flush=True,
)
tic = time.perf_counter()
process = subprocess.Popen(
["python3", filename], stdout=None, stderr=None, env=os.environ
)
process.wait()
elapsed = time.perf_counter() - tic
print(
f".\n.\nEnd ({i}/{len(files) - 1}):\n{filename=}, {elapsed=:.0f}, {estimated_time=}\n.\n.\n",
flush=True,
)
return process.returncode
try:
ret_code = run_with_timeout(
run_one_file, args=(filename,), timeout=timeout_per_file
)
if ret_code != 0:
print(
f"\n✗ FAILED: {filename} returned exit code {ret_code}\n",
flush=True,
)
success = False
failed_tests.append((filename, f"exit code {ret_code}"))
if not continue_on_error:
# Stop at first failure for PR tests
break
# Otherwise continue to next test for nightly tests
else:
passed_tests.append(filename)
except TimeoutError:
kill_process_tree(process.pid)
time.sleep(5)
print(
f"\n✗ TIMEOUT: {filename} after {timeout_per_file} seconds\n",
flush=True,
)
success = False
failed_tests.append((filename, f"timeout after {timeout_per_file}s"))
if not continue_on_error:
# Stop at first timeout for PR tests
break
# Otherwise continue to next test for nightly tests
if success:
print(f"Success. Time elapsed: {time.perf_counter() - tic:.2f}s", flush=True)
else:
print(f"Fail. Time elapsed: {time.perf_counter() - tic:.2f}s", flush=True)
# Print summary
print(f"\n{'='*60}", flush=True)
print(f"Test Summary: {len(passed_tests)}/{len(files)} passed", flush=True)
print(f"{'='*60}", flush=True)
if passed_tests:
print("✓ PASSED:", flush=True)
for test in passed_tests:
print(f" {test}", flush=True)
if failed_tests:
print("\n✗ FAILED:", flush=True)
for test, reason in failed_tests:
print(f" {test} ({reason})", flush=True)
print(f"{'='*60}\n", flush=True)
return 0 if success else -1

View File

@@ -0,0 +1,4 @@
"""Per-commit tests for KT-Kernel.
Tests in this directory are run on every commit in CI.
"""

View File

@@ -0,0 +1,36 @@
"""AMD/ROCm backend tests for KT-Kernel (Placeholder).
This file is a placeholder for future AMD/ROCm backend tests.
Currently, KT-Kernel focuses on CPU optimizations (Intel AMX/AVX512).
To implement AMD tests:
1. Add actual test functions with @pytest.mark.amd
2. Update the estimated time in register_amd_ci()
3. Implement AMD/ROCm-specific initialization and validation tests
"""
import os
import sys
# Add parent directory to path for CI registration
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from ci.ci_register import register_amd_ci
# Register this test for AMD CI (estimated time: 10 seconds, placeholder)
# Update suite name when implementing: currently using "stage-a-test-1"
register_amd_ci(est_time=10, suite="stage-a-test-1")
def test_amd_placeholder():
"""Placeholder test for AMD/ROCm backend.
TODO: Implement actual AMD/ROCm tests when AMD support is added to kt-kernel.
"""
# Currently a no-op placeholder
pass
if __name__ == "__main__":
# Allow running standalone (required by test runner)
print("⚠ AMD/ROCm tests are not yet implemented (placeholder)")
print("✓ Placeholder test passed")

View File

@@ -0,0 +1,80 @@
"""Basic CPU backend tests for KT-Kernel.
These tests verify basic functionality without requiring model files.
"""
import os
import sys
import pytest
# Add parent directory to path for CI registration
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from ci.ci_register import register_cpu_ci
# Register this test for CPU CI with estimated runtime of 30 seconds
register_cpu_ci(est_time=30, suite="default")
# Check if kt_kernel_ext is available
try:
import kt_kernel_ext
HAS_KT_KERNEL = True
except ImportError:
HAS_KT_KERNEL = False
kt_kernel_ext = None
@pytest.mark.cpu
def test_kt_kernel_import():
"""Test that kt_kernel_ext can be imported."""
if not HAS_KT_KERNEL:
pytest.skip("kt_kernel_ext not built or available")
assert kt_kernel_ext is not None, "kt_kernel_ext module should be importable"
@pytest.mark.cpu
def test_cpu_infer_initialization():
"""Test that CPUInfer can be initialized."""
if not HAS_KT_KERNEL:
pytest.skip("kt_kernel_ext not built or available")
# Initialize CPUInfer with 4 threads
cpuinfer = kt_kernel_ext.CPUInfer(4)
assert cpuinfer is not None, "CPUInfer should be initialized successfully"
@pytest.mark.cpu
def test_basic_module_attributes():
"""Test that kt_kernel_ext has expected attributes."""
if not HAS_KT_KERNEL:
pytest.skip("kt_kernel_ext not built or available")
# Check for key attributes/functions
assert hasattr(kt_kernel_ext, 'CPUInfer'), "kt_kernel_ext should have CPUInfer class"
def run_all_tests():
"""Run all tests in this file (for standalone execution)."""
if not HAS_KT_KERNEL:
print("⚠ kt_kernel_ext not available, skipping tests")
return
try:
test_kt_kernel_import()
print("✓ test_kt_kernel_import passed")
test_cpu_infer_initialization()
print("✓ test_cpu_infer_initialization passed")
test_basic_module_attributes()
print("✓ test_basic_module_attributes passed")
print("\n✓ All tests passed!")
except Exception as e:
print(f"\n✗ Test failed: {e}")
sys.exit(1)
if __name__ == "__main__":
# Allow running standalone (required by test runner)
run_all_tests()

View File

@@ -0,0 +1,36 @@
"""CUDA backend tests for KT-Kernel (Placeholder).
This file is a placeholder for future CUDA backend tests.
Currently, KT-Kernel focuses on CPU optimizations (Intel AMX/AVX512).
To implement CUDA tests:
1. Add actual test functions with @pytest.mark.cuda
2. Update the estimated time in register_cuda_ci()
3. Implement CUDA-specific initialization and validation tests
"""
import os
import sys
# Add parent directory to path for CI registration
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from ci.ci_register import register_cuda_ci
# Register this test for CUDA CI (estimated time: 10 seconds, placeholder)
# Update suite name when implementing: currently using "stage-a-test-1"
register_cuda_ci(est_time=10, suite="stage-a-test-1")
def test_cuda_placeholder():
"""Placeholder test for CUDA backend.
TODO: Implement actual CUDA tests when CUDA support is added to kt-kernel.
"""
# Currently a no-op placeholder
pass
if __name__ == "__main__":
# Allow running standalone (required by test runner)
print("⚠ CUDA tests are not yet implemented (placeholder)")
print("✓ Placeholder test passed")

View File

@@ -0,0 +1,68 @@
import argparse
import glob
from typing import List
from ci.ci_register import HWBackend, CIRegistry, collect_tests
from ci.ci_utils import TestFile, run_unittest_files
HW_MAPPING = {
"cpu": HWBackend.CPU,
"cuda": HWBackend.CUDA,
"amd": HWBackend.AMD,
}
LABEL_MAPPING = {
HWBackend.CPU: ["default"],
HWBackend.AMD: ["stage-a-test-1"],
HWBackend.CUDA: ["stage-a-test-1"],
}
def _filter_tests(
ci_tests: List[CIRegistry], hw: HWBackend, suite: str
) -> List[CIRegistry]:
ci_tests = [t for t in ci_tests if t.backend == hw]
ret = []
for t in ci_tests:
assert t.suite in LABEL_MAPPING[hw], f"Unknown stage {t.suite} for backend {hw}"
if t.suite == suite:
ret.append(t)
return ret
def run_per_commit(hw: HWBackend, suite: str):
files = glob.glob("per_commit/**/*.py", recursive=True)
# Exclude __init__.py files as they don't contain test registrations
files = [f for f in files if not f.endswith("__init__.py")]
ci_tests = _filter_tests(collect_tests(files), hw, suite)
test_files = [TestFile(t.filename, t.est_time) for t in ci_tests]
run_unittest_files(
test_files,
timeout_per_file=1200,
continue_on_error=False,
)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--hw",
type=str,
choices=["cpu", "cuda", "amd"],
required=True,
help="Hardware backend to run tests on.",
)
parser.add_argument(
"--suite",
type=str,
required=True,
help="Test suite to run.",
)
args = parser.parse_args()
hw = HW_MAPPING[args.hw]
run_per_commit(hw, args.suite)
if __name__ == "__main__":
main()