diff --git a/.gitignore b/.gitignore index 4527be7..f44a0ab 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ build* CMakeFiles/ kvc2/ sched/ +*.png \ No newline at end of file diff --git a/kt-kernel/examples/bench_moe_amx_int8.py b/kt-kernel/examples/bench_moe_amx_int8.py new file mode 100644 index 0000000..a87c200 --- /dev/null +++ b/kt-kernel/examples/bench_moe_amx_int8.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python +# coding=utf-8 +""" +AMX INT8 MoE Benchmark Script + +Benchmarks performance of AMX-accelerated INT8 MOE operations with configurable parameters. +Supports uniform workload distribution across experts and optional CUDA stream mode. + +Usage: + python bench_moe_amx_int8.py [options] + +Examples: + # Default parameters + python bench_moe_amx_int8.py + + # Custom parameters + python bench_moe_amx_int8.py --layer_num 4 --expert_num 256 --workload 8 --use_cuda_stream + + # Full configuration + python bench_moe_amx_int8.py --layer_num 2 --expert_num 128 --num_experts_per_tok 8 \ + --workload 4 --hidden_size 7168 --intermediate_size 2048 \ + --warmup_iter 100 --test_iter 1000 --use_cuda_stream +""" + +import os +import sys +import time +import argparse + +# Add build path for development +sys.path.insert(0, os.path.dirname(__file__) + "/../build") + +import torch + +try: + from kt_kernel import kt_kernel_ext + + HAS_KT_KERNEL = True +except ImportError as e: + HAS_KT_KERNEL = False + import_error = str(e) + + +def parse_args(): + parser = argparse.ArgumentParser( + description="AMX INT8 MoE Benchmark", formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + # Model parameters + parser.add_argument("--layer_num", type=int, default=2, help="Number of MoE layers") + parser.add_argument("--expert_num", type=int, default=256, help="Number of experts per layer") + parser.add_argument( + "--num_experts_per_tok", type=int, default=8, help="Number of experts selected per token (top-k)" + ) + parser.add_argument("--hidden_size", type=int, default=7168, help="Hidden dimension size") + parser.add_argument("--intermediate_size", type=int, default=2048, help="Intermediate dimension size") + + # Workload parameters + parser.add_argument("--workload", type=int, default=1, help="Workload (qlen, number of tokens)") + parser.add_argument("--max_len", type=int, default=25600, help="Maximum sequence length for buffer allocation") + + # Benchmark parameters + parser.add_argument("--warmup_iter", type=int, default=100, help="Number of warmup iterations") + parser.add_argument("--test_iter", type=int, default=1000, help="Number of test iterations") + + # Execution mode + parser.add_argument("--use_cuda_stream", action="store_true", help="Use CUDA stream mode (submit_with_cuda_stream)") + parser.add_argument("--profile", action="store_true", help="Enable PyTorch profiler and export trace.json") + parser.add_argument("--profile_path", type=str, default="./trace.json", help="Path to save profile trace") + + # Worker configuration + parser.add_argument("--cpuinfer_threads", type=int, default=60, help="Total CPU inference threads") + parser.add_argument("--numa_count", type=int, default=2, help="Number of NUMA nodes") + parser.add_argument( + "--num_gpu_experts", type=int, default=0, help="Number of experts to place on GPU (first N experts)" + ) + + return parser.parse_args() + + +def generate_uniform_workload(expert_num, num_experts_per_tok, workload): + """ + Generate expert_ids and weights with uniform workload distribution. 
+ + workload = qlen (number of tokens) + Each token selects num_experts_per_tok experts. + Total expert calls = workload * num_experts_per_tok + """ + qlen = workload + + # Randomly select num_experts_per_tok experts (uniform, no duplicates) + # All tokens will use the same expert combination + selected_experts = torch.randperm(expert_num)[:num_experts_per_tok].tolist() + + # Create expert_ids: all tokens use the same expert combination + expert_ids = [selected_experts for _ in range(qlen)] + + # Create on GPU then copy to CPU (faster) + expert_ids = torch.tensor(expert_ids, dtype=torch.long, device="cuda").to("cpu").contiguous() + print(f"Selected experts (all tokens use same): {selected_experts}") + print(f"Expert IDs shape: {expert_ids.shape}") + + # Uniform weights (normalized) - create on GPU then copy + weights = torch.ones((qlen, num_experts_per_tok), dtype=torch.float32, device="cuda") / num_experts_per_tok + weights = weights.to("cpu").contiguous() + + return expert_ids, weights, qlen + + +def run_benchmark(args): + """Run the AMX INT8 MoE benchmark.""" + + print("=" * 60) + print("AMX INT8 MoE Benchmark") + print("=" * 60) + print(f"\nConfiguration:") + print(f" Layers: {args.layer_num}") + print(f" Experts per layer: {args.expert_num}") + print(f" Experts per token: {args.num_experts_per_tok}") + print(f" Hidden size: {args.hidden_size}") + print(f" Intermediate size: {args.intermediate_size}") + print(f" Workload (qlen): {args.workload}") + print(f" Use CUDA stream: {args.use_cuda_stream}") + print(f" Warmup iterations: {args.warmup_iter}") + print(f" Test iterations: {args.test_iter}") + print(f" CPU threads: {args.cpuinfer_threads}") + print(f" NUMA nodes: {args.numa_count}") + + # Generate uniform workload + expert_ids, weights, qlen = generate_uniform_workload(args.expert_num, args.num_experts_per_tok, args.workload) + print(f"\nActual qlen: {qlen}") + print(f"Total expert calls: {qlen * args.num_experts_per_tok}") + + with torch.inference_mode(): + # Initialize CPUInfer + if args.numa_count > 1: + worker_config = kt_kernel_ext.WorkerPoolConfig() + worker_config.subpool_count = args.numa_count + worker_config.subpool_numa_map = list(range(args.numa_count)) + threads_per_numa = args.cpuinfer_threads // args.numa_count + worker_config.subpool_thread_count = [threads_per_numa] * args.numa_count + cpu_infer = kt_kernel_ext.CPUInfer(worker_config) + else: + cpu_infer = kt_kernel_ext.CPUInfer(args.cpuinfer_threads) + + # Physical to logical mapping (identity) + physical_to_logical_map = torch.arange(args.expert_num, dtype=torch.int64, device="cpu").contiguous() + + # GPU experts mask - set first num_gpu_experts to True if specified + gpu_experts_mask = torch.zeros(args.expert_num, dtype=torch.bool, device="cpu") + if args.num_gpu_experts > 0: + num_gpu = min(args.num_gpu_experts, args.expert_num) + gpu_experts_mask[:num_gpu] = True + print(f" GPU experts: {num_gpu} (experts 0-{num_gpu-1})") + + # Initialize MoE layers + print("\nInitializing MoE layers...") + moes = [] + for layer_idx in range(args.layer_num): + # Create random weights on GPU then copy to CPU (faster) + gate_proj = ( + torch.randn( + (args.expert_num, args.intermediate_size, args.hidden_size), dtype=torch.bfloat16, device="cuda" + ) + .to("cpu") + .contiguous() + ) + up_proj = ( + torch.randn( + (args.expert_num, args.intermediate_size, args.hidden_size), dtype=torch.bfloat16, device="cuda" + ) + .to("cpu") + .contiguous() + ) + down_proj = ( + torch.randn( + (args.expert_num, args.hidden_size, 
args.intermediate_size), dtype=torch.bfloat16, device="cuda" + ) + .to("cpu") + .contiguous() + ) + + # Configure MoE + config = kt_kernel_ext.moe.MOEConfig( + args.expert_num, + args.num_experts_per_tok, + args.hidden_size, + args.intermediate_size, + gpu_experts_mask.data_ptr(), + ) + config.max_len = args.max_len + config.gate_proj = gate_proj.data_ptr() + config.up_proj = up_proj.data_ptr() + config.down_proj = down_proj.data_ptr() + config.pool = cpu_infer.backend_ + + moe = kt_kernel_ext.moe.AMXInt8_MOE(config) + cpu_infer.submit(moe.load_weights_task(physical_to_logical_map.data_ptr())) + cpu_infer.sync() + + moes.append(moe) + print(f" Layer {layer_idx} initialized") + + # Prepare input/output tensors + input_tensor = torch.randn((qlen, args.hidden_size), dtype=torch.bfloat16, device="cpu").contiguous() + output_tensor = torch.zeros((qlen, args.hidden_size), dtype=torch.bfloat16, device="cpu").contiguous() + bsz_tensor = torch.tensor([qlen], dtype=torch.int32, device="cpu") + + # CUDA stream setup (if enabled) + cuda_stream = None + if args.use_cuda_stream: + if not torch.cuda.is_available(): + print("\nWarning: CUDA not available, falling back to non-stream mode") + args.use_cuda_stream = False + else: + cuda_stream = torch.cuda.current_stream().cuda_stream + print(f"\nUsing CUDA stream: {cuda_stream}") + + # Warmup + print(f"\nWarmup ({args.warmup_iter} iterations)...") + for i in range(args.warmup_iter): + moe = moes[i % args.layer_num] + task = moe.forward_task( + bsz_tensor.data_ptr(), + args.num_experts_per_tok, + expert_ids.data_ptr(), + weights.data_ptr(), + input_tensor.data_ptr(), + output_tensor.data_ptr(), + False, # incremental + ) + + if args.use_cuda_stream: + cpu_infer.submit_with_cuda_stream(cuda_stream, task) + cpu_infer.sync_with_cuda_stream(cuda_stream) + else: + cpu_infer.submit(task) + cpu_infer.sync() + + # Benchmark + print(f"Benchmarking ({args.test_iter} iterations)...") + + if args.use_cuda_stream: + torch.cuda.synchronize() + + # Setup profiler if enabled + profiler = None + if args.profile: + profiler = torch.profiler.profile( + activities=[ + torch.profiler.ProfilerActivity.CPU, + torch.profiler.ProfilerActivity.CUDA, + ], + record_shapes=False, + with_stack=False, + ) + profiler.__enter__() + + start_time = time.perf_counter() + + for i in range(args.test_iter): + moe = moes[i % args.layer_num] + + if args.profile: + torch.cuda.nvtx.range_push(f"iter_{i}") + + task = moe.forward_task( + bsz_tensor.data_ptr(), + args.num_experts_per_tok, + expert_ids.data_ptr(), + weights.data_ptr(), + input_tensor.data_ptr(), + output_tensor.data_ptr(), + False, + ) + + if args.use_cuda_stream: + if args.profile: + torch.cuda.nvtx.range_push("submit") + cpu_infer.submit_with_cuda_stream(cuda_stream, task) + if args.profile: + torch.cuda.nvtx.range_pop() + torch.cuda.nvtx.range_push("sync") + cpu_infer.sync_with_cuda_stream(cuda_stream) + if args.profile: + torch.cuda.nvtx.range_pop() + else: + cpu_infer.submit(task) + cpu_infer.sync() + + if args.profile: + torch.cuda.nvtx.range_pop() + + if args.use_cuda_stream: + torch.cuda.synchronize() + + end_time = time.perf_counter() + total_time = end_time - start_time + + # Export profiler trace + if profiler: + profiler.__exit__(None, None, None) + profiler.export_chrome_trace(args.profile_path) + print(f"\nProfile trace saved to: {args.profile_path}") + + # Calculate metrics + # Note: each iteration processes ONE layer (round-robin: moe = moes[i % layer_num]) + time_per_iter_us = total_time / args.test_iter * 1e6 + + # 
Bandwidth calculation
+    # Weight size per expert: 3 * hidden_size * intermediate_size * bytes_per_elem
+    bytes_per_elem = 1.0  # INT8
+    weight_bytes_per_expert = 3 * args.hidden_size * args.intermediate_size * bytes_per_elem
+
+    # Total weight bytes accessed per iteration (one layer per iteration).
+    # Each token activates num_experts_per_tok experts. Note: in the uniform
+    # workload every token hits the same experts, whose weights are read once
+    # per GEMM, so for qlen > 1 this figure overstates actual weight traffic.
+    total_experts_activated = qlen * args.num_experts_per_tok
+    weight_bytes_per_iter = total_experts_activated * weight_bytes_per_expert
+
+    bandwidth_gbs = weight_bytes_per_iter * args.test_iter / total_time / 1e9
+
+    # FLOPS calculation
+    # Per expert per token: 3 matrices * hidden * intermediate * 2 (multiply + add)
+    flops_per_expert = 3 * args.hidden_size * args.intermediate_size * 2
+    total_flops = total_experts_activated * flops_per_expert * args.test_iter
+    tflops = total_flops / total_time / 1e12
+
+    # Results
+    print("\n" + "=" * 60)
+    print("Results")
+    print("=" * 60)
+    print(f"  Total time: {total_time:.3f} s")
+    print(f"  Time per iteration: {time_per_iter_us:.2f} us (= time per layer)")
+    print(f"  Memory bandwidth: {bandwidth_gbs:.2f} GB/s")
+    print(f"  Compute throughput: {tflops:.3f} TFLOPS")
+    print("=" * 60)
+
+    return {
+        "total_time_s": total_time,
+        "time_per_iter_us": time_per_iter_us,
+        "bandwidth_gbs": bandwidth_gbs,
+        "tflops": tflops,
+    }
+
+
+def main():
+    args = parse_args()
+
+    if not HAS_KT_KERNEL:
+        print(f"Error: kt_kernel not available: {import_error}")
+        sys.exit(1)
+
+    run_benchmark(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/kt-kernel/examples/test_fp8_moe.py b/kt-kernel/examples/test_fp8_moe.py
index 0b7f2e0..94507df 100644
--- a/kt-kernel/examples/test_fp8_moe.py
+++ b/kt-kernel/examples/test_fp8_moe.py
@@ -19,6 +19,7 @@ sys.path.insert(0, os.path.dirname(__file__) + "/../build")
 import torch
 import kt_kernel
+from kt_kernel import kt_kernel_ext
 
 torch.manual_seed(42)
diff --git a/kt-kernel/ext_bindings.cpp b/kt-kernel/ext_bindings.cpp
index 7344b2d..9a9056c 100644
--- a/kt-kernel/ext_bindings.cpp
+++ b/kt-kernel/ext_bindings.cpp
@@ -503,16 +503,21 @@ PYBIND11_MODULE(kt_kernel_ext, m) {
       .def(py::init([](int expert_num, int routed_expert_num, int hidden_size, int intermediate_size) {
        return GeneralMOEConfig(expert_num, routed_expert_num, hidden_size, intermediate_size);
      }))
-      .def(py::init(
-          [](int expert_num, int routed_expert_num, int hidden_size, int intermediate_size, int num_gpu_experts) {
-            GeneralMOEConfig cfg(expert_num, routed_expert_num, hidden_size, intermediate_size);
-            cfg.num_gpu_experts = num_gpu_experts;
-            return cfg;
-          }))
+      .def(py::init([](int expert_num, int routed_expert_num, int hidden_size, int intermediate_size,
+                       uintptr_t gpu_experts_mask_ptr) {
+        GeneralMOEConfig cfg(expert_num, routed_expert_num, hidden_size, intermediate_size);
+        cfg.gpu_experts_mask = reinterpret_cast<uint8_t*>(gpu_experts_mask_ptr);
+        cfg.compute_num_gpu_experts();
+        return cfg;
+      }))
       .def_readwrite("layer_idx", &GeneralMOEConfig::layer_idx)
       .def_readwrite("pool", &GeneralMOEConfig::pool)
-      .def_readwrite("num_gpu_experts", &GeneralMOEConfig::num_gpu_experts)
+      .def_readonly("num_gpu_experts", &GeneralMOEConfig::num_gpu_experts)
+      .def_property(
+          "gpu_experts_mask",
+          [](const GeneralMOEConfig& self) { return reinterpret_cast<uintptr_t>(self.gpu_experts_mask); },
+          [](GeneralMOEConfig& self, uintptr_t val) { self.gpu_experts_mask = reinterpret_cast<uint8_t*>(val); })
       .DEF_PTR_PROPERTY(GeneralMOEConfig, physical_to_logical_map)
       .DEF_PTR_PROPERTY(GeneralMOEConfig, gate_proj)
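Note on the new binding: the five-argument constructor now takes the mask's raw address instead of a GPU-expert count, and num_gpu_experts becomes derived, read-only state. A minimal sketch of the calling convention from Python, mirroring the wrapper changes later in this patch (sizes are illustrative):

    import torch
    from kt_kernel import kt_kernel_ext

    num_experts, top_k, hidden, inter = 8, 2, 4096, 2048

    # torch.bool uses one byte per element, matching the uint8_t* the C++ side reads.
    gpu_mask = torch.zeros(num_experts, dtype=torch.bool, device="cpu")
    gpu_mask[[0, 2]] = True  # experts 0 and 2 live on GPU

    cfg = kt_kernel_ext.moe.MOEConfig(num_experts, top_k, hidden, inter, gpu_mask.data_ptr())
    assert cfg.num_gpu_experts == 2  # derived via compute_num_gpu_experts()

    # Only the raw pointer is stored: gpu_mask must stay alive, on CPU, and
    # contiguous for the lifetime of cfg, so keep a Python reference to it.

diff --git a/kt-kernel/operators/amx/moe_base.hpp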
b/kt-kernel/operators/amx/moe_base.hpp index 09149e0..70724a0 100644 --- a/kt-kernel/operators/amx/moe_base.hpp +++ b/kt-kernel/operators/amx/moe_base.hpp @@ -195,7 +195,7 @@ class AMX_MOE_BASE { std::fill(m_local_num_.begin(), m_local_num_.end(), 0); for (int i = 0; i < qlen; i++) { for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } m_local_pos_[i][j] = m_local_num_[expert_ids[i * k + j]]++; @@ -296,7 +296,7 @@ class AMX_MOE_BASE { direct_or_pool(qlen, [&](int i) { for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } memcpy(m_local_input_ptr_[expert_ids[i * k + j]] + m_local_pos_[i][j] * config_.hidden_size, @@ -403,7 +403,7 @@ class AMX_MOE_BASE { __m512 x0 = _mm512_setzero_ps(); __m512 x1 = _mm512_setzero_ps(); for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } __m512 weight = _mm512_set1_ps(weights[i * k + j]); @@ -450,7 +450,7 @@ class AMX_MOE_BASE { int activated_expert = 0; std::fill(m_local_num_.begin(), m_local_num_.end(), 0); for (int i = 0; i < k; i++) { - if (expert_ids[i] < config_.num_gpu_experts || expert_ids[i] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i])) { continue; } m_expert_id_map_[activated_expert] = expert_ids[i]; @@ -607,7 +607,7 @@ class AMX_MOE_BASE { __m512 x0 = _mm512_setzero_ps(); __m512 x1 = _mm512_setzero_ps(); for (int j = 0; j < k; j++) { - if (expert_ids[j] < config_.num_gpu_experts || expert_ids[j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[j])) { continue; } __m512 weight = _mm512_set1_ps(weights[j]); diff --git a/kt-kernel/operators/common.hpp b/kt-kernel/operators/common.hpp index cfb8488..2ffc6a7 100644 --- a/kt-kernel/operators/common.hpp +++ b/kt-kernel/operators/common.hpp @@ -238,9 +238,25 @@ struct GeneralMOEConfig { WorkerPool* pool = nullptr; // SGLang offload - int num_gpu_experts = 0; + int num_gpu_experts = 0; // Computed from gpu_experts_mask + uint8_t* gpu_experts_mask = nullptr; // Bool mask: true = expert on GPU void* physical_to_logical_map = nullptr; + // Compute num_gpu_experts from gpu_experts_mask + void compute_num_gpu_experts() { + num_gpu_experts = 0; + if (gpu_experts_mask) { + for (int i = 0; i < expert_num; i++) { + if (gpu_experts_mask[i]) num_gpu_experts++; + } + } + } + + // Check if expert should be skipped (invalid, out of range, or on GPU) + inline bool should_skip_expert(int64_t expert_id) const { + return expert_id < 0 || expert_id >= expert_num || (gpu_experts_mask && gpu_experts_mask[expert_id]); + } + void* gate_proj; void* up_proj; void* down_proj; diff --git a/kt-kernel/operators/llamafile/moe.hpp b/kt-kernel/operators/llamafile/moe.hpp index 3079d0f..3a0d761 100644 --- a/kt-kernel/operators/llamafile/moe.hpp +++ b/kt-kernel/operators/llamafile/moe.hpp @@ -317,7 +317,7 @@ class LLAMA_MOE_TP { int activated_expert = 0; for (int i = 0; i < k; i++) { - if (expert_ids[i] < config_.num_gpu_experts || expert_ids[i] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i])) { continue; } m_expert_id_map_[activated_expert] = expert_ids[i]; @@ -474,10 +474,7 @@ class LLAMA_MOE_TP { } for (int i = 0; i < 
qlen; i++) { for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { - continue; - } - if (expert_ids[i * k + j] == -1) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } m_local_pos_[i][j] = m_local_num_[expert_ids[i * k + j]]++; @@ -567,10 +564,7 @@ class LLAMA_MOE_TP { } } for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { - continue; - } - if (expert_ids[i * k + j] == -1) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } memcpy(m_local_gate_input_ptr_[expert_ids[i * k + j]] + @@ -717,10 +711,7 @@ class LLAMA_MOE_TP { m_output_fp32_[i][e] = 0; } for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { - continue; - } - if (expert_ids[i * k + j] == -1) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } for (int e = 0; e < config_.hidden_size; e++) { diff --git a/kt-kernel/operators/moe_kernel/moe.hpp b/kt-kernel/operators/moe_kernel/moe.hpp index b866739..f33a7ac 100644 --- a/kt-kernel/operators/moe_kernel/moe.hpp +++ b/kt-kernel/operators/moe_kernel/moe.hpp @@ -457,7 +457,7 @@ class MOE_KERNEL_TP } for (int i = 0; i < qlen; i++) { for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } m_local_pos_[i][j] = m_local_num_[expert_ids[i * k + j]]++; @@ -488,7 +488,7 @@ class MOE_KERNEL_TP // Copy inputs into expert-local buffers MOE_DIRECT_OR_POOL_BY_VAR(qlen, [&](int i) { for (int j = 0; j < k; j++) { - if (expert_ids[i * k + j] < config_.num_gpu_experts || expert_ids[i * k + j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[i * k + j])) { continue; } memcpy(m_local_input_ptr_[expert_ids[i * k + j]] + m_local_pos_[i][j] * config_.hidden_size, @@ -639,8 +639,7 @@ class MOE_KERNEL_TP for (int e = e_start; e < e_end; e++) { float sum = 0; for (int j = 0; j < k; j++) { - if (expert_ids[q_idx * k + j] < config_.num_gpu_experts || - expert_ids[q_idx * k + j] >= config_.expert_num) { + if (config_.should_skip_expert(expert_ids[q_idx * k + j])) { continue; } sum += weights[q_idx * k + j] * ((float*)m_local_down_output_ptr_[expert_ids[q_idx * k + j]]) diff --git a/kt-kernel/python/__init__.py b/kt-kernel/python/__init__.py index 8b13399..a01086b 100644 --- a/kt-kernel/python/__init__.py +++ b/kt-kernel/python/__init__.py @@ -50,6 +50,7 @@ kt_kernel_ext = _kt_kernel_ext # Import main API from .experts import KTMoEWrapper +from .experts_base import generate_gpu_experts_masks # Read version from package metadata (preferred) or fallback to project root try: @@ -82,4 +83,4 @@ except ImportError: except ImportError: __version__ = "0.4.3" -__all__ = ["KTMoEWrapper", "kt_kernel_ext", "__cpu_variant__", "__version__"] +__all__ = ["KTMoEWrapper", "generate_gpu_experts_masks", "kt_kernel_ext", "__cpu_variant__", "__version__"] diff --git a/kt-kernel/python/experts.py b/kt-kernel/python/experts.py index 63c9494..5d867e9 100644 --- a/kt-kernel/python/experts.py +++ b/kt-kernel/python/experts.py @@ -11,6 +11,7 @@ selects the appropriate backend implementation based on the method parameter. 
from __future__ import annotations +import torch from typing import List, Optional # Import base infrastructure @@ -30,13 +31,17 @@ class KTMoEWrapper: selects the appropriate backend implementation based on the `method` parameter. Usage: + # Create a mask where experts 0, 2, 5 are on GPU + gpu_mask = torch.zeros(8, dtype=torch.bool) + gpu_mask[[0, 2, 5]] = True + wrapper = KTMoEWrapper( layer_idx=0, num_experts=8, num_experts_per_tok=2, hidden_size=4096, moe_intermediate_size=14336, - num_gpu_experts=2, + gpu_experts_mask=gpu_mask, # or None for all experts on CPU cpuinfer_threads=32, threadpool_count=2, weight_path="/path/to/weights", @@ -52,7 +57,7 @@ class KTMoEWrapper: num_experts_per_tok: int, hidden_size: int, moe_intermediate_size: int, - num_gpu_experts: int, + gpu_experts_mask: Optional[torch.Tensor], cpuinfer_threads: int, threadpool_count: int, weight_path: str, @@ -70,7 +75,10 @@ class KTMoEWrapper: num_experts_per_tok: Number of experts per token (top-k) hidden_size: Hidden dimension size moe_intermediate_size: MoE intermediate size - num_gpu_experts: Number of experts to run on GPU + gpu_experts_mask: Boolean mask indicating which experts are on GPU. + Shape: [num_experts], dtype: torch.bool. + mask[i] = True means expert i is on GPU. + If None, all experts are on CPU. cpuinfer_threads: Number of CPU inference threads threadpool_count: Number of NUMA subpools weight_path: Path to weights @@ -85,7 +93,7 @@ class KTMoEWrapper: # Select backend based on method if method in ["AMXINT4", "AMXINT8"]: backend_cls = AMXMoEWrapper - elif method in ["RAWINT4", "FP8", "BF16"]: + elif method in ["RAWINT4", "FP8", "BF16", "FP8_PERCHANNEL"]: backend_cls = NativeMoEWrapper elif method == "LLAMAFILE": backend_cls = LlamafileMoEWrapper @@ -101,7 +109,7 @@ class KTMoEWrapper: num_experts_per_tok=num_experts_per_tok, hidden_size=hidden_size, moe_intermediate_size=moe_intermediate_size, - num_gpu_experts=num_gpu_experts, + gpu_experts_mask=gpu_experts_mask, cpuinfer_threads=cpuinfer_threads, threadpool_count=threadpool_count, weight_path=weight_path, diff --git a/kt-kernel/python/experts_base.py b/kt-kernel/python/experts_base.py index 365fe20..ba879ff 100644 --- a/kt-kernel/python/experts_base.py +++ b/kt-kernel/python/experts_base.py @@ -18,6 +18,60 @@ import ctypes from kt_kernel import kt_kernel_ext +def generate_gpu_experts_masks( + activation_freq: torch.Tensor, + num_gpu_experts: int, +) -> torch.Tensor: + """ + Generate GPU experts masks based on activation frequency. + + Selects the top `num_gpu_experts` experts with highest activation frequency + across all layers to be placed on GPU. + + Args: + activation_freq: Activation frequency table of shape (num_layers, num_experts). + Higher values indicate more frequently activated experts. + num_gpu_experts: Total number of experts to place on GPU across all layers. + + Returns: + gpu_experts_masks: Boolean mask of shape (num_layers, num_experts) on CPU. + True means the expert should be on GPU. + + Example: + >>> activation_freq = torch.tensor([ + ... [0.1, 0.5, 0.3, 0.8], # layer 0 + ... [0.2, 0.4, 0.9, 0.1], # layer 1 + ... 
]) + >>> masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=3) + >>> # Top 3: layer0-expert3 (0.8), layer1-expert2 (0.9), layer0-expert1 (0.5) + >>> masks + tensor([[False, True, False, True], + [False, False, True, False]]) + """ + num_layers, num_experts_per_layer = activation_freq.shape + total_experts = num_layers * num_experts_per_layer + + # Clamp num_gpu_experts to valid range + num_gpu_experts = min(num_gpu_experts, total_experts) + num_gpu_experts = max(num_gpu_experts, 0) + + if num_gpu_experts == 0: + return torch.zeros(num_layers, num_experts_per_layer, dtype=torch.bool, device="cpu") + + # Flatten and find top-k indices + flat_freq = activation_freq.view(-1).to(device="cpu") + _, top_indices = torch.topk(flat_freq, k=num_gpu_experts, largest=True, sorted=False) + + # Create mask + gpu_experts_masks = torch.zeros(total_experts, dtype=torch.bool, device="cpu") + gpu_experts_masks[top_indices] = True + + # Reshape to (num_layers, num_experts) + gpu_experts_masks = gpu_experts_masks.view(num_layers, num_experts_per_layer) + + return gpu_experts_masks + + class KExpertsCPUBuffer: """ CPU buffer management for expert computation. @@ -102,7 +156,7 @@ class BaseMoEWrapper(ABC): num_experts_per_tok: int, hidden_size: int, moe_intermediate_size: int, - num_gpu_experts: int, + gpu_experts_mask: Optional[torch.Tensor], cpuinfer_threads: int, threadpool_count: int, weight_path: str, @@ -120,7 +174,10 @@ class BaseMoEWrapper(ABC): num_experts_per_tok: Number of experts per token (top-k) hidden_size: Hidden dimension size moe_intermediate_size: MoE intermediate size - num_gpu_experts: Number of experts to run on GPU + gpu_experts_mask: Boolean mask indicating which experts are on GPU. + Shape: [num_experts], dtype: torch.bool. + mask[i] = True means expert i is on GPU. + If None, all experts are on CPU. 
cpuinfer_threads: Number of CPU inference threads
             threadpool_count: Number of NUMA subpools
             weight_path: Path to weights
@@ -134,7 +191,22 @@
         self.num_experts_per_tok = num_experts_per_tok
         self.hidden_size = hidden_size
         self.moe_intermediate_size = moe_intermediate_size
-        self.num_gpu_experts = num_gpu_experts
+
+        # Process gpu_experts_mask: convert to a bool tensor on CPU in pinned memory for async copies.
+        # The mask is shared between C++ and Python (the C++ side reads it as uint8_t*); both may read/write it.
+        if gpu_experts_mask is None:
+            # No GPU experts - all experts on CPU
+            self.gpu_experts_mask = torch.zeros(num_experts, dtype=torch.bool, device="cpu", pin_memory=True)
+        else:
+            # Create a new pinned tensor and copy data into it
+            self.gpu_experts_mask = torch.empty(num_experts, dtype=torch.bool, device="cpu", pin_memory=True)
+            self.gpu_experts_mask.copy_(gpu_experts_mask)
+
+        self.num_gpu_experts = int(self.gpu_experts_mask.sum().item())
+
+        # GPU copy of the mask for forward-pass operations (e.g., mask_cpu_expert_ids);
+        # lazily initialized when first needed.
+        self._gpu_experts_mask_gpu: Optional[torch.Tensor] = None
         self.weight_path = weight_path
         self.chunked_prefill_size = chunked_prefill_size
         self.cpu_save = cpu_save
diff --git a/kt-kernel/python/utils/amx.py b/kt-kernel/python/utils/amx.py
index 73e03df..38d417e 100644
--- a/kt-kernel/python/utils/amx.py
+++ b/kt-kernel/python/utils/amx.py
@@ -1,6 +1,7 @@
 import os
 import torch
 import ctypes
+from typing import Optional
 
 # Use relative imports for package structure
 from ..experts_base import BaseMoEWrapper
@@ -41,7 +42,7 @@ class AMXMoEWrapper(BaseMoEWrapper):
         num_experts_per_tok: int,
         hidden_size: int,
         moe_intermediate_size: int,
-        num_gpu_experts: int,
+        gpu_experts_mask: Optional[torch.Tensor],
         cpuinfer_threads: int,
         threadpool_count: int,
         weight_path: str,
@@ -59,7 +60,10 @@
             num_experts_per_tok: Number of experts per token (top-k)
             hidden_size: Hidden dimension size
             moe_intermediate_size: MoE intermediate size
-            num_gpu_experts: Number of experts to run on GPU
+            gpu_experts_mask: Boolean mask indicating which experts are on GPU.
+                Shape: [num_experts], dtype: torch.bool.
+                mask[i] = True means expert i is on GPU.
+                If None, all experts are on CPU.
cpuinfer_threads: Number of CPU inference threads threadpool_count: Number of NUMA subpools weight_path: Path to AMX weights (SafeTensor format) @@ -81,7 +85,7 @@ class AMXMoEWrapper(BaseMoEWrapper): num_experts_per_tok=num_experts_per_tok, hidden_size=hidden_size, moe_intermediate_size=moe_intermediate_size, - num_gpu_experts=num_gpu_experts, + gpu_experts_mask=gpu_experts_mask, cpuinfer_threads=cpuinfer_threads, threadpool_count=threadpool_count, weight_path=weight_path, @@ -139,7 +143,7 @@ class AMXMoEWrapper(BaseMoEWrapper): self.num_experts_per_tok, self.hidden_size, self.moe_intermediate_size, - self.num_gpu_experts, + self.gpu_experts_mask.data_ptr(), ) moe_config.layer_idx = self.layer_idx moe_config.pool = self.cpu_infer.backend_ @@ -254,7 +258,7 @@ class AMXMoEWrapper(BaseMoEWrapper): self.num_experts_per_tok, self.hidden_size, self.moe_intermediate_size, - self.num_gpu_experts, + self.gpu_experts_mask.data_ptr(), ) moe_config.layer_idx = self.layer_idx moe_config.pool = self.cpu_infer.backend_ @@ -323,7 +327,7 @@ class NativeMoEWrapper(BaseMoEWrapper): num_experts_per_tok: int, hidden_size: int, moe_intermediate_size: int, - num_gpu_experts: int, + gpu_experts_mask: Optional[torch.Tensor], cpuinfer_threads: int, threadpool_count: int, weight_path: str, @@ -349,7 +353,7 @@ class NativeMoEWrapper(BaseMoEWrapper): num_experts_per_tok=num_experts_per_tok, hidden_size=hidden_size, moe_intermediate_size=moe_intermediate_size, - num_gpu_experts=num_gpu_experts, + gpu_experts_mask=gpu_experts_mask, cpuinfer_threads=cpuinfer_threads, threadpool_count=threadpool_count, weight_path=weight_path, @@ -448,7 +452,7 @@ class NativeMoEWrapper(BaseMoEWrapper): self.num_experts_per_tok, self.hidden_size, self.moe_intermediate_size, - self.num_gpu_experts, + self.gpu_experts_mask.data_ptr(), ) moe_config.layer_idx = self.layer_idx moe_config.pool = self.cpu_infer.backend_ diff --git a/kt-kernel/python/utils/llamafile.py b/kt-kernel/python/utils/llamafile.py index d6086a9..708c29d 100644 --- a/kt-kernel/python/utils/llamafile.py +++ b/kt-kernel/python/utils/llamafile.py @@ -33,7 +33,7 @@ class LlamafileMoEWrapper(BaseMoEWrapper): num_experts_per_tok: int, hidden_size: int, moe_intermediate_size: int, - num_gpu_experts: int, + gpu_experts_mask: Optional[torch.Tensor], cpuinfer_threads: int, threadpool_count: int, weight_path: str, @@ -51,7 +51,10 @@ class LlamafileMoEWrapper(BaseMoEWrapper): num_experts_per_tok: Number of experts per token (top-k) hidden_size: Hidden dimension size moe_intermediate_size: MoE intermediate size - num_gpu_experts: Number of experts to run on GPU + gpu_experts_mask: Boolean mask indicating which experts are on GPU. + Shape: [num_experts], dtype: torch.bool. + mask[i] = True means expert i is on GPU. + If None, all experts are on CPU. 
cpuinfer_threads: Number of CPU inference threads threadpool_count: Number of NUMA subpools (TP count) weight_path: Path to GGUF weights @@ -122,7 +125,7 @@ class LlamafileMoEWrapper(BaseMoEWrapper): num_experts_per_tok=num_experts_per_tok, hidden_size=hidden_size, moe_intermediate_size=moe_intermediate_size, - num_gpu_experts=num_gpu_experts, + gpu_experts_mask=gpu_experts_mask, cpuinfer_threads=cpuinfer_threads, threadpool_count=threadpool_count, weight_path=weight_path, @@ -189,7 +192,7 @@ class LlamafileMoEWrapper(BaseMoEWrapper): self.num_experts_per_tok, self.hidden_size, self.moe_intermediate_size, - self.num_gpu_experts, + self.gpu_experts_mask.data_ptr(), ) moe_config.layer_idx = self.layer_idx moe_config.pool = self.cpu_infer.backend_ diff --git a/kt-kernel/python/utils/moe_kernel.py b/kt-kernel/python/utils/moe_kernel.py index fa3b3d0..1d772ea 100644 --- a/kt-kernel/python/utils/moe_kernel.py +++ b/kt-kernel/python/utils/moe_kernel.py @@ -1,6 +1,7 @@ import os import torch import ctypes +from typing import Optional # Use relative imports for package structure from ..experts_base import BaseMoEWrapper @@ -40,7 +41,7 @@ class GeneralMoEWrapper(BaseMoEWrapper): num_experts_per_tok: int, hidden_size: int, moe_intermediate_size: int, - num_gpu_experts: int, + gpu_experts_mask: Optional[torch.Tensor], cpuinfer_threads: int, threadpool_count: int, weight_path: str, @@ -58,7 +59,10 @@ class GeneralMoEWrapper(BaseMoEWrapper): num_experts_per_tok: Number of experts per token (top-k) hidden_size: Hidden dimension size moe_intermediate_size: MoE intermediate size - num_gpu_experts: Number of experts to run on GPU + gpu_experts_mask: Boolean mask indicating which experts are on GPU. + Shape: [num_experts], dtype: torch.bool. + mask[i] = True means expert i is on GPU. + If None, all experts are on CPU. 
cpuinfer_threads: Number of CPU inference threads
             threadpool_count: Number of NUMA subpools
             weight_path: Path to weights (SafeTensor format)
@@ -85,7 +89,7 @@
             num_experts_per_tok=num_experts_per_tok,
             hidden_size=hidden_size,
             moe_intermediate_size=moe_intermediate_size,
-            num_gpu_experts=num_gpu_experts,
+            gpu_experts_mask=gpu_experts_mask,
             cpuinfer_threads=cpuinfer_threads,
             threadpool_count=threadpool_count,
             weight_path=weight_path,
@@ -143,7 +147,7 @@
             self.num_experts_per_tok,
             self.hidden_size,
             self.moe_intermediate_size,
-            self.num_gpu_experts,
+            self.gpu_experts_mask.data_ptr(),
         )
         moe_config.layer_idx = self.layer_idx
         moe_config.pool = self.cpu_infer.backend_
@@ -258,7 +262,7 @@
             self.num_experts_per_tok,
             self.hidden_size,
             self.moe_intermediate_size,
-            self.num_gpu_experts,
+            self.gpu_experts_mask.data_ptr(),
         )
         moe_config.layer_idx = self.layer_idx
         moe_config.pool = self.cpu_infer.backend_
diff --git a/kt-kernel/scripts/convert_cpu_weights.py b/kt-kernel/scripts/convert_cpu_weights.py
index f597321..839231e 100644
--- a/kt-kernel/scripts/convert_cpu_weights.py
+++ b/kt-kernel/scripts/convert_cpu_weights.py
@@ -864,15 +864,16 @@ class OnlineQuantConverter(ConverterBase):
         }
         amx_method = quant_to_amx_map.get(self.quant_method, "AMXINT4")
 
-        # Create AMXMoEWrapper instance for this layer
-        # num_gpu_experts=0 since we're converting all experts to CPU format
+        # Create KTMoEWrapper instance for this layer
+        # gpu_experts_mask: all False means all experts are on CPU for conversion
+        gpu_experts_mask = torch.zeros(self.num_experts, dtype=torch.bool)
         wrapper = KTMoEWrapper(
             layer_idx=layer_idx,
             num_experts=self.num_experts,
             num_experts_per_tok=self.num_experts_per_tok,
             hidden_size=self.hidden_size,
             moe_intermediate_size=self.moe_intermediate_size,
-            num_gpu_experts=0,  # All experts on CPU for conversion
+            gpu_experts_mask=gpu_experts_mask,  # All experts on CPU for conversion
             cpuinfer_threads=self.cpuinfer_threads,
             threadpool_count=self.threadpool_count,
             weight_path=self.output_path,  # Output path for quantized weights
diff --git a/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py b/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py
index f5d9f85..4a512ed 100644
--- a/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py
+++ b/kt-kernel/test/per_commit/test_moe_amx_bench_int8.py
@@ -34,13 +34,13 @@ except ImportError as e:
     import_error = str(e)
 
 # Test parameters (from original bench_moe_amx.py)
-expert_num = 16
+expert_num = 128
 hidden_size = 7168
 intermediate_size = 2048
 max_len = 25600
 num_experts_per_tok = 8
 layer_num = 2
-qlen = 2048
+qlen = 1
 warm_up_iter = 1000
 test_iter = 2000
diff --git a/kt-kernel/test/test_generate_gpu_experts_masks.py b/kt-kernel/test/test_generate_gpu_experts_masks.py
new file mode 100644
index 0000000..72c1be1
--- /dev/null
+++ b/kt-kernel/test/test_generate_gpu_experts_masks.py
@@ -0,0 +1,162 @@
+"""Test for generate_gpu_experts_masks function."""
+
+import sys
+import os
+
+# Add python directory to path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "python"))
+
+import torch
+import time
+from experts_base import generate_gpu_experts_masks
+
+
+def test_basic():
+    """Test basic functionality."""
+    print("=" * 60)
+    print("Test 1: Basic functionality")
+    print("=" * 60)
+
+    activation_freq = torch.tensor([
+        [0.1, 0.5, 0.3, 0.8],  # layer 0
+        [0.2, 0.4, 0.9, 0.1],  # layer 1
+    ])
+
+    print(f"Input 
activation_freq:\n{activation_freq}") + print(f"num_gpu_experts: 3") + + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=3) + + print(f"Output masks:\n{masks}") + print(f"Output dtype: {masks.dtype}, device: {masks.device}") + + # Verify: top 3 should be (1,2)=0.9, (0,3)=0.8, (0,1)=0.5 + expected_gpu_count = masks.sum().item() + print(f"Total GPU experts: {expected_gpu_count}") + + # Check the top 3 positions + assert masks[1, 2] == True, "layer1-expert2 (0.9) should be on GPU" + assert masks[0, 3] == True, "layer0-expert3 (0.8) should be on GPU" + assert masks[0, 1] == True, "layer0-expert1 (0.5) should be on GPU" + assert expected_gpu_count == 3, f"Expected 3 GPU experts, got {expected_gpu_count}" + + print("PASSED\n") + + +def test_edge_cases(): + """Test edge cases.""" + print("=" * 60) + print("Test 2: Edge cases") + print("=" * 60) + + activation_freq = torch.tensor([ + [0.1, 0.5, 0.3, 0.8], + [0.2, 0.4, 0.9, 0.1], + ]) + + # Test num_gpu_experts = 0 + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=0) + assert masks.sum().item() == 0, "num_gpu_experts=0 should have no GPU experts" + print(f"num_gpu_experts=0: {masks.sum().item()} GPU experts - PASSED") + + # Test num_gpu_experts = total experts + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=8) + assert masks.sum().item() == 8, "num_gpu_experts=8 should have all experts on GPU" + print(f"num_gpu_experts=8 (all): {masks.sum().item()} GPU experts - PASSED") + + # Test num_gpu_experts > total experts (should clamp) + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=100) + assert masks.sum().item() == 8, "num_gpu_experts=100 should be clamped to 8" + print(f"num_gpu_experts=100 (clamped): {masks.sum().item()} GPU experts - PASSED") + + # Test negative num_gpu_experts (should clamp to 0) + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=-5) + assert masks.sum().item() == 0, "num_gpu_experts=-5 should be clamped to 0" + print(f"num_gpu_experts=-5 (clamped): {masks.sum().item()} GPU experts - PASSED") + + print("All edge cases PASSED\n") + + +def test_performance(): + """Test performance with realistic sizes.""" + print("=" * 60) + print("Test 3: Performance") + print("=" * 60) + + # DeepSeek-V3 like: 61 layers, 256 experts + num_layers = 61 + num_experts = 256 + + # Generate random activation frequencies + activation_freq = torch.rand(num_layers, num_experts) + + # Test with different num_gpu_experts + test_cases = [0, 100, 500, 1000, 2000, 5000, num_layers * num_experts] + + print(f"Shape: ({num_layers}, {num_experts}) = {num_layers * num_experts} total experts\n") + + for num_gpu in test_cases: + # Warmup + _ = generate_gpu_experts_masks(activation_freq, num_gpu_experts=num_gpu) + + # Measure time + num_runs = 100 + start = time.perf_counter() + for _ in range(num_runs): + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=num_gpu) + end = time.perf_counter() + + avg_time_us = (end - start) / num_runs * 1e6 + actual_gpu = masks.sum().item() + + print(f"num_gpu_experts={num_gpu:5d} -> actual={actual_gpu:5d}, time={avg_time_us:8.2f} us") + + print("\nPerformance test PASSED\n") + + +def test_output_properties(): + """Test output tensor properties.""" + print("=" * 60) + print("Test 4: Output properties") + print("=" * 60) + + activation_freq = torch.rand(10, 64) + masks = generate_gpu_experts_masks(activation_freq, num_gpu_experts=50) + + print(f"Shape: {masks.shape}") + print(f"Dtype: {masks.dtype}") + print(f"Device: 
{masks.device}") + print(f"Is contiguous: {masks.is_contiguous()}") + + assert masks.shape == (10, 64), f"Expected shape (10, 64), got {masks.shape}" + assert masks.dtype == torch.bool, f"Expected dtype bool, got {masks.dtype}" + assert str(masks.device) == "cpu", f"Expected device cpu, got {masks.device}" + + print("All properties PASSED\n") + + +def test_determinism(): + """Test that results are deterministic.""" + print("=" * 60) + print("Test 5: Determinism") + print("=" * 60) + + activation_freq = torch.rand(20, 128) + + masks1 = generate_gpu_experts_masks(activation_freq, num_gpu_experts=100) + masks2 = generate_gpu_experts_masks(activation_freq, num_gpu_experts=100) + + assert torch.equal(masks1, masks2), "Results should be deterministic" + print("Determinism PASSED\n") + + +if __name__ == "__main__": + test_basic() + test_edge_cases() + test_output_properties() + test_determinism() + test_performance() + + print("=" * 60) + print("All tests PASSED!") + print("=" * 60)