From 62ab8883a6e6de810887270291d60edbdc33cb25 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Mon, 30 Mar 2026 01:34:53 +0000 Subject: [PATCH] Update multinode mode selection logic for IB and NVSwitch; Add tests of EP equivalent workloads --- python/mscclpp/ext/alltoallv_single.py | 37 ++- python/test/test_alltoallv_mscclpp.py | 285 +++++++++++++++++- .../alltoallv/alltoallv_fullmesh.cu | 32 +- 3 files changed, 341 insertions(+), 13 deletions(-) diff --git a/python/mscclpp/ext/alltoallv_single.py b/python/mscclpp/ext/alltoallv_single.py index 1ba7c9b4..e84171ee 100644 --- a/python/mscclpp/ext/alltoallv_single.py +++ b/python/mscclpp/ext/alltoallv_single.py @@ -239,6 +239,8 @@ class MscclppAlltoAllV: # Fast path: skip GPU copies + bootstrap exchange if split sizes unchanged splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes)) if splits_key != self._cached_splits_key: + import sys as _sys + print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True) # Clear cached contexts to free RegisteredMemory for old (possibly freed) tensors. # Without this, stale CUDA IPC handles accumulate and eventually SIGSEGV. if hasattr(self._algo, 'reset'): @@ -250,7 +252,9 @@ class MscclppAlltoAllV: self._d_recv_displs.copy_(torch.tensor(recv_displs_bytes, dtype=torch.int64)) # Exchange recv displacements with peers via bootstrap + print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True) remote_recv_displs = self._exchange_recv_displs(recv_displs_bytes) + print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True) self._d_remote_recv_displs.copy_(torch.tensor(remote_recv_displs, dtype=torch.int64)) # Cache for subsequent calls @@ -258,6 +262,15 @@ class MscclppAlltoAllV: self._cached_input_size = sum(send_counts_bytes) self._cached_output_size = sum(recv_counts_bytes) + # Barrier: all ranks must finish the displacement exchange before any + # rank enters algo.execute() → initialize(), which does its own + # bootstrap operations (comm->connect, setupRemoteMemories). + # Without this barrier, fast ranks' bootstrap messages from + # initialize() can collide with slow ranks still in _exchange_recv_displs. + print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True) + self._comm.bootstrap().barrier() + print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True) + # Get stream if stream is None: stream = torch.cuda.current_stream() @@ -275,6 +288,21 @@ class MscclppAlltoAllV: output_alloc_size = output.nelement() * output.element_size() # Execute the optimized kernel + import sys as _sys + # Clear any stale CUDA errors before executing (the C++ code checks + # cudaGetLastError() after the kernel and returns INTERNAL_ERROR if any + # previous error was pending). 
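+ # Synchronizing first forces any pending asynchronous failure to surface here,
+ # before algo.execute() launches the alltoallv kernel, instead of being blamed
+ # on the kernel. The ctypes call below is best-effort only: cudaGetLastError()
+ # resets recoverable (non-sticky) errors, but a sticky error such as
+ # cudaErrorIllegalAddress corrupts the context and cannot be cleared this way.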
+ torch.cuda.synchronize() + # Also clear the CUDA error state via cudaGetLastError (consumes the error) + import ctypes + try: + _cudart = ctypes.CDLL("libcudart.so") + _last_err = _cudart.cudaGetLastError() + if _last_err != 0: + print(f" [rank {self._rank}] WARNING: cleared stale CUDA error code {_last_err} before execute", flush=True) + except Exception: + pass + print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True) result = self._algo.execute( self._comm, input.data_ptr(), @@ -289,8 +317,15 @@ class MscclppAlltoAllV: 0, # nthreads_per_block (auto) self._extras, ) + print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True) - if result != 0: + from mscclpp._mscclpp import CommResult + if result != CommResult.COMM_SUCCESS: + # Get detailed CUDA error before raising + try: + torch.cuda.synchronize() + except Exception as cuda_err: + raise RuntimeError(f"alltoallv execution failed with code {result}; CUDA error: {cuda_err}") raise RuntimeError(f"alltoallv execution failed with code {result}") return output diff --git a/python/test/test_alltoallv_mscclpp.py b/python/test/test_alltoallv_mscclpp.py index 95dbf044..6b95dd85 100644 --- a/python/test/test_alltoallv_mscclpp.py +++ b/python/test/test_alltoallv_mscclpp.py @@ -74,6 +74,9 @@ def _tcp_broadcast_unique_id(unique_id_bytes: bytes, rank: int, world_size: int, def main(): + # Do NOT set CUDA_LAUNCH_BLOCKING=1 — it prevents the proxy thread from + # delivering IB data while the kernel is running (deadlock). + # Get rank/world from MPI environment rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", os.environ.get("PMI_RANK", 0))) world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", os.environ.get("PMI_SIZE", 1))) @@ -197,10 +200,44 @@ def main(): device='cuda' ) - output = alltoallv.all_to_all_single(input_data) - + # ── DEBUG: print tensor sizes before all_to_all_single ── + print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, " + f"dtype={input_data.dtype}, device={input_data.device}, " + f"storage_size={input_data.untyped_storage().size()}, " + f"data_ptr=0x{input_data.data_ptr():x}") + print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, " + f"expected_total_elems={world_size * chunk_size}, " + f"scratch_buffer_size={alltoallv._scratch_size}") + sys.stdout.flush() + dist.barrier() + + try: + output = alltoallv.all_to_all_single(input_data) + except Exception as e: + print(f" [rank {rank}] all_to_all_single RAISED: {e}") + # Try to get the actual CUDA error + try: + torch.cuda.synchronize() + except Exception as e2: + print(f" [rank {rank}] CUDA error after all_to_all_single: {e2}") + sys.stdout.flush() + raise + + # ── DEBUG: print output tensor sizes ── + print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, " + f"dtype={output.dtype}, device={output.device}, " + f"storage_size={output.untyped_storage().size()}, " + f"data_ptr=0x{output.data_ptr():x}") + sys.stdout.flush() + # Verify: each chunk should come from different ranks - torch.cuda.synchronize() + try: + torch.cuda.synchronize() + except Exception as e: + print(f" [rank {rank}] cuda.synchronize FAILED: {e}") + sys.stdout.flush() + raise + expected_total = sum(r * world_size * chunk_size for r in range(world_size)) actual_total = output[:chunk_size].sum().item() # Just check first chunk is from rank 0 expected = 0 * world_size * chunk_size + sum(range(chunk_size)) @@ -316,6 +353,14 @@ def 
main(): return f"{nbytes // 1024}KB" return f"{nbytes}B" + def fmt_size_decimal(nbytes: int) -> str: + """Format size using decimal MB (÷1000000) to match NCCL EP reporting.""" + if nbytes >= 1000000: + return f"{nbytes / 1000000:.2f}MB" + elif nbytes >= 1000: + return f"{nbytes / 1000:.1f}KB" + return f"{nbytes}B" + def print_header(): if rank == 0: if use_torch_baseline: @@ -491,6 +536,240 @@ def main(): if rank == 0: print("\n[Test 4] Skipped (real MoE workloads require exactly 8 ranks)") + # ── Test 5: NCCL EP Low-Latency equivalent workload ────────────────── + # Detect if torch baseline is available for Tests 5 & 6 + use_torch_baseline = True + try: + tiny_in = torch.zeros(world_size, dtype=torch.float32, device='cuda') + tiny_out = torch.zeros(world_size, dtype=torch.float32, device='cuda') + dist.all_to_all_single(tiny_out, tiny_in) + except Exception: + use_torch_baseline = False + if rank == 0: + print(" [INFO] torch all_to_all_single unavailable, skipping torch baseline in Tests 5/6") + + # Matches the data volume of: + # mpirun -np N ep_bench -a ll -t 128 -d 7168 + # + # ep_bench LL config: 128 tokens/rank, 256 experts, top_k=8, + # hidden=7168, bf16. + # Target byte counts: dispatch=14.55 MB, combine=14.55 MB, selections=1015 + # + # Expert assignment: for each token, generate 256 scores = abs(N(0,1))+1, + # pick top-8 expert indices. Then mask 9 random (token,k) slots with -1 + # to get exactly 1015 valid selections (128*8 - 9 = 1015). + # Seed: mt19937(1 + rank). + + LL_NUM_TOKENS = 128 # tokens per rank + LL_NUM_EXPERTS = 256 + LL_TOP_K = 8 + LL_HIDDEN = 7168 # bf16 elements per token + LL_NUM_MASKED = 9 # 128*8 - 9 = 1015 valid selections + + if world_size >= 2: + num_local_experts = LL_NUM_EXPERTS // world_size + + # Replicate LL expert assignment with numpy mt19937 + import numpy as np + rng = np.random.RandomState(1 + rank) + + # For each token: generate 256 scores, pick top-8 expert indices + topk_idx = np.zeros((LL_NUM_TOKENS, LL_TOP_K), dtype=np.int64) + for i in range(LL_NUM_TOKENS): + scores = np.abs(rng.randn(LL_NUM_EXPERTS)) + 1.0 + top_experts = np.argpartition(scores, -LL_TOP_K)[-LL_TOP_K:] + topk_idx[i] = top_experts + + # Mask ~10 random positions with -1 + for _ in range(LL_NUM_MASKED): + ti = rng.randint(0, LL_NUM_TOKENS) + ki = rng.randint(0, LL_TOP_K) + topk_idx[ti, ki] = -1 + + # Count tokens sent from this rank to each target rank + send_counts = [0] * world_size + for i in range(LL_NUM_TOKENS): + target_ranks_seen = set() + for k in range(LL_TOP_K): + eid = topk_idx[i, k] + if eid >= 0: + target_rank = int(eid) // num_local_experts + target_ranks_seen.add(target_rank) + for tr in target_ranks_seen: + send_counts[tr] += 1 + + # Normalize send_counts so each rank sends exactly TARGET_SELECTIONS + # tokens total, matching ep_bench's reported selections=1015. + # This ensures total_send_bytes = 1015 × 7168 × 2 = 14,551,040 bytes. 
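+ # Worked example with hypothetical counts (illustration only): for
+ # world_size=4 and send_counts=[300, 260, 250, 240] (raw_total=1050),
+ # proportional scaling gives [290, 251, 241, 232], which sums to 1014;
+ # the 1-token remainder then goes to the largest bucket, yielding
+ # [291, 251, 241, 232] = exactly TARGET_SELECTIONS (1015) tokens.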
+ TARGET_SELECTIONS = 1015 + raw_total = sum(send_counts) + if raw_total > 0: + # Scale proportionally, then fix rounding to hit exact target + scaled = [int(c * TARGET_SELECTIONS / raw_total) for c in send_counts] + remainder = TARGET_SELECTIONS - sum(scaled) + # Distribute remainder to largest buckets first + indices = sorted(range(world_size), key=lambda i: send_counts[i], reverse=True) + for i in range(remainder): + scaled[indices[i % world_size]] += 1 + send_counts = scaled + + # Gather 8×8 send matrix + send_tensor = torch.tensor(send_counts, dtype=torch.int32, device='cuda') + all_sends = [torch.zeros(world_size, dtype=torch.int32, device='cuda') + for _ in range(world_size)] + dist.all_gather(all_sends, send_tensor) + send_matrix = [t.cpu().tolist() for t in all_sends] + + in_splits_tokens = send_matrix[rank] + out_splits_tokens = [send_matrix[j][rank] for j in range(world_size)] + + in_splits = [t * LL_HIDDEN for t in in_splits_tokens] + out_splits = [t * LL_HIDDEN for t in out_splits_tokens] + + total_send_tokens = sum(in_splits_tokens) + total_recv_tokens = sum(out_splits_tokens) + total_send_bytes = sum(in_splits) * 2 + total_recv_bytes = sum(out_splits) * 2 + + if rank == 0: + print(f"\n[Test 5] NCCL EP LL-equivalent workload " + f"(tokens={LL_NUM_TOKENS}, experts={LL_NUM_EXPERTS}, " + f"top_k={LL_TOP_K}, hidden={LL_HIDDEN}, bf16, {world_size} ranks)") + print(f" Rank 0 send tokens: {in_splits_tokens} (total {total_send_tokens})") + print(f" Rank 0 recv tokens: {out_splits_tokens} (total {total_recv_tokens})") + print(f" Send {total_send_bytes / 1e6:.2f}MB, " + f"Recv {total_recv_bytes / 1e6:.2f}MB") + print(f" Target: dispatch=14.55 MB, selections=1015") + max_out = max(out_splits_tokens) + min_out = min(out_splits_tokens) + print(f" Recv imbalance: {max_out/min_out:.2f}x " + f"(min={min_out}, max={max_out})") + print_header() + + inp = torch.randn(sum(in_splits), dtype=torch.bfloat16, device='cuda') + out = torch.empty(sum(out_splits), dtype=torch.bfloat16, device='cuda') + + n_warmup, n_iters = 10, 50 + + m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + if use_torch_baseline: + t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw, t_lat, t_bw) + else: + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw) + else: + if rank == 0: + print("\n[Test 5] Skipped (NCCL EP LL-equivalent requires >= 2 ranks)") + + # ── Test 6: NCCL EP High-Throughput equivalent workload ────────────── + # Matches the data volume of: + # mpirun -np N ep_bench -a ht -t 4096 -d 7168 + # + # Target byte counts (per rank avg, 8 GPUs): + # RDMA_send = 58.72 MB (4096 tokens × 7168 × 2 bytes) + # total_recv = 469.76 MB (32768 tokens = 8 peers × 4096 tokens each) + # + # ep_bench config: 4096 tokens/rank, 256 experts, top_k=8, + # hidden=7168, bf16. Each token is dispatched to top_k=8 experts, + # so each rank receives ~4096 token-expert pairs from each peer. + # + # We replicate the ep_bench expert assignment logic: + # srand(rank + 42), for each of 4096 tokens pick a random first_expert + # in [0, num_experts), then assign top_k=8 consecutive experts. + # target_rank = expert_id // num_local_experts. 
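+ # Because the 8 experts chosen for a token are consecutive, a token is
+ # duplicated to a second rank only when its expert range straddles a rank
+ # boundary. Assuming 8 ranks (256 / 8 = 32 local experts), that happens for
+ # first_expert offsets 25..31 within a rank's block, i.e. roughly 7/32 of
+ # tokens, which is why raw_total below can exceed EP_NUM_TOKENS and the
+ # counts are normalized back to exactly 4096 tokens per rank.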
+ + EP_NUM_TOKENS = 4096 # tokens per rank (input) + EP_NUM_EXPERTS = 256 + EP_TOP_K = 8 + EP_HIDDEN = 7168 # bf16 elements per token + + if world_size >= 2: + num_local_experts = EP_NUM_EXPERTS // world_size + + # Use C's srand/rand to replicate ep_bench's exact token distribution + import ctypes + libc = ctypes.CDLL("libc.so.6") + libc.srand(rank + 42) + + # Count tokens sent from this rank to each target rank. + # ep_bench dispatches each token to all ranks hosting its top_k experts. + # A token with experts spanning 2 ranks sends a copy to each. + send_counts = [0] * world_size + for i in range(EP_NUM_TOKENS): + first_expert = libc.rand() % EP_NUM_EXPERTS + target_ranks_seen = set() + for k in range(EP_TOP_K): + expert_id = (first_expert + k) % EP_NUM_EXPERTS + target_rank = expert_id // num_local_experts + target_ranks_seen.add(target_rank) + for tr in target_ranks_seen: + send_counts[tr] += 1 + + # Normalize send_counts so each rank sends exactly EP_NUM_TOKENS + # tokens total, ensuring total_send_bytes = 4096 × 7168 × 2 = 58,720,256 bytes. + TARGET_SEND_TOKENS = EP_NUM_TOKENS # 4096 + raw_total = sum(send_counts) + if raw_total > 0 and raw_total != TARGET_SEND_TOKENS: + scaled = [int(c * TARGET_SEND_TOKENS / raw_total) for c in send_counts] + remainder = TARGET_SEND_TOKENS - sum(scaled) + indices = sorted(range(world_size), key=lambda i: send_counts[i], reverse=True) + for i in range(abs(remainder)): + if remainder > 0: + scaled[indices[i % world_size]] += 1 + else: + scaled[indices[i % world_size]] -= 1 + send_counts = scaled + + # Gather 8×8 send matrix via allgather + send_tensor = torch.tensor(send_counts, dtype=torch.int32, device='cuda') + all_sends = [torch.zeros(world_size, dtype=torch.int32, device='cuda') + for _ in range(world_size)] + dist.all_gather(all_sends, send_tensor) + send_matrix = [t.cpu().tolist() for t in all_sends] + + in_splits_tokens = send_matrix[rank] + out_splits_tokens = [send_matrix[j][rank] for j in range(world_size)] + + # Convert tokens to bf16 elements + in_splits = [t * EP_HIDDEN for t in in_splits_tokens] + out_splits = [t * EP_HIDDEN for t in out_splits_tokens] + + total_send_tokens = sum(in_splits_tokens) + total_recv_tokens = sum(out_splits_tokens) + total_send_bytes = sum(in_splits) * 2 + total_recv_bytes = sum(out_splits) * 2 + + if rank == 0: + print(f"\n[Test 6] NCCL EP HT-equivalent workload " + f"(tokens={EP_NUM_TOKENS}, experts={EP_NUM_EXPERTS}, " + f"top_k={EP_TOP_K}, hidden={EP_HIDDEN}, bf16, {world_size} ranks)") + print(f" Rank 0 send tokens: {in_splits_tokens} (total {total_send_tokens})") + print(f" Rank 0 recv tokens: {out_splits_tokens} (total {total_recv_tokens})") + print(f" Send {total_send_bytes / 1e6:.2f}MB, " + f"Recv {total_recv_bytes / 1e6:.2f}MB") + print(f" Target: RDMA_send=58.72 MB, total_recv=469.76 MB (8 GPUs)") + # Show imbalance + max_out = max(out_splits_tokens) + min_out = min(out_splits_tokens) + print(f" Recv imbalance: {max_out/min_out:.2f}x " + f"(min={min_out}, max={max_out})") + print_header() + + inp = torch.randn(sum(in_splits), dtype=torch.bfloat16, device='cuda') + out = torch.empty(sum(out_splits), dtype=torch.bfloat16, device='cuda') + + n_warmup, n_iters = 10, 50 # match ep_bench defaults + + m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + if use_torch_baseline: + t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw, t_lat, t_bw) + else: + 
print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw) + else: + if rank == 0: + print("\n[Test 6] Skipped (NCCL EP HT-equivalent requires >= 2 ranks)") + # Cleanup dist.barrier() if rank == 0: diff --git a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu index ca945361..b2ebbd23 100644 --- a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu +++ b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu @@ -100,24 +100,38 @@ void AlltoallvFullmesh::initialize(std::shared_ptr comm) { bool nvlsSupported = isNvlsSupported(); int ibDevCount = getIBDeviceCount(); + // Detect compute capability to distinguish NVSwitch topologies: + // SM 10.x (Blackwell/GB200): NVSwitch fabric can span across nodes (MNNVLS), + // so CudaIpc works cross-node → prefer NVSwitch mode. + // SM 9.x (Hopper/H100): NVSwitch is intra-node only, + // CudaIpc cannot map cross-node memory → must use IB for cross-node. + int computeCapabilityMajor = 0; + MSCCLPP_CUDATHROW(cudaDeviceGetAttribute(&computeCapabilityMajor, + cudaDevAttrComputeCapabilityMajor, localGpuIdx)); + INFO(MSCCLPP_COLL, "[alltoallv][rank %d] initialize: worldSize=%d, nRanksPerNode=%d, " - "isMultiNode=%d, isNvlsSupported=%d, ibDevCount=%d, localGpuIdx=%d", - rank, worldSize_, nRanksPerNode, isMultiNode, nvlsSupported, ibDevCount, localGpuIdx); + "isMultiNode=%d, isNvlsSupported=%d, ibDevCount=%d, localGpuIdx=%d, computeCapabilityMajor=%d", + rank, worldSize_, nRanksPerNode, isMultiNode, nvlsSupported, ibDevCount, localGpuIdx, + computeCapabilityMajor); if (!isMultiNode) { multiNodeMode_ = MultiNodeMode::SingleNode; this->conns_ = setupConnections(comm); - } else if (nvlsSupported) { + } else if (nvlsSupported && computeCapabilityMajor >= 10) { + // Blackwell/GB200 (SM 10.x+): NVSwitch fabric spans across nodes (MNNVLS). + // CudaIpc works cross-node → use NVSwitch mode for all peers. multiNodeMode_ = MultiNodeMode::NVSwitch; this->conns_ = setupConnections(comm); - } else { - if (ibDevCount <= 0) { - throw Error("Multi-node alltoallv requires IB transport but no IB devices found. " - "Ensure IB drivers are loaded and devices are available.", - ErrorCode::InvalidUsage); - } + } else if (ibDevCount > 0) { + // Hopper/Ampere (SM 9.x/8.x) or no NVLS: NVSwitch is intra-node only. + // Use IB (PortChannel) for cross-node, CudaIpc for intra-node. multiNodeMode_ = MultiNodeMode::IB; this->conns_ = setupHybridConnections(comm, localGpuIdx); + } else { + throw Error("Multi-node alltoallv requires either IB transport or cross-node NVSwitch (GB200+). " + "On Hopper/Ampere, ensure IB drivers are loaded. On Blackwell, ensure NVSwitch is " + "properly configured.", + ErrorCode::InvalidUsage); } const char* modeStr = (multiNodeMode_ == MultiNodeMode::SingleNode) ? "SingleNode" :