From fef6dd98a8cca4fe24bccd5f010b952a9f628769 Mon Sep 17 00:00:00 2001 From: Jianwei Dong Date: Thu, 27 Nov 2025 10:56:39 +0800 Subject: [PATCH] add accuracy and performance test (#1643) --- .github/workflows/kt-kernel-tests.yml | 2 +- .../per_commit/test_moe_amx_accuracy_int4.py | 212 ++++++++++++ .../test_moe_amx_accuracy_int4_1.py | 212 ++++++++++++ .../test_moe_amx_accuracy_int4_1k.py | 216 ++++++++++++ .../per_commit/test_moe_amx_accuracy_int8.py | 210 ++++++++++++ .../per_commit/test_moe_amx_bench_int4.py | 313 +++++++++++++++++ .../per_commit/test_moe_amx_bench_int4_1.py | 313 +++++++++++++++++ .../per_commit/test_moe_amx_bench_int4_1k.py | 320 ++++++++++++++++++ .../per_commit/test_moe_amx_bench_int8.py | 313 +++++++++++++++++ 9 files changed, 2110 insertions(+), 1 deletion(-) create mode 100644 kt-kernel/test/per_commit/test_moe_amx_accuracy_int4.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_accuracy_int4_1.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_accuracy_int4_1k.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_accuracy_int8.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_bench_int4.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_bench_int4_1.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_bench_int4_1k.py create mode 100644 kt-kernel/test/per_commit/test_moe_amx_bench_int8.py diff --git a/.github/workflows/kt-kernel-tests.yml b/.github/workflows/kt-kernel-tests.yml index de5d362..f4b1e66 100644 --- a/.github/workflows/kt-kernel-tests.yml +++ b/.github/workflows/kt-kernel-tests.yml @@ -66,7 +66,7 @@ jobs: bash install.sh build - name: Run KT-Kernel CPU tests - timeout-minutes: 30 + timeout-minutes: 60 run: | cd kt-kernel/test python3 run_suite.py --hw cpu --suite default diff --git a/kt-kernel/test/per_commit/test_moe_amx_accuracy_int4.py b/kt-kernel/test/per_commit/test_moe_amx_accuracy_int4.py new file mode 100644 index 0000000..f6d5500 --- /dev/null +++ 
#!/usr/bin/env python
# coding=utf-8
"""AMX MOE INT4 accuracy tests for KT-Kernel.

Tests accuracy of AMX-accelerated INT4 MOE operations against a pure-PyTorch
reference implementation of the same mixture-of-experts computation.
"""

import os
import sys

import pytest

# Add parent directory to path for CI registration
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from ci.ci_register import register_cpu_ci

# Register this test for CPU CI with estimated runtime of 120 seconds
register_cpu_ci(est_time=120, suite="default")

# Check if dependencies are available; the test skips gracefully when missing.
try:
    import torch
    import kt_kernel_ext

    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    import_error = str(e)

# Test parameters (from original test_moe_amx.py)
expert_num = 256
hidden_size = 7168
intermediate_size = 2048
max_len = 25600
num_experts_per_tok = 8
qlen = 1
layer_num = 1
validation_iter = 2
physical_to_logical_map = None


def act_fn(x):
    """SiLU activation used by the MoE MLP: x * sigmoid(x)."""
    return x / (1.0 + torch.exp(-x))


def mlp_torch(input, gate_proj, up_proj, down_proj):
    """PyTorch reference implementation of a single expert MLP.

    Computes down(act(gate(x)) * up(x)); weight matrices are stored as
    (out_features, in_features), hence the transposes.
    """
    gate_buf = torch.mm(input, gate_proj.t())
    up_buf = torch.mm(input, up_proj.t())
    intermediate = act_fn(gate_buf) * up_buf
    ret = torch.mm(intermediate, down_proj.t())
    return ret


def moe_torch(input, expert_ids, weights, gate_proj, up_proj, down_proj):
    """PyTorch reference implementation of the MoE forward pass.

    Args:
        input: (qlen, hidden_size) activations.
        expert_ids: (qlen, num_experts_per_tok) selected expert indices.
        weights: (qlen, num_experts_per_tok) routing weights.
        gate_proj, up_proj, down_proj: stacked per-expert weight tensors.

    Returns:
        (qlen, hidden_size) routing-weighted sum of expert outputs.
    """
    # Count how many tokens route to each expert.
    cnts = expert_ids.new_zeros((expert_ids.shape[0], expert_num))
    cnts.scatter_(1, expert_ids, 1)
    tokens_per_expert = cnts.sum(dim=0)
    # Sort (token, expert) pairs by expert so each expert's tokens are
    # contiguous and can be processed with one matmul per expert.
    idxs = expert_ids.view(-1).argsort()
    sorted_tokens = input[idxs // expert_ids.shape[1]]

    outputs = []
    start_idx = 0

    for i, num_tokens in enumerate(tokens_per_expert):
        end_idx = start_idx + num_tokens
        if num_tokens == 0:
            continue
        tokens_for_this_expert = sorted_tokens[start_idx:end_idx]
        expert_out = mlp_torch(
            tokens_for_this_expert, gate_proj[i], up_proj[i], down_proj[i]
        )
        outputs.append(expert_out)
        start_idx = end_idx

    outs = torch.cat(outputs, dim=0) if len(outputs) else sorted_tokens.new_empty(0)

    # Scatter expert outputs back to original token order, then apply the
    # routing weights and sum over the selected experts of each token.
    new_x = torch.empty_like(outs)
    new_x[idxs] = outs
    t_output = (
        new_x.view(*expert_ids.shape, -1)
        .type(weights.dtype)
        .mul_(weights.unsqueeze(dim=-1))
        .sum(dim=1)
        .type(new_x.dtype)
    )

    return t_output


@pytest.mark.cpu
def test_moe_amx_int4_accuracy():
    """Test AMX INT4 MOE accuracy against the PyTorch reference implementation."""
    if not HAS_DEPS:
        pytest.skip(f"Dependencies not available: {import_error}")

    global physical_to_logical_map
    physical_to_logical_map = torch.tensor(
        data=range(expert_num), device="cpu", dtype=torch.int64
    ).contiguous()

    CPUInfer = kt_kernel_ext.CPUInfer(90)

    with torch.inference_mode(mode=True):
        # Initialize MoE layers.
        # NOTE(fix): weights are allocated directly on CPU.  The original code
        # created them with device="cuda" and copied to CPU, which made this
        # CPU-registered test require a GPU at runtime.
        gate_proj = torch.randn(
            (expert_num, intermediate_size, hidden_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()
        up_proj = torch.randn(
            (expert_num, intermediate_size, hidden_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()
        down_proj = torch.randn(
            (expert_num, hidden_size, intermediate_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()

        # Create MOE config
        config = kt_kernel_ext.moe.MOEConfig(
            expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0
        )
        config.max_len = max_len
        config.gate_proj = gate_proj.data_ptr()
        config.up_proj = up_proj.data_ptr()
        config.down_proj = down_proj.data_ptr()
        config.gate_scale = 0
        config.pool = CPUInfer.backend_

        # Initialize INT4 MOE (quantizes the bf16 weights during load).
        moe = kt_kernel_ext.moe.AMXInt4_MOE(config)
        CPUInfer.submit(moe.load_weights_task(physical_to_logical_map.data_ptr()))
        CPUInfer.sync()
        CPUInfer.submit(moe.warm_up_task())
        CPUInfer.sync()

        # Run validation iterations
        for i in range(validation_iter):
            bsz_tensor = torch.tensor([qlen], device="cpu")
            expert_ids = torch.stack(
                [torch.randperm(expert_num)[:num_experts_per_tok] for _ in range(qlen)]
            ).contiguous()
            weights = torch.rand((qlen, num_experts_per_tok), dtype=torch.float32).contiguous()
            input_data = torch.randn((qlen, hidden_size), dtype=torch.bfloat16).contiguous()
            output = torch.empty((qlen, hidden_size), dtype=torch.bfloat16).contiguous()
            # Scale inputs down to keep activations in a well-conditioned range.
            input_data = input_data / 100

            # Run AMX MOE
            CPUInfer.submit(
                moe.forward_task(
                    bsz_tensor.data_ptr(),
                    num_experts_per_tok,
                    expert_ids.data_ptr(),
                    weights.data_ptr(),
                    input_data.data_ptr(),
                    output.data_ptr(),
                    False,
                )
            )
            CPUInfer.sync()

            # Run torch reference
            t_output = moe_torch(
                input_data, expert_ids, weights, gate_proj, up_proj, down_proj
            )

            # Mean relative difference between kernel and reference outputs.
            diff = torch.mean(torch.abs(output - t_output)) / torch.mean(
                torch.abs(t_output)
            )
            print(f"Iteration {i}, diff = {diff:.6f}")

            # INT4 quantization error budget.
            assert diff < 0.35, f"INT4 accuracy test failed: diff={diff:.6f} >= 0.35"


def run_all_tests():
    """Run all tests in this file (for standalone execution)."""
    if not HAS_DEPS:
        print(f"⚠ Dependencies not available: {import_error}")
        print("Skipping AMX MOE INT4 accuracy tests")
        return

    try:
        print("Running AMX MOE INT4 accuracy test...")
        test_moe_amx_int4_accuracy()
        print("✓ AMX MOE INT4 accuracy test passed")
        print("\n✓ All tests passed!")
    except Exception as e:
        print(f"\n✗ Test failed: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    run_all_tests()
#!/usr/bin/env python
# coding=utf-8
"""AMX MOE INT4_1 accuracy tests for KT-Kernel.

Tests accuracy of AMX-accelerated INT4_1 MOE operations against a pure-PyTorch
reference implementation of the same mixture-of-experts computation.
"""

import os
import sys

import pytest

# Add parent directory to path for CI registration
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from ci.ci_register import register_cpu_ci

# Register this test for CPU CI with estimated runtime of 120 seconds
register_cpu_ci(est_time=120, suite="default")

# Check if dependencies are available; the test skips gracefully when missing.
try:
    import torch
    import kt_kernel_ext

    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    import_error = str(e)

# Test parameters (from original test_moe_amx.py)
expert_num = 256
hidden_size = 7168
intermediate_size = 2048
max_len = 25600
num_experts_per_tok = 8
qlen = 1
layer_num = 1
validation_iter = 2
physical_to_logical_map = None


def act_fn(x):
    """SiLU activation used by the MoE MLP: x * sigmoid(x)."""
    return x / (1.0 + torch.exp(-x))


def mlp_torch(input, gate_proj, up_proj, down_proj):
    """PyTorch reference implementation of a single expert MLP.

    Computes down(act(gate(x)) * up(x)); weight matrices are stored as
    (out_features, in_features), hence the transposes.
    """
    gate_buf = torch.mm(input, gate_proj.t())
    up_buf = torch.mm(input, up_proj.t())
    intermediate = act_fn(gate_buf) * up_buf
    ret = torch.mm(intermediate, down_proj.t())
    return ret


def moe_torch(input, expert_ids, weights, gate_proj, up_proj, down_proj):
    """PyTorch reference implementation of the MoE forward pass.

    Args:
        input: (qlen, hidden_size) activations.
        expert_ids: (qlen, num_experts_per_tok) selected expert indices.
        weights: (qlen, num_experts_per_tok) routing weights.
        gate_proj, up_proj, down_proj: stacked per-expert weight tensors.

    Returns:
        (qlen, hidden_size) routing-weighted sum of expert outputs.
    """
    # Count how many tokens route to each expert.
    cnts = expert_ids.new_zeros((expert_ids.shape[0], expert_num))
    cnts.scatter_(1, expert_ids, 1)
    tokens_per_expert = cnts.sum(dim=0)
    # Sort (token, expert) pairs by expert so each expert's tokens are
    # contiguous and can be processed with one matmul per expert.
    idxs = expert_ids.view(-1).argsort()
    sorted_tokens = input[idxs // expert_ids.shape[1]]

    outputs = []
    start_idx = 0

    for i, num_tokens in enumerate(tokens_per_expert):
        end_idx = start_idx + num_tokens
        if num_tokens == 0:
            continue
        tokens_for_this_expert = sorted_tokens[start_idx:end_idx]
        expert_out = mlp_torch(
            tokens_for_this_expert, gate_proj[i], up_proj[i], down_proj[i]
        )
        outputs.append(expert_out)
        start_idx = end_idx

    outs = torch.cat(outputs, dim=0) if len(outputs) else sorted_tokens.new_empty(0)

    # Scatter expert outputs back to original token order, then apply the
    # routing weights and sum over the selected experts of each token.
    new_x = torch.empty_like(outs)
    new_x[idxs] = outs
    t_output = (
        new_x.view(*expert_ids.shape, -1)
        .type(weights.dtype)
        .mul_(weights.unsqueeze(dim=-1))
        .sum(dim=1)
        .type(new_x.dtype)
    )

    return t_output


@pytest.mark.cpu
def test_moe_amx_int4_1_accuracy():
    """Test AMX INT4_1 MOE accuracy against the PyTorch reference implementation."""
    if not HAS_DEPS:
        pytest.skip(f"Dependencies not available: {import_error}")

    global physical_to_logical_map
    physical_to_logical_map = torch.tensor(
        data=range(expert_num), device="cpu", dtype=torch.int64
    ).contiguous()

    CPUInfer = kt_kernel_ext.CPUInfer(90)

    with torch.inference_mode(mode=True):
        # Initialize MoE layers.
        # NOTE(fix): weights are allocated directly on CPU.  The original code
        # created them with device="cuda" and copied to CPU, which made this
        # CPU-registered test require a GPU at runtime.
        gate_proj = torch.randn(
            (expert_num, intermediate_size, hidden_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()
        up_proj = torch.randn(
            (expert_num, intermediate_size, hidden_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()
        down_proj = torch.randn(
            (expert_num, hidden_size, intermediate_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()

        # Create MOE config
        config = kt_kernel_ext.moe.MOEConfig(
            expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0
        )
        config.max_len = max_len
        config.gate_proj = gate_proj.data_ptr()
        config.up_proj = up_proj.data_ptr()
        config.down_proj = down_proj.data_ptr()
        config.gate_scale = 0
        config.pool = CPUInfer.backend_

        # Initialize INT4_1 MOE (quantizes the bf16 weights during load).
        moe = kt_kernel_ext.moe.AMXInt4_1_MOE(config)
        CPUInfer.submit(moe.load_weights_task(physical_to_logical_map.data_ptr()))
        CPUInfer.sync()
        CPUInfer.submit(moe.warm_up_task())
        CPUInfer.sync()

        # Run validation iterations
        for i in range(validation_iter):
            bsz_tensor = torch.tensor([qlen], device="cpu")
            expert_ids = torch.stack(
                [torch.randperm(expert_num)[:num_experts_per_tok] for _ in range(qlen)]
            ).contiguous()
            weights = torch.rand((qlen, num_experts_per_tok), dtype=torch.float32).contiguous()
            input_data = torch.randn((qlen, hidden_size), dtype=torch.bfloat16).contiguous()
            output = torch.empty((qlen, hidden_size), dtype=torch.bfloat16).contiguous()
            # Scale inputs down to keep activations in a well-conditioned range.
            input_data = input_data / 100

            # Run AMX MOE
            CPUInfer.submit(
                moe.forward_task(
                    bsz_tensor.data_ptr(),
                    num_experts_per_tok,
                    expert_ids.data_ptr(),
                    weights.data_ptr(),
                    input_data.data_ptr(),
                    output.data_ptr(),
                    False,
                )
            )
            CPUInfer.sync()

            # Run torch reference
            t_output = moe_torch(
                input_data, expert_ids, weights, gate_proj, up_proj, down_proj
            )

            # Mean relative difference between kernel and reference outputs.
            diff = torch.mean(torch.abs(output - t_output)) / torch.mean(
                torch.abs(t_output)
            )
            print(f"Iteration {i}, diff = {diff:.6f}")

            # INT4_1 quantization error budget.
            assert diff < 0.35, f"INT4_1 accuracy test failed: diff={diff:.6f} >= 0.35"


def run_all_tests():
    """Run all tests in this file (for standalone execution)."""
    if not HAS_DEPS:
        print(f"⚠ Dependencies not available: {import_error}")
        print("Skipping AMX MOE INT4_1 accuracy tests")
        return

    try:
        print("Running AMX MOE INT4_1 accuracy test...")
        test_moe_amx_int4_1_accuracy()
        print("✓ AMX MOE INT4_1 accuracy test passed")
        print("\n✓ All tests passed!")
    except Exception as e:
        print(f"\n✗ Test failed: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    run_all_tests()
+""" + +import os +import sys +import pytest + +# Add parent directory to path for CI registration +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from ci.ci_register import register_cpu_ci + +# Register this test for CPU CI with estimated runtime of 120 seconds +register_cpu_ci(est_time=120, suite="default") + +# Check if dependencies are available +try: + import torch + import kt_kernel_ext + HAS_DEPS = True +except ImportError as e: + HAS_DEPS = False + import_error = str(e) + +# Test parameters (from original test_moe_amx.py) +expert_num = 256 +hidden_size = 7168 +intermediate_size = 2048 +max_len = 25600 +num_experts_per_tok = 8 +qlen = 1 +layer_num = 1 +validation_iter = 2 +k_group_size = 64 +physical_to_logical_map = None + + +def act_fn(x): + """Activation function for MoE.""" + return x / (1.0 + torch.exp(-x)) + + +def mlp_torch(input, gate_proj, up_proj, down_proj): + """PyTorch reference implementation of MLP.""" + gate_buf = torch.mm(input, gate_proj.t()) + up_buf = torch.mm(input, up_proj.t()) + intermediate = act_fn(gate_buf) * up_buf + ret = torch.mm(intermediate, down_proj.t()) + return ret + + +def moe_torch(input, expert_ids, weights, gate_proj, up_proj, down_proj): + """PyTorch reference implementation of MoE.""" + cnts = expert_ids.new_zeros((expert_ids.shape[0], expert_num)) + cnts.scatter_(1, expert_ids, 1) + tokens_per_expert = cnts.sum(dim=0) + idxs = expert_ids.view(-1).argsort() + sorted_tokens = input[idxs // expert_ids.shape[1]] + + outputs = [] + start_idx = 0 + + for i, num_tokens in enumerate(tokens_per_expert): + end_idx = start_idx + num_tokens + if num_tokens == 0: + continue + tokens_for_this_expert = sorted_tokens[start_idx:end_idx] + expert_out = mlp_torch( + tokens_for_this_expert, gate_proj[i], up_proj[i], down_proj[i] + ) + outputs.append(expert_out) + start_idx = end_idx + + outs = torch.cat(outputs, dim=0) if len(outputs) else sorted_tokens.new_empty(0) + + new_x = torch.empty_like(outs) + new_x[idxs] = 
outs + t_output = ( + new_x.view(*expert_ids.shape, -1) + .type(weights.dtype) + .mul_(weights.unsqueeze(dim=-1)) + .sum(dim=1) + .type(new_x.dtype) + ) + + return t_output + + +@pytest.mark.cpu +def test_moe_amx_int4_1k_accuracy(): + """Test AMX INT4_1K MOE accuracy against PyTorch reference implementation.""" + if not HAS_DEPS: + pytest.skip(f"Dependencies not available: {import_error}") + + global physical_to_logical_map + physical_to_logical_map = torch.tensor( + data=range(expert_num), device="cpu", dtype=torch.int64 + ).contiguous() + + CPUInfer = kt_kernel_ext.CPUInfer(90) + + with torch.inference_mode(mode=True): + # Initialize MoE layers + gate_proj = ( + torch.randn( + (expert_num, intermediate_size, hidden_size), + dtype=torch.bfloat16, + device="cuda", + ) + .to("cpu") + .contiguous() + ) + up_proj = ( + torch.randn( + (expert_num, intermediate_size, hidden_size), + dtype=torch.bfloat16, + device="cuda", + ) + .to("cpu") + .contiguous() + ) + down_proj = ( + torch.randn( + (expert_num, hidden_size, intermediate_size), + dtype=torch.bfloat16, + device="cuda", + ) + .to("cpu") + .contiguous() + ) + + # Create MOE config + config = kt_kernel_ext.moe.MOEConfig( + expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0 + ) + config.max_len = max_len + config.gate_proj = gate_proj.data_ptr() + config.up_proj = up_proj.data_ptr() + config.down_proj = down_proj.data_ptr() + config.gate_scale = 0 + config.pool = CPUInfer.backend_ + + # Configure INT4_1K quantization settings + config.quant_config.bits = 4 + config.quant_config.group_size = k_group_size + config.quant_config.zero_point = True + + # Initialize INT4_1K MOE + moe = kt_kernel_ext.moe.AMXInt4_1KGroup_MOE(config) + CPUInfer.submit(moe.load_weights_task(physical_to_logical_map.data_ptr())) + CPUInfer.sync() + + # Run validation iterations + for i in range(validation_iter): + bsz_tensor = torch.tensor([qlen], device="cpu") + expert_ids = torch.stack( + 
[torch.randperm(expert_num)[:num_experts_per_tok] for _ in range(qlen)] + ).contiguous() + weights = torch.rand((qlen, num_experts_per_tok), dtype=torch.float32).contiguous() + input_data = torch.randn((qlen, hidden_size), dtype=torch.bfloat16).contiguous() + output = torch.empty((qlen, hidden_size), dtype=torch.bfloat16).contiguous() + input_data = input_data / 100 + + # Run AMX MOE + CPUInfer.submit( + moe.forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids.data_ptr(), + weights.data_ptr(), + input_data.data_ptr(), + output.data_ptr(), + False, + ) + ) + CPUInfer.sync() + + # Run torch reference + t_output = moe_torch( + input_data, expert_ids, weights, gate_proj, up_proj, down_proj + ) + + # Calculate relative difference + diff = torch.mean(torch.abs(output - t_output)) / torch.mean( + torch.abs(t_output) + ) + print(f"Iteration {i}, diff = {diff:.6f}") + + # INT4_1K should have diff < 0.35 + assert diff < 0.35, f"INT4_1K accuracy test failed: diff={diff:.6f} >= 0.35" + + +def run_all_tests(): + """Run all tests in this file (for standalone execution).""" + if not HAS_DEPS: + print(f"⚠ Dependencies not available: {import_error}") + print("Skipping AMX MOE INT4_1K accuracy tests") + return + + try: + print("Running AMX MOE INT4_1K accuracy test...") + test_moe_amx_int4_1k_accuracy() + print("✓ AMX MOE INT4_1K accuracy test passed") + print("\n✓ All tests passed!") + except Exception as e: + print(f"\n✗ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + run_all_tests() diff --git a/kt-kernel/test/per_commit/test_moe_amx_accuracy_int8.py b/kt-kernel/test/per_commit/test_moe_amx_accuracy_int8.py new file mode 100644 index 0000000..66c71f5 --- /dev/null +++ b/kt-kernel/test/per_commit/test_moe_amx_accuracy_int8.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python +# coding=utf-8 +"""AMX MOE INT8 accuracy tests for KT-Kernel. 
#!/usr/bin/env python
# coding=utf-8
"""AMX MOE INT8 accuracy tests for KT-Kernel.

Tests accuracy of AMX-accelerated INT8 MOE operations against a pure-PyTorch
reference implementation of the same mixture-of-experts computation.
"""

import os
import sys

import pytest

# Add parent directory to path for CI registration
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from ci.ci_register import register_cpu_ci

# Register this test for CPU CI with estimated runtime of 120 seconds
register_cpu_ci(est_time=120, suite="default")

# Check if dependencies are available; the test skips gracefully when missing.
try:
    import torch
    import kt_kernel_ext

    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    import_error = str(e)

# Test parameters (from original test_moe_amx.py)
expert_num = 256
hidden_size = 7168
intermediate_size = 2048
max_len = 25600
num_experts_per_tok = 8
qlen = 1
layer_num = 1
validation_iter = 2
physical_to_logical_map = None


def act_fn(x):
    """SiLU activation used by the MoE MLP: x * sigmoid(x)."""
    return x / (1.0 + torch.exp(-x))


def mlp_torch(input, gate_proj, up_proj, down_proj):
    """PyTorch reference implementation of a single expert MLP.

    Computes down(act(gate(x)) * up(x)); weight matrices are stored as
    (out_features, in_features), hence the transposes.
    """
    gate_buf = torch.mm(input, gate_proj.t())
    up_buf = torch.mm(input, up_proj.t())
    intermediate = act_fn(gate_buf) * up_buf
    ret = torch.mm(intermediate, down_proj.t())
    return ret


def moe_torch(input, expert_ids, weights, gate_proj, up_proj, down_proj):
    """PyTorch reference implementation of the MoE forward pass.

    Args:
        input: (qlen, hidden_size) activations.
        expert_ids: (qlen, num_experts_per_tok) selected expert indices.
        weights: (qlen, num_experts_per_tok) routing weights.
        gate_proj, up_proj, down_proj: stacked per-expert weight tensors.

    Returns:
        (qlen, hidden_size) routing-weighted sum of expert outputs.
    """
    # Count how many tokens route to each expert.
    cnts = expert_ids.new_zeros((expert_ids.shape[0], expert_num))
    cnts.scatter_(1, expert_ids, 1)
    tokens_per_expert = cnts.sum(dim=0)
    # Sort (token, expert) pairs by expert so each expert's tokens are
    # contiguous and can be processed with one matmul per expert.
    idxs = expert_ids.view(-1).argsort()
    sorted_tokens = input[idxs // expert_ids.shape[1]]

    outputs = []
    start_idx = 0

    for i, num_tokens in enumerate(tokens_per_expert):
        end_idx = start_idx + num_tokens
        if num_tokens == 0:
            continue
        tokens_for_this_expert = sorted_tokens[start_idx:end_idx]
        expert_out = mlp_torch(
            tokens_for_this_expert, gate_proj[i], up_proj[i], down_proj[i]
        )
        outputs.append(expert_out)
        start_idx = end_idx

    outs = torch.cat(outputs, dim=0) if len(outputs) else sorted_tokens.new_empty(0)

    # Scatter expert outputs back to original token order, then apply the
    # routing weights and sum over the selected experts of each token.
    new_x = torch.empty_like(outs)
    new_x[idxs] = outs
    t_output = (
        new_x.view(*expert_ids.shape, -1)
        .type(weights.dtype)
        .mul_(weights.unsqueeze(dim=-1))
        .sum(dim=1)
        .type(new_x.dtype)
    )

    return t_output


@pytest.mark.cpu
def test_moe_amx_int8_accuracy():
    """Test AMX INT8 MOE accuracy against the PyTorch reference implementation."""
    if not HAS_DEPS:
        pytest.skip(f"Dependencies not available: {import_error}")

    global physical_to_logical_map
    physical_to_logical_map = torch.tensor(
        data=range(expert_num), device="cpu", dtype=torch.int64
    ).contiguous()

    CPUInfer = kt_kernel_ext.CPUInfer(90)

    with torch.inference_mode(mode=True):
        # Initialize MoE layers.
        # NOTE(fix): weights are allocated directly on CPU.  The original code
        # created them with device="cuda" and copied to CPU, which made this
        # CPU-registered test require a GPU at runtime.
        gate_proj = torch.randn(
            (expert_num, intermediate_size, hidden_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()
        up_proj = torch.randn(
            (expert_num, intermediate_size, hidden_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()
        down_proj = torch.randn(
            (expert_num, hidden_size, intermediate_size),
            dtype=torch.bfloat16,
            device="cpu",
        ).contiguous()

        # Create MOE config
        config = kt_kernel_ext.moe.MOEConfig(
            expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0
        )
        config.max_len = max_len
        config.gate_proj = gate_proj.data_ptr()
        config.up_proj = up_proj.data_ptr()
        config.down_proj = down_proj.data_ptr()
        config.gate_scale = 0
        config.pool = CPUInfer.backend_

        # Initialize INT8 MOE (quantizes the bf16 weights during load).
        moe = kt_kernel_ext.moe.AMXInt8_MOE(config)
        CPUInfer.submit(moe.load_weights_task(physical_to_logical_map.data_ptr()))
        CPUInfer.sync()

        # Run validation iterations
        for i in range(validation_iter):
            bsz_tensor = torch.tensor([qlen], device="cpu")
            expert_ids = torch.stack(
                [torch.randperm(expert_num)[:num_experts_per_tok] for _ in range(qlen)]
            ).contiguous()
            weights = torch.rand((qlen, num_experts_per_tok), dtype=torch.float32).contiguous()
            input_data = torch.randn((qlen, hidden_size), dtype=torch.bfloat16).contiguous()
            output = torch.empty((qlen, hidden_size), dtype=torch.bfloat16).contiguous()
            # Scale inputs down to keep activations in a well-conditioned range.
            input_data = input_data / 100

            # Run AMX MOE
            CPUInfer.submit(
                moe.forward_task(
                    bsz_tensor.data_ptr(),
                    num_experts_per_tok,
                    expert_ids.data_ptr(),
                    weights.data_ptr(),
                    input_data.data_ptr(),
                    output.data_ptr(),
                    False,
                )
            )
            CPUInfer.sync()

            # Run torch reference
            t_output = moe_torch(
                input_data, expert_ids, weights, gate_proj, up_proj, down_proj
            )

            # Mean relative difference between kernel and reference outputs.
            diff = torch.mean(torch.abs(output - t_output)) / torch.mean(
                torch.abs(t_output)
            )
            print(f"Iteration {i}, diff = {diff:.6f}")

            # INT8 quantization error budget (tighter than INT4).
            assert diff < 0.05, f"INT8 accuracy test failed: diff={diff:.6f} >= 0.05"


def run_all_tests():
    """Run all tests in this file (for standalone execution)."""
    if not HAS_DEPS:
        print(f"⚠ Dependencies not available: {import_error}")
        print("Skipping AMX MOE INT8 accuracy tests")
        return

    try:
        print("Running AMX MOE INT8 accuracy test...")
        test_moe_amx_int8_accuracy()
        print("✓ AMX MOE INT8 accuracy test passed")
        print("\n✓ All tests passed!")
    except Exception as e:
        print(f"\n✗ Test failed: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    run_all_tests()
+""" + +import os +import sys +import time +import json +import subprocess +import platform +import pytest + +# Add parent directory to path for CI registration +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from ci.ci_register import register_cpu_ci + +# Register this test for CPU CI with estimated runtime of 300 seconds +register_cpu_ci(est_time=300, suite="default") + +# Check if dependencies are available +try: + import torch + import kt_kernel_ext + from tqdm import tqdm + HAS_DEPS = True +except ImportError as e: + HAS_DEPS = False + import_error = str(e) + +# Test parameters (from original bench_moe_amx.py) +expert_num = 16 +hidden_size = 7168 +intermediate_size = 2048 +max_len = 25600 +num_experts_per_tok = 8 +layer_num = 2 +qlen = 2048 +warm_up_iter = 1000 +test_iter = 2000 + +# Worker configuration +worker_config_dict = { + "subpool_count": 2, + "subpool_numa_map": [0, 1], + "subpool_thread_count": [45, 45], +} +CPUINFER_PARAM = 90 + + +def get_git_commit(): + """Get current git commit information.""" + result = {} + try: + commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() + commit_msg = subprocess.check_output(["git", "log", "-1", "--pretty=%B"]).decode("utf-8").strip() + result["commit"] = commit + result["commit_message"] = commit_msg + + dirty_output = subprocess.check_output(["git", "status", "--porcelain"]).decode("utf-8").strip() + if dirty_output: + result["dirty"] = True + result["dirty_files"] = dirty_output.splitlines() + else: + result["dirty"] = False + except Exception as e: + result["commit"] = None + result["commit_message"] = None + result["dirty"] = None + result["error"] = str(e) + return result + + +def get_system_info(): + """Get system information including CPU model, memory, cores, and sockets.""" + info = {} + uname = platform.uname() + info["system_name"] = uname.system + info["node_name"] = uname.node + + # Get CPU model (Linux only) + cpu_model = None + if 
os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "model name" in line: + cpu_model = line.split(":", 1)[1].strip() + break + except Exception as e: + cpu_model = f"Error: {e}" + info["cpu_model"] = cpu_model + + # Get memory size in GB (Linux only) + mem_total_gb = None + if os.path.exists("/proc/meminfo"): + try: + with open("/proc/meminfo", "r") as f: + for line in f: + if "MemTotal" in line: + mem_kb = float(line.split(":", 1)[1].split()[0]) + mem_total_gb = round(mem_kb / (1024 * 1024), 2) + break + except Exception as e: + mem_total_gb = f"Error: {e}" + info["memory_size_GB"] = mem_total_gb + + # Get CPU core count + info["cpu_core_count"] = os.cpu_count() + + # Get socket count + sockets = set() + if os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "physical id" in line: + sockets.add(line.split(":", 1)[1].strip()) + except Exception as e: + sockets = set() + info["cpu_socket_count"] = len(sockets) if len(sockets) > 0 else 1 + + return info + + +def record_results(result, filename): + """Append results to JSONL file.""" + with open(filename, "a") as f: + f.write(json.dumps(result) + "\n") + + +@pytest.mark.cpu +def test_moe_amx_int4_benchmark(): + """Benchmark AMX INT4 MOE performance.""" + if not HAS_DEPS: + pytest.skip(f"Dependencies not available: {import_error}") + + quant_mode = "int4" + bytes_per_elem = 0.5 + + # Setup output file + script_dir = os.path.dirname(os.path.abspath(__file__)) + json_path = os.path.join(script_dir, "bench_moe_amx_int4.jsonl") + + with torch.inference_mode(): + # Initialize CPUInfer with worker config + worker_config = kt_kernel_ext.WorkerPoolConfig() + worker_config.subpool_count = worker_config_dict["subpool_count"] + worker_config.subpool_numa_map = worker_config_dict["subpool_numa_map"] + worker_config.subpool_thread_count = worker_config_dict["subpool_thread_count"] + CPUInfer = kt_kernel_ext.CPUInfer(worker_config) + 
+ # Initialize MOE layers + moes = [] + for layer_index in range(layer_num): + gate_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + up_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + down_proj = ( + torch.randn((expert_num, hidden_size, intermediate_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + config = kt_kernel_ext.moe.MOEConfig(expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0) + config.max_len = max_len + config.gate_proj = gate_proj.data_ptr() + config.up_proj = up_proj.data_ptr() + config.down_proj = down_proj.data_ptr() + config.pool = CPUInfer.backend_ + + moe = kt_kernel_ext.moe.AMXInt4_MOE(config) + CPUInfer.submit(moe.load_weights_task()) + CPUInfer.sync() + moes.append(moe) + + # Generate test data + gen_iter = 3000 + expert_ids = ( + torch.rand(gen_iter * qlen, expert_num, device="cpu") + .argsort(dim=-1)[:, :num_experts_per_tok] + .reshape(gen_iter, qlen * num_experts_per_tok) + .to("cpu") + .contiguous() + ) + weights = ( + torch.rand((gen_iter, qlen, num_experts_per_tok), dtype=torch.float32, device="cpu").to("cpu").contiguous() + ) + input_tensor = ( + torch.randn((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + output_tensor = ( + torch.empty((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + bsz_tensor = torch.tensor([qlen], device="cpu") + + # Warm-up iterations + print(f"Running warm-up for {warm_up_iter} iterations...") + for i in tqdm(range(warm_up_iter), desc="Warm-up"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % 
layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + + # Test iterations + print(f"Running test for {test_iter} iterations...") + start = time.perf_counter() + for i in tqdm(range(test_iter), desc="Testing"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + end = time.perf_counter() + total_time = end - start + + # Calculate performance metrics + time_per_iter_us = total_time / test_iter * 1e6 + bandwidth = ( + hidden_size + * intermediate_size + * 3 + * num_experts_per_tok + * (1 / 8 * 256 * (1 - (31 / 32) ** qlen)) + * bytes_per_elem + * test_iter + / total_time + / 1e9 + ) # GB/s + flops = ( + hidden_size * intermediate_size * qlen * 3 * num_experts_per_tok * 2 * test_iter / total_time / 1e12 + ) # TFLOPS + + print("Quant mode: ", quant_mode) + print("Time(s): ", total_time) + print("Iteration: ", test_iter) + print("Time(us) per iteration: ", time_per_iter_us) + print("Bandwidth: ", bandwidth, "GB/s") + print("Flops: ", flops, "TFLOPS") + + # Record results + result = { + "quant_mode": quant_mode, + "total_time_seconds": total_time, + "iterations": test_iter, + "time_per_iteration_us": time_per_iter_us, + "bandwidth_GBs": bandwidth, + "flops_TFLOPS": flops, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "test_parameters": { + "expert_num": expert_num, + "hidden_size": hidden_size, + "intermediate_size": intermediate_size, + "max_len": max_len, + "num_experts_per_tok": num_experts_per_tok, + "layer_num": layer_num, + "qlen": qlen, + "warm_up_iter": warm_up_iter, + "test_iter": test_iter, + "CPUInfer_parameter": CPUINFER_PARAM, + }, + } + result.update(get_git_commit()) + result.update(get_system_info()) + record_results(result, json_path) + + print(f"Results saved to {json_path}") + + 
+def run_all_tests(): + """Run all tests in this file (for standalone execution).""" + if not HAS_DEPS: + print(f"⚠ Dependencies not available: {import_error}") + print("Skipping AMX MOE INT4 benchmark tests") + return + + try: + print("Running AMX MOE INT4 benchmark test...") + test_moe_amx_int4_benchmark() + print("✓ AMX MOE INT4 benchmark test passed") + print("\n✓ All tests passed!") + except Exception as e: + print(f"\n✗ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + run_all_tests() diff --git a/kt-kernel/test/per_commit/test_moe_amx_bench_int4_1.py b/kt-kernel/test/per_commit/test_moe_amx_bench_int4_1.py new file mode 100644 index 0000000..796863a --- /dev/null +++ b/kt-kernel/test/per_commit/test_moe_amx_bench_int4_1.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python +# coding=utf-8 +"""AMX MOE INT4_1 benchmark tests for KT-Kernel. + +Benchmarks performance (bandwidth and FLOPS) of AMX-accelerated INT4_1 MOE operations. +""" + +import os +import sys +import time +import json +import subprocess +import platform +import pytest + +# Add parent directory to path for CI registration +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from ci.ci_register import register_cpu_ci + +# Register this test for CPU CI with estimated runtime of 300 seconds +register_cpu_ci(est_time=300, suite="default") + +# Check if dependencies are available +try: + import torch + import kt_kernel_ext + from tqdm import tqdm + HAS_DEPS = True +except ImportError as e: + HAS_DEPS = False + import_error = str(e) + +# Test parameters (from original bench_moe_amx.py) +expert_num = 16 +hidden_size = 7168 +intermediate_size = 2048 +max_len = 25600 +num_experts_per_tok = 8 +layer_num = 2 +qlen = 2048 +warm_up_iter = 1000 +test_iter = 2000 + +# Worker configuration +worker_config_dict = { + "subpool_count": 2, + "subpool_numa_map": [0, 1], + "subpool_thread_count": [45, 45], +} +CPUINFER_PARAM = 90 + + +def 
get_git_commit(): + """Get current git commit information.""" + result = {} + try: + commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() + commit_msg = subprocess.check_output(["git", "log", "-1", "--pretty=%B"]).decode("utf-8").strip() + result["commit"] = commit + result["commit_message"] = commit_msg + + dirty_output = subprocess.check_output(["git", "status", "--porcelain"]).decode("utf-8").strip() + if dirty_output: + result["dirty"] = True + result["dirty_files"] = dirty_output.splitlines() + else: + result["dirty"] = False + except Exception as e: + result["commit"] = None + result["commit_message"] = None + result["dirty"] = None + result["error"] = str(e) + return result + + +def get_system_info(): + """Get system information including CPU model, memory, cores, and sockets.""" + info = {} + uname = platform.uname() + info["system_name"] = uname.system + info["node_name"] = uname.node + + # Get CPU model (Linux only) + cpu_model = None + if os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "model name" in line: + cpu_model = line.split(":", 1)[1].strip() + break + except Exception as e: + cpu_model = f"Error: {e}" + info["cpu_model"] = cpu_model + + # Get memory size in GB (Linux only) + mem_total_gb = None + if os.path.exists("/proc/meminfo"): + try: + with open("/proc/meminfo", "r") as f: + for line in f: + if "MemTotal" in line: + mem_kb = float(line.split(":", 1)[1].split()[0]) + mem_total_gb = round(mem_kb / (1024 * 1024), 2) + break + except Exception as e: + mem_total_gb = f"Error: {e}" + info["memory_size_GB"] = mem_total_gb + + # Get CPU core count + info["cpu_core_count"] = os.cpu_count() + + # Get socket count + sockets = set() + if os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "physical id" in line: + sockets.add(line.split(":", 1)[1].strip()) + except Exception as e: + sockets = set() + 
info["cpu_socket_count"] = len(sockets) if len(sockets) > 0 else 1 + + return info + + +def record_results(result, filename): + """Append results to JSONL file.""" + with open(filename, "a") as f: + f.write(json.dumps(result) + "\n") + + +@pytest.mark.cpu +def test_moe_amx_int4_1_benchmark(): + """Benchmark AMX INT4_1 MOE performance.""" + if not HAS_DEPS: + pytest.skip(f"Dependencies not available: {import_error}") + + quant_mode = "int4_1" + bytes_per_elem = 0.5 + + # Setup output file + script_dir = os.path.dirname(os.path.abspath(__file__)) + json_path = os.path.join(script_dir, "bench_moe_amx_int4_1.jsonl") + + with torch.inference_mode(): + # Initialize CPUInfer with worker config + worker_config = kt_kernel_ext.WorkerPoolConfig() + worker_config.subpool_count = worker_config_dict["subpool_count"] + worker_config.subpool_numa_map = worker_config_dict["subpool_numa_map"] + worker_config.subpool_thread_count = worker_config_dict["subpool_thread_count"] + CPUInfer = kt_kernel_ext.CPUInfer(worker_config) + + # Initialize MOE layers + moes = [] + for layer_index in range(layer_num): + gate_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + up_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + down_proj = ( + torch.randn((expert_num, hidden_size, intermediate_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + config = kt_kernel_ext.moe.MOEConfig(expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0) + config.max_len = max_len + config.gate_proj = gate_proj.data_ptr() + config.up_proj = up_proj.data_ptr() + config.down_proj = down_proj.data_ptr() + config.pool = CPUInfer.backend_ + + moe = kt_kernel_ext.moe.AMXInt4_1_MOE(config) + CPUInfer.submit(moe.load_weights_task()) + CPUInfer.sync() + moes.append(moe) + + # Generate test data + gen_iter = 3000 + 
expert_ids = ( + torch.rand(gen_iter * qlen, expert_num, device="cpu") + .argsort(dim=-1)[:, :num_experts_per_tok] + .reshape(gen_iter, qlen * num_experts_per_tok) + .to("cpu") + .contiguous() + ) + weights = ( + torch.rand((gen_iter, qlen, num_experts_per_tok), dtype=torch.float32, device="cpu").to("cpu").contiguous() + ) + input_tensor = ( + torch.randn((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + output_tensor = ( + torch.empty((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + bsz_tensor = torch.tensor([qlen], device="cpu") + + # Warm-up iterations + print(f"Running warm-up for {warm_up_iter} iterations...") + for i in tqdm(range(warm_up_iter), desc="Warm-up"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + + # Test iterations + print(f"Running test for {test_iter} iterations...") + start = time.perf_counter() + for i in tqdm(range(test_iter), desc="Testing"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + end = time.perf_counter() + total_time = end - start + + # Calculate performance metrics + time_per_iter_us = total_time / test_iter * 1e6 + bandwidth = ( + hidden_size + * intermediate_size + * 3 + * num_experts_per_tok + * (1 / 8 * 256 * (1 - (31 / 32) ** qlen)) + * bytes_per_elem + * test_iter + / total_time + / 1e9 + ) # GB/s + flops = ( + hidden_size * intermediate_size * qlen * 3 * num_experts_per_tok * 2 * test_iter / total_time / 1e12 + ) # TFLOPS + + 
print("Quant mode: ", quant_mode) + print("Time(s): ", total_time) + print("Iteration: ", test_iter) + print("Time(us) per iteration: ", time_per_iter_us) + print("Bandwidth: ", bandwidth, "GB/s") + print("Flops: ", flops, "TFLOPS") + + # Record results + result = { + "quant_mode": quant_mode, + "total_time_seconds": total_time, + "iterations": test_iter, + "time_per_iteration_us": time_per_iter_us, + "bandwidth_GBs": bandwidth, + "flops_TFLOPS": flops, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "test_parameters": { + "expert_num": expert_num, + "hidden_size": hidden_size, + "intermediate_size": intermediate_size, + "max_len": max_len, + "num_experts_per_tok": num_experts_per_tok, + "layer_num": layer_num, + "qlen": qlen, + "warm_up_iter": warm_up_iter, + "test_iter": test_iter, + "CPUInfer_parameter": CPUINFER_PARAM, + }, + } + result.update(get_git_commit()) + result.update(get_system_info()) + record_results(result, json_path) + + print(f"Results saved to {json_path}") + + +def run_all_tests(): + """Run all tests in this file (for standalone execution).""" + if not HAS_DEPS: + print(f"⚠ Dependencies not available: {import_error}") + print("Skipping AMX MOE INT4_1 benchmark tests") + return + + try: + print("Running AMX MOE INT4_1 benchmark test...") + test_moe_amx_int4_1_benchmark() + print("✓ AMX MOE INT4_1 benchmark test passed") + print("\n✓ All tests passed!") + except Exception as e: + print(f"\n✗ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + run_all_tests() diff --git a/kt-kernel/test/per_commit/test_moe_amx_bench_int4_1k.py b/kt-kernel/test/per_commit/test_moe_amx_bench_int4_1k.py new file mode 100644 index 0000000..764f0b1 --- /dev/null +++ b/kt-kernel/test/per_commit/test_moe_amx_bench_int4_1k.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python +# coding=utf-8 +"""AMX MOE INT4_1K benchmark tests for KT-Kernel. 
+ +Benchmarks performance (bandwidth and FLOPS) of AMX-accelerated INT4_1K group quantization MOE operations. +""" + +import os +import sys +import time +import json +import subprocess +import platform +import pytest + +# Add parent directory to path for CI registration +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from ci.ci_register import register_cpu_ci + +# Register this test for CPU CI with estimated runtime of 300 seconds +register_cpu_ci(est_time=300, suite="default") + +# Check if dependencies are available +try: + import torch + import kt_kernel_ext + from tqdm import tqdm + HAS_DEPS = True +except ImportError as e: + HAS_DEPS = False + import_error = str(e) + +# Test parameters (from original bench_moe_amx.py) +expert_num = 16 +hidden_size = 7168 +intermediate_size = 2048 +max_len = 25600 +num_experts_per_tok = 8 +layer_num = 2 +qlen = 2048 +warm_up_iter = 1000 +test_iter = 2000 +k_group_size = 64 + +# Worker configuration +worker_config_dict = { + "subpool_count": 2, + "subpool_numa_map": [0, 1], + "subpool_thread_count": [45, 45], +} +CPUINFER_PARAM = 90 + + +def get_git_commit(): + """Get current git commit information.""" + result = {} + try: + commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() + commit_msg = subprocess.check_output(["git", "log", "-1", "--pretty=%B"]).decode("utf-8").strip() + result["commit"] = commit + result["commit_message"] = commit_msg + + dirty_output = subprocess.check_output(["git", "status", "--porcelain"]).decode("utf-8").strip() + if dirty_output: + result["dirty"] = True + result["dirty_files"] = dirty_output.splitlines() + else: + result["dirty"] = False + except Exception as e: + result["commit"] = None + result["commit_message"] = None + result["dirty"] = None + result["error"] = str(e) + return result + + +def get_system_info(): + """Get system information including CPU model, memory, cores, and sockets.""" + info = {} + uname = platform.uname() + 
info["system_name"] = uname.system + info["node_name"] = uname.node + + # Get CPU model (Linux only) + cpu_model = None + if os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "model name" in line: + cpu_model = line.split(":", 1)[1].strip() + break + except Exception as e: + cpu_model = f"Error: {e}" + info["cpu_model"] = cpu_model + + # Get memory size in GB (Linux only) + mem_total_gb = None + if os.path.exists("/proc/meminfo"): + try: + with open("/proc/meminfo", "r") as f: + for line in f: + if "MemTotal" in line: + mem_kb = float(line.split(":", 1)[1].split()[0]) + mem_total_gb = round(mem_kb / (1024 * 1024), 2) + break + except Exception as e: + mem_total_gb = f"Error: {e}" + info["memory_size_GB"] = mem_total_gb + + # Get CPU core count + info["cpu_core_count"] = os.cpu_count() + + # Get socket count + sockets = set() + if os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "physical id" in line: + sockets.add(line.split(":", 1)[1].strip()) + except Exception as e: + sockets = set() + info["cpu_socket_count"] = len(sockets) if len(sockets) > 0 else 1 + + return info + + +def record_results(result, filename): + """Append results to JSONL file.""" + with open(filename, "a") as f: + f.write(json.dumps(result) + "\n") + + +@pytest.mark.cpu +def test_moe_amx_int4_1k_benchmark(): + """Benchmark AMX INT4_1K MOE performance.""" + if not HAS_DEPS: + pytest.skip(f"Dependencies not available: {import_error}") + + quant_mode = "int4_1k" + bytes_per_elem = 0.5 + + # Setup output file + script_dir = os.path.dirname(os.path.abspath(__file__)) + json_path = os.path.join(script_dir, "bench_moe_amx_int4_1k.jsonl") + + with torch.inference_mode(): + # Initialize CPUInfer with worker config + worker_config = kt_kernel_ext.WorkerPoolConfig() + worker_config.subpool_count = worker_config_dict["subpool_count"] + worker_config.subpool_numa_map = 
worker_config_dict["subpool_numa_map"] + worker_config.subpool_thread_count = worker_config_dict["subpool_thread_count"] + CPUInfer = kt_kernel_ext.CPUInfer(worker_config) + + # Initialize MOE layers + moes = [] + for layer_index in range(layer_num): + gate_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + up_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + down_proj = ( + torch.randn((expert_num, hidden_size, intermediate_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + config = kt_kernel_ext.moe.MOEConfig(expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0) + config.max_len = max_len + config.gate_proj = gate_proj.data_ptr() + config.up_proj = up_proj.data_ptr() + config.down_proj = down_proj.data_ptr() + config.pool = CPUInfer.backend_ + + # Configure INT4_1K quantization settings + config.quant_config.bits = 4 + config.quant_config.group_size = k_group_size + config.quant_config.zero_point = True + + moe = kt_kernel_ext.moe.AMXInt4_1KGroup_MOE(config) + CPUInfer.submit(moe.load_weights_task()) + CPUInfer.sync() + moes.append(moe) + + # Generate test data + gen_iter = 3000 + expert_ids = ( + torch.rand(gen_iter * qlen, expert_num, device="cpu") + .argsort(dim=-1)[:, :num_experts_per_tok] + .reshape(gen_iter, qlen * num_experts_per_tok) + .to("cpu") + .contiguous() + ) + weights = ( + torch.rand((gen_iter, qlen, num_experts_per_tok), dtype=torch.float32, device="cpu").to("cpu").contiguous() + ) + input_tensor = ( + torch.randn((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + output_tensor = ( + torch.empty((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + bsz_tensor = torch.tensor([qlen], device="cpu") + + # Warm-up iterations + print(f"Running 
warm-up for {warm_up_iter} iterations...") + for i in tqdm(range(warm_up_iter), desc="Warm-up"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + + # Test iterations + print(f"Running test for {test_iter} iterations...") + start = time.perf_counter() + for i in tqdm(range(test_iter), desc="Testing"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + end = time.perf_counter() + total_time = end - start + + # Calculate performance metrics + time_per_iter_us = total_time / test_iter * 1e6 + bandwidth = ( + hidden_size + * intermediate_size + * 3 + * num_experts_per_tok + * (1 / 8 * 256 * (1 - (31 / 32) ** qlen)) + * bytes_per_elem + * test_iter + / total_time + / 1e9 + ) # GB/s + flops = ( + hidden_size * intermediate_size * qlen * 3 * num_experts_per_tok * 2 * test_iter / total_time / 1e12 + ) # TFLOPS + + print("Quant mode: ", quant_mode) + print("Time(s): ", total_time) + print("Iteration: ", test_iter) + print("Time(us) per iteration: ", time_per_iter_us) + print("Bandwidth: ", bandwidth, "GB/s") + print("Flops: ", flops, "TFLOPS") + + # Record results + result = { + "quant_mode": quant_mode, + "total_time_seconds": total_time, + "iterations": test_iter, + "time_per_iteration_us": time_per_iter_us, + "bandwidth_GBs": bandwidth, + "flops_TFLOPS": flops, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "test_parameters": { + "expert_num": expert_num, + "hidden_size": hidden_size, + "intermediate_size": intermediate_size, + "max_len": max_len, + 
"num_experts_per_tok": num_experts_per_tok, + "layer_num": layer_num, + "qlen": qlen, + "warm_up_iter": warm_up_iter, + "test_iter": test_iter, + "k_group_size": k_group_size, + "CPUInfer_parameter": CPUINFER_PARAM, + }, + } + result.update(get_git_commit()) + result.update(get_system_info()) + record_results(result, json_path) + + print(f"Results saved to {json_path}") + + +def run_all_tests(): + """Run all tests in this file (for standalone execution).""" + if not HAS_DEPS: + print(f"⚠ Dependencies not available: {import_error}") + print("Skipping AMX MOE INT4_1K benchmark tests") + return + + try: + print("Running AMX MOE INT4_1K benchmark test...") + test_moe_amx_int4_1k_benchmark() + print("✓ AMX MOE INT4_1K benchmark test passed") + print("\n✓ All tests passed!") + except Exception as e: + print(f"\n✗ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + run_all_tests() diff --git a/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py b/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py new file mode 100644 index 0000000..ef3d7f0 --- /dev/null +++ b/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python +# coding=utf-8 +"""AMX MOE INT8 benchmark tests for KT-Kernel. + +Benchmarks performance (bandwidth and FLOPS) of AMX-accelerated INT8 MOE operations. 
+""" + +import os +import sys +import time +import json +import subprocess +import platform +import pytest + +# Add parent directory to path for CI registration +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from ci.ci_register import register_cpu_ci + +# Register this test for CPU CI with estimated runtime of 300 seconds +register_cpu_ci(est_time=300, suite="default") + +# Check if dependencies are available +try: + import torch + import kt_kernel_ext + from tqdm import tqdm + HAS_DEPS = True +except ImportError as e: + HAS_DEPS = False + import_error = str(e) + +# Test parameters (from original bench_moe_amx.py) +expert_num = 16 +hidden_size = 7168 +intermediate_size = 2048 +max_len = 25600 +num_experts_per_tok = 8 +layer_num = 2 +qlen = 2048 +warm_up_iter = 1000 +test_iter = 2000 + +# Worker configuration +worker_config_dict = { + "subpool_count": 2, + "subpool_numa_map": [0, 1], + "subpool_thread_count": [45, 45], +} +CPUINFER_PARAM = 90 + + +def get_git_commit(): + """Get current git commit information.""" + result = {} + try: + commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() + commit_msg = subprocess.check_output(["git", "log", "-1", "--pretty=%B"]).decode("utf-8").strip() + result["commit"] = commit + result["commit_message"] = commit_msg + + dirty_output = subprocess.check_output(["git", "status", "--porcelain"]).decode("utf-8").strip() + if dirty_output: + result["dirty"] = True + result["dirty_files"] = dirty_output.splitlines() + else: + result["dirty"] = False + except Exception as e: + result["commit"] = None + result["commit_message"] = None + result["dirty"] = None + result["error"] = str(e) + return result + + +def get_system_info(): + """Get system information including CPU model, memory, cores, and sockets.""" + info = {} + uname = platform.uname() + info["system_name"] = uname.system + info["node_name"] = uname.node + + # Get CPU model (Linux only) + cpu_model = None + if 
os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "model name" in line: + cpu_model = line.split(":", 1)[1].strip() + break + except Exception as e: + cpu_model = f"Error: {e}" + info["cpu_model"] = cpu_model + + # Get memory size in GB (Linux only) + mem_total_gb = None + if os.path.exists("/proc/meminfo"): + try: + with open("/proc/meminfo", "r") as f: + for line in f: + if "MemTotal" in line: + mem_kb = float(line.split(":", 1)[1].split()[0]) + mem_total_gb = round(mem_kb / (1024 * 1024), 2) + break + except Exception as e: + mem_total_gb = f"Error: {e}" + info["memory_size_GB"] = mem_total_gb + + # Get CPU core count + info["cpu_core_count"] = os.cpu_count() + + # Get socket count + sockets = set() + if os.path.exists("/proc/cpuinfo"): + try: + with open("/proc/cpuinfo", "r") as f: + for line in f: + if "physical id" in line: + sockets.add(line.split(":", 1)[1].strip()) + except Exception as e: + sockets = set() + info["cpu_socket_count"] = len(sockets) if len(sockets) > 0 else 1 + + return info + + +def record_results(result, filename): + """Append results to JSONL file.""" + with open(filename, "a") as f: + f.write(json.dumps(result) + "\n") + + +@pytest.mark.cpu +def test_moe_amx_int8_benchmark(): + """Benchmark AMX INT8 MOE performance.""" + if not HAS_DEPS: + pytest.skip(f"Dependencies not available: {import_error}") + + quant_mode = "int8" + bytes_per_elem = 1.0 + + # Setup output file + script_dir = os.path.dirname(os.path.abspath(__file__)) + json_path = os.path.join(script_dir, "bench_moe_amx_int8.jsonl") + + with torch.inference_mode(): + # Initialize CPUInfer with worker config + worker_config = kt_kernel_ext.WorkerPoolConfig() + worker_config.subpool_count = worker_config_dict["subpool_count"] + worker_config.subpool_numa_map = worker_config_dict["subpool_numa_map"] + worker_config.subpool_thread_count = worker_config_dict["subpool_thread_count"] + CPUInfer = kt_kernel_ext.CPUInfer(worker_config) + 
+ # Initialize MOE layers + moes = [] + for layer_index in range(layer_num): + gate_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + up_proj = ( + torch.randn((expert_num, intermediate_size, hidden_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + down_proj = ( + torch.randn((expert_num, hidden_size, intermediate_size), dtype=torch.float32, device="cuda") + .to("cpu") + .contiguous() + ) + config = kt_kernel_ext.moe.MOEConfig(expert_num, num_experts_per_tok, hidden_size, intermediate_size, 0) + config.max_len = max_len + config.gate_proj = gate_proj.data_ptr() + config.up_proj = up_proj.data_ptr() + config.down_proj = down_proj.data_ptr() + config.pool = CPUInfer.backend_ + + moe = kt_kernel_ext.moe.AMXInt8_MOE(config) + CPUInfer.submit(moe.load_weights_task()) + CPUInfer.sync() + moes.append(moe) + + # Generate test data + gen_iter = 3000 + expert_ids = ( + torch.rand(gen_iter * qlen, expert_num, device="cpu") + .argsort(dim=-1)[:, :num_experts_per_tok] + .reshape(gen_iter, qlen * num_experts_per_tok) + .to("cpu") + .contiguous() + ) + weights = ( + torch.rand((gen_iter, qlen, num_experts_per_tok), dtype=torch.float32, device="cpu").to("cpu").contiguous() + ) + input_tensor = ( + torch.randn((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + output_tensor = ( + torch.empty((layer_num, qlen, hidden_size), dtype=torch.bfloat16, device="cuda").to("cpu").contiguous() + ) + bsz_tensor = torch.tensor([qlen], device="cpu") + + # Warm-up iterations + print(f"Running warm-up for {warm_up_iter} iterations...") + for i in tqdm(range(warm_up_iter), desc="Warm-up"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % 
layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + + # Test iterations + print(f"Running test for {test_iter} iterations...") + start = time.perf_counter() + for i in tqdm(range(test_iter), desc="Testing"): + CPUInfer.submit( + moes[i % layer_num].forward_task( + bsz_tensor.data_ptr(), + num_experts_per_tok, + expert_ids[i % gen_iter].data_ptr(), + weights[i % gen_iter].data_ptr(), + input_tensor[i % layer_num].data_ptr(), + output_tensor[i % layer_num].data_ptr(), + False, + ) + ) + CPUInfer.sync() + end = time.perf_counter() + total_time = end - start + + # Calculate performance metrics + time_per_iter_us = total_time / test_iter * 1e6 + bandwidth = ( + hidden_size + * intermediate_size + * 3 + * num_experts_per_tok + * (1 / 8 * 256 * (1 - (31 / 32) ** qlen)) + * bytes_per_elem + * test_iter + / total_time + / 1e9 + ) # GB/s + flops = ( + hidden_size * intermediate_size * qlen * 3 * num_experts_per_tok * 2 * test_iter / total_time / 1e12 + ) # TFLOPS + + print("Quant mode: ", quant_mode) + print("Time(s): ", total_time) + print("Iteration: ", test_iter) + print("Time(us) per iteration: ", time_per_iter_us) + print("Bandwidth: ", bandwidth, "GB/s") + print("Flops: ", flops, "TFLOPS") + + # Record results + result = { + "quant_mode": quant_mode, + "total_time_seconds": total_time, + "iterations": test_iter, + "time_per_iteration_us": time_per_iter_us, + "bandwidth_GBs": bandwidth, + "flops_TFLOPS": flops, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "test_parameters": { + "expert_num": expert_num, + "hidden_size": hidden_size, + "intermediate_size": intermediate_size, + "max_len": max_len, + "num_experts_per_tok": num_experts_per_tok, + "layer_num": layer_num, + "qlen": qlen, + "warm_up_iter": warm_up_iter, + "test_iter": test_iter, + "CPUInfer_parameter": CPUINFER_PARAM, + }, + } + result.update(get_git_commit()) + result.update(get_system_info()) + record_results(result, json_path) + + print(f"Results saved to {json_path}") + + 
def run_all_tests():
    """Standalone entry point: execute every test defined in this file."""
    # Guard clause: without torch / kt_kernel_ext there is nothing to run.
    if not HAS_DEPS:
        print(f"⚠ Dependencies not available: {import_error}")
        print("Skipping AMX MOE INT8 benchmark tests")
        return

    try:
        print("Running AMX MOE INT8 benchmark test...")
        test_moe_amx_int8_benchmark()
    except Exception as e:
        # Report the failure with a full traceback, then signal CI via exit code.
        print(f"\n✗ Test failed: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
    else:
        # Success path: per-test message followed by the overall summary.
        print("✓ AMX MOE INT8 benchmark test passed")
        print("\n✓ All tests passed!")


if __name__ == "__main__":
    run_all_tests()