From 1d271f4cc7a2b93cd8f622d6ceae70c24f35ec6a Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Wed, 8 Apr 2026 23:03:12 +0000 Subject: [PATCH] Merge latest multinode branch --- python/mscclpp/ext/alltoallv_single.py | 63 ++- python/test/test_alltoallv_mscclpp.py | 310 ++++++++++++++- .../alltoallv/alltoallv_fullmesh.cu | 98 +++-- src/ext/collectives/collective_utils.cc | 21 - .../include/alltoallv/alltoallv_kernel.hpp | 360 ------------------ .../collectives/include/collective_utils.hpp | 20 - test/mscclpp-test/alltoallv_test.cu | 48 +-- 7 files changed, 427 insertions(+), 493 deletions(-) diff --git a/python/mscclpp/ext/alltoallv_single.py b/python/mscclpp/ext/alltoallv_single.py index 419a13d8..088fac0f 100644 --- a/python/mscclpp/ext/alltoallv_single.py +++ b/python/mscclpp/ext/alltoallv_single.py @@ -28,9 +28,18 @@ from mscclpp._mscclpp import ( TcpBootstrap, DataType, ReduceOp, + CommResult, ) from mscclpp.ext.algorithm_collection_builder import AlgorithmCollectionBuilder +import ctypes as _ctypes +try: + _cudart = _ctypes.CDLL("libcudart.so") +except Exception: + _cudart = None + +_DEBUG = os.environ.get("MSCCLPP_DEBUG_ALLTOALLV", "0") == "1" + __all__ = ["MscclppAlltoAllV", "all_to_all_single"] @@ -164,6 +173,8 @@ class MscclppAlltoAllV: self._cached_output_size = 0 self._cached_total_output_elems = 0 self._cached_dtype = None + # One-time check for untyped_storage (available since PyTorch 1.13) + self._has_untyped_storage = hasattr(torch.Tensor, 'untyped_storage') # Pre-built extras dict (GPU pointers don't change) self._extras = { "sendCounts": self._d_send_counts.data_ptr(), @@ -248,6 +259,8 @@ class MscclppAlltoAllV: # Fast path: skip GPU copies + bootstrap exchange if split sizes unchanged splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes)) if splits_key != self._cached_splits_key: + if _DEBUG: + print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True) # Clear cached contexts to free RegisteredMemory for old (possibly freed) tensors. # Without this, stale CUDA IPC handles accumulate and eventually SIGSEGV. if hasattr(self._algo, 'reset'): @@ -259,7 +272,11 @@ class MscclppAlltoAllV: self._d_recv_displs.copy_(torch.tensor(recv_displs_bytes, dtype=torch.int64)) # Exchange recv displacements with peers via bootstrap + if _DEBUG: + print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True) remote_recv_displs = self._exchange_recv_displs(recv_displs_bytes) + if _DEBUG: + print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True) self._d_remote_recv_displs.copy_(torch.tensor(remote_recv_displs, dtype=torch.int64)) # Cache for subsequent calls @@ -267,19 +284,29 @@ class MscclppAlltoAllV: self._cached_input_size = sum(send_counts_bytes) self._cached_output_size = sum(recv_counts_bytes) + # Barrier: all ranks must finish the displacement exchange before any + # rank enters algo.execute() → initialize(), which does its own + # bootstrap operations (comm->connect, setupRemoteMemories). + # Without this barrier, fast ranks' bootstrap messages from + # initialize() can collide with slow ranks still in _exchange_recv_displs. 
+ if _DEBUG: + print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True) + self._comm.bootstrap().barrier() + if _DEBUG: + print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True) + # Get stream if stream is None: stream = torch.cuda.current_stream() cuda_stream = stream.cuda_stream - # Use the full underlying storage size (not just the view's active data) - # for the context key, so that reusing views of the same tensor with - # different split sizes doesn't create new contexts (which leak - # RegisteredMemory for stale buffers). - try: + # Use the full underlying storage size for context key stability. + # When the test reuses the same large tensor with different split sizes, + # storage size stays constant → same context key → reuses channels. + if self._has_untyped_storage: input_alloc_size = input.untyped_storage().size() output_alloc_size = output.untyped_storage().size() - except Exception: + else: input_alloc_size = input.nelement() * input.element_size() output_alloc_size = output.nelement() * output.element_size() @@ -290,7 +317,7 @@ class MscclppAlltoAllV: # so the alltoallv kernel launches on a quiet GPU. torch.cuda.synchronize() - _a2av_dbg(f"[A2AV R{self._rank}] #{_cid} pre-barrier in={input_size} out={output_size}") + _a2av_dbg(f"[A2AV R{self._rank}] #{_cid} pre-barrier in={input_alloc_size} out={output_alloc_size}") # Barrier: ensure ALL ranks launch the alltoallv kernel simultaneously. # The kernel uses inter-GPU flag-based signaling that requires every @@ -300,6 +327,16 @@ class MscclppAlltoAllV: _a2av_dbg(f"[A2AV R{self._rank}] #{_cid} post-barrier, launching kernel") # Execute the optimized kernel + + if _DEBUG: + # Clear stale CUDA errors (the C++ code checks cudaGetLastError + # after the kernel and returns INTERNAL_ERROR if any was pending). + if _cudart is not None: + _last_err = _cudart.cudaGetLastError() + if _last_err != 0: + print(f" [rank {self._rank}] WARNING: cleared stale CUDA error code {_last_err} before execute", flush=True) + print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True) + result = self._algo.execute( self._comm, input.data_ptr(), @@ -315,9 +352,15 @@ class MscclppAlltoAllV: self._extras, ) - _a2av_dbg(f"[A2AV R{self._rank}] #{_cid} kernel returned rc={result}") - - if result != 0: + if _DEBUG: + print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True) + + if result != CommResult.COMM_SUCCESS: + # Get detailed CUDA error before raising + try: + torch.cuda.synchronize() + except Exception as cuda_err: + raise RuntimeError(f"alltoallv execution failed with code {result}; CUDA error: {cuda_err}") raise RuntimeError(f"alltoallv execution failed with code {result}") return output diff --git a/python/test/test_alltoallv_mscclpp.py b/python/test/test_alltoallv_mscclpp.py index 95dbf044..e8797e43 100644 --- a/python/test/test_alltoallv_mscclpp.py +++ b/python/test/test_alltoallv_mscclpp.py @@ -14,6 +14,8 @@ import torch.distributed as dist import os import sys import time + +_DEBUG = os.environ.get("MSCCLPP_DEBUG_ALLTOALLV", "0") == "1" import random import socket import struct @@ -74,6 +76,9 @@ def _tcp_broadcast_unique_id(unique_id_bytes: bytes, rank: int, world_size: int, def main(): + # Do NOT set CUDA_LAUNCH_BLOCKING=1 — it prevents the proxy thread from + # delivering IB data while the kernel is running (deadlock). 
+ # Get rank/world from MPI environment rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", os.environ.get("PMI_RANK", 0))) world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", os.environ.get("PMI_SIZE", 1))) @@ -163,15 +168,17 @@ def main(): except Exception: ib_devices = [] - if rank == 0: + if rank == 0 and _DEBUG: print(f" Hostname: {hostname}") print(f" nRanksPerNode: {n_ranks_per_node}, isMultiNode: {is_multi_node}") print(f" IB devices: {ib_devices if ib_devices else 'NONE FOUND'}") print(f" MSCCLPP_SOCKET_IFNAME: {os.environ.get('MSCCLPP_SOCKET_IFNAME', '')}") if is_multi_node and not ib_devices: - print(f" WARNING: Multi-node detected but no IB devices! Cross-node will fail.") + print(f" NOTE: Multi-node detected but no IB devices. " + f"GB200 NVSwitch can handle cross-node without IB; " + f"on Hopper/Ampere IB is required.") # Also print from rank n_ranks_per_node (first rank on node 1) for comparison - if is_multi_node and rank == n_ranks_per_node: + if is_multi_node and rank == n_ranks_per_node and _DEBUG: print(f" [Node 1] Hostname: {hostname}, rank={rank}") print(f" [Node 1] IB devices: {ib_devices if ib_devices else 'NONE FOUND'}") # ── End diagnostics ──────────────────────────────────────────────── @@ -181,7 +188,7 @@ def main(): # Create MscclppAlltoAllV with existing communicator alltoallv = MscclppAlltoAllV(communicator=comm) - if rank == 0: + if rank == 0 and _DEBUG: print(f"MscclppAlltoAllV initialized") print(f"Algorithm: {alltoallv._algo.name}") @@ -197,10 +204,46 @@ def main(): device='cuda' ) - output = alltoallv.all_to_all_single(input_data) - + # ── DEBUG: print tensor sizes before all_to_all_single ── + if _DEBUG: + print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, " + f"dtype={input_data.dtype}, device={input_data.device}, " + f"storage_size={input_data.untyped_storage().size()}, " + f"data_ptr=0x{input_data.data_ptr():x}") + print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, " + f"expected_total_elems={world_size * chunk_size}, " + f"scratch_buffer_size={alltoallv._scratch_size}") + sys.stdout.flush() + dist.barrier() + + try: + output = alltoallv.all_to_all_single(input_data) + except Exception as e: + print(f" [rank {rank}] all_to_all_single RAISED: {e}") + # Try to get the actual CUDA error + try: + torch.cuda.synchronize() + except Exception as e2: + print(f" [rank {rank}] CUDA error after all_to_all_single: {e2}") + sys.stdout.flush() + raise + + # ── DEBUG: print output tensor sizes ── + if _DEBUG: + print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, " + f"dtype={output.dtype}, device={output.device}, " + f"storage_size={output.untyped_storage().size()}, " + f"data_ptr=0x{output.data_ptr():x}") + sys.stdout.flush() + # Verify: each chunk should come from different ranks - torch.cuda.synchronize() + try: + torch.cuda.synchronize() + except Exception as e: + print(f" [rank {rank}] cuda.synchronize FAILED: {e}") + sys.stdout.flush() + raise + expected_total = sum(r * world_size * chunk_size for r in range(world_size)) actual_total = output[:chunk_size].sum().item() # Just check first chunk is from rank 0 expected = 0 * world_size * chunk_size + sum(range(chunk_size)) @@ -316,6 +359,14 @@ def main(): return f"{nbytes // 1024}KB" return f"{nbytes}B" + def fmt_size_decimal(nbytes: int) -> str: + """Format size using decimal MB (÷1000000) to match NCCL EP reporting.""" + if nbytes >= 1000000: + return f"{nbytes / 1000000:.2f}MB" + elif nbytes >= 1000: + return f"{nbytes / 
1000:.1f}KB" + return f"{nbytes}B" + def print_header(): if rank == 0: if use_torch_baseline: @@ -491,6 +542,251 @@ def main(): if rank == 0: print("\n[Test 4] Skipped (real MoE workloads require exactly 8 ranks)") + # ── Test 5: NCCL EP Low-Latency equivalent workload ────────────────── + # Detect if torch baseline is available for Tests 5 & 6 + use_torch_baseline = True + try: + tiny_in = torch.zeros(world_size, dtype=torch.float32, device='cuda') + tiny_out = torch.zeros(world_size, dtype=torch.float32, device='cuda') + dist.all_to_all_single(tiny_out, tiny_in) + except Exception: + use_torch_baseline = False + if rank == 0: + print(" [INFO] torch all_to_all_single unavailable, skipping torch baseline in Tests 5/6") + + # Matches the data volume of: + # mpirun -np N ep_bench -a ll -t 128 -d 7168 + # + # ep_bench LL config: 128 tokens/rank, 256 experts, top_k=8, + # hidden=7168, bf16. + # Target byte counts: dispatch=14.55 MB, combine=14.55 MB, selections=1015 + # + # Expert assignment: for each token, generate 256 scores = abs(N(0,1))+1, + # pick top-8 expert indices. Then mask 9 random (token,k) slots with -1 + # to get exactly 1015 valid selections (128*8 - 9 = 1015). + # Seed: mt19937(1 + rank). + + LL_NUM_TOKENS = 128 # tokens per rank + LL_NUM_EXPERTS = 256 + LL_TOP_K = 8 + LL_HIDDEN = 7168 # bf16 elements per token + LL_NUM_MASKED = 9 # 128*8 - 9 = 1015 valid selections + + if world_size >= 2: + num_local_experts = LL_NUM_EXPERTS // world_size + + # Replicate LL expert assignment with numpy mt19937 + import numpy as np + rng = np.random.RandomState(1 + rank) + + # For each token: generate 256 scores, pick top-8 expert indices + topk_idx = np.zeros((LL_NUM_TOKENS, LL_TOP_K), dtype=np.int64) + for i in range(LL_NUM_TOKENS): + scores = np.abs(rng.randn(LL_NUM_EXPERTS)) + 1.0 + top_experts = np.argpartition(scores, -LL_TOP_K)[-LL_TOP_K:] + topk_idx[i] = top_experts + + # Mask ~10 random positions with -1 + for _ in range(LL_NUM_MASKED): + ti = rng.randint(0, LL_NUM_TOKENS) + ki = rng.randint(0, LL_TOP_K) + topk_idx[ti, ki] = -1 + + # Count tokens sent from this rank to each target rank + send_counts = [0] * world_size + for i in range(LL_NUM_TOKENS): + target_ranks_seen = set() + for k in range(LL_TOP_K): + eid = topk_idx[i, k] + if eid >= 0: + target_rank = int(eid) // num_local_experts + target_ranks_seen.add(target_rank) + for tr in target_ranks_seen: + send_counts[tr] += 1 + + # Normalize send_counts so each rank sends exactly TARGET_SELECTIONS + # tokens total, matching ep_bench's reported selections=1015. + # This ensures total_send_bytes = 1015 × 7168 × 2 = 14,551,040 bytes. 
+ TARGET_SELECTIONS = 1015 + raw_total = sum(send_counts) + if raw_total > 0: + # Scale proportionally, then fix rounding to hit exact target + scaled = [int(c * TARGET_SELECTIONS / raw_total) for c in send_counts] + remainder = TARGET_SELECTIONS - sum(scaled) + # Distribute remainder to largest buckets first + indices = sorted(range(world_size), key=lambda i: send_counts[i], reverse=True) + for i in range(remainder): + scaled[indices[i % world_size]] += 1 + send_counts = scaled + + # Gather 8×8 send matrix + send_tensor = torch.tensor(send_counts, dtype=torch.int32, device='cuda') + all_sends = [torch.zeros(world_size, dtype=torch.int32, device='cuda') + for _ in range(world_size)] + dist.all_gather(all_sends, send_tensor) + send_matrix = [t.cpu().tolist() for t in all_sends] + + in_splits_tokens = send_matrix[rank] + out_splits_tokens = [send_matrix[j][rank] for j in range(world_size)] + + in_splits = [t * LL_HIDDEN for t in in_splits_tokens] + out_splits = [t * LL_HIDDEN for t in out_splits_tokens] + + total_send_tokens = sum(in_splits_tokens) + total_recv_tokens = sum(out_splits_tokens) + total_send_bytes = sum(in_splits) * 2 + total_recv_bytes = sum(out_splits) * 2 + + if rank == 0: + print(f"\n[Test 5] NCCL EP LL-equivalent workload " + f"(tokens={LL_NUM_TOKENS}, experts={LL_NUM_EXPERTS}, " + f"top_k={LL_TOP_K}, hidden={LL_HIDDEN}, bf16, {world_size} ranks)") + print(f" Rank 0 send tokens: {in_splits_tokens} (total {total_send_tokens})") + print(f" Rank 0 recv tokens: {out_splits_tokens} (total {total_recv_tokens})") + print(f" Send {total_send_bytes / 1e6:.2f}MB, " + f"Recv {total_recv_bytes / 1e6:.2f}MB") + print(f" Target: dispatch=14.55 MB, selections=1015") + max_out = max(out_splits_tokens) + min_out = min(out_splits_tokens) + print(f" Recv imbalance: {max_out/min_out:.2f}x " + f"(min={min_out}, max={max_out})") + print_header() + + inp = torch.randn(sum(in_splits), dtype=torch.bfloat16, device='cuda') + out = torch.empty(sum(out_splits), dtype=torch.bfloat16, device='cuda') + + n_warmup, n_iters = 10, 50 + + m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + if use_torch_baseline: + t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw, t_lat, t_bw) + else: + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw) + else: + if rank == 0: + print("\n[Test 5] Skipped (NCCL EP LL-equivalent requires >= 2 ranks)") + + # ── Test 6: NCCL EP High-Throughput equivalent workload ────────────── + # Matches the data volume of: + # mpirun -np N ep_bench -a ht -t 4096 -d 7168 + # + # ep_bench config: 4096 tokens/rank, 256 experts, top_k=8, + # hidden=7168, bf16. Each token is dispatched to top_k=8 experts, + # so each rank receives ~4096 token-expert pairs from each peer. + # + # We replicate the ep_bench expert assignment logic: + # srand(rank + 42), for each of 4096 tokens pick a random first_expert + # in [0, num_experts), then assign top_k=8 consecutive experts. + # target_rank = expert_id // num_local_experts. + # + # Target send bytes vary by GPU count (to match ep_bench reports): + # 8 GPUs: 4096 tokens/rank → 58.72 MB (no cross-boundary inflation) + # 16 GPUs: 4317 tokens/rank → 61.88 MB (matches ep_bench RDMA_send) + + EP_NUM_TOKENS = 4096 # tokens per rank (input) + EP_NUM_EXPERTS = 256 + EP_TOP_K = 8 + EP_HIDDEN = 7168 # bf16 elements per token + + # Target send tokens per rank, keyed by world_size. 
+ # 8 GPUs: top_k=8 = num_local_experts=32, so no boundary-crossing → 4096 + # 16 GPUs: num_local_experts=16, boundary crossing inflates to ~4317 + EP_TARGET_TOKENS = {8: 4096, 16: 4317} + + if world_size >= 2: + num_local_experts = EP_NUM_EXPERTS // world_size + + # Use C's srand/rand to replicate ep_bench's exact token distribution + import ctypes + libc = ctypes.CDLL("libc.so.6") + libc.srand(rank + 42) + + # Count tokens sent from this rank to each target rank. + # ep_bench dispatches each token to all ranks hosting its top_k experts. + # A token with experts spanning 2 ranks sends a copy to each. + send_counts = [0] * world_size + for i in range(EP_NUM_TOKENS): + first_expert = libc.rand() % EP_NUM_EXPERTS + target_ranks_seen = set() + for k in range(EP_TOP_K): + expert_id = (first_expert + k) % EP_NUM_EXPERTS + target_rank = expert_id // num_local_experts + target_ranks_seen.add(target_rank) + for tr in target_ranks_seen: + send_counts[tr] += 1 + + # Normalize send_counts to the target for this world_size. + # For unknown world_size, keep raw counts. + TARGET_SEND_TOKENS = EP_TARGET_TOKENS.get(world_size, sum(send_counts)) + raw_total = sum(send_counts) + if raw_total > 0 and raw_total != TARGET_SEND_TOKENS: + scaled = [int(c * TARGET_SEND_TOKENS / raw_total) for c in send_counts] + remainder = TARGET_SEND_TOKENS - sum(scaled) + indices = sorted(range(world_size), key=lambda i: send_counts[i], reverse=True) + for i in range(abs(remainder)): + if remainder > 0: + scaled[indices[i % world_size]] += 1 + else: + scaled[indices[i % world_size]] -= 1 + send_counts = scaled + + # Gather send matrix via allgather + send_tensor = torch.tensor(send_counts, dtype=torch.int32, device='cuda') + all_sends = [torch.zeros(world_size, dtype=torch.int32, device='cuda') + for _ in range(world_size)] + dist.all_gather(all_sends, send_tensor) + send_matrix = [t.cpu().tolist() for t in all_sends] + + in_splits_tokens = send_matrix[rank] + out_splits_tokens = [send_matrix[j][rank] for j in range(world_size)] + + # Convert tokens to bf16 elements + in_splits = [t * EP_HIDDEN for t in in_splits_tokens] + out_splits = [t * EP_HIDDEN for t in out_splits_tokens] + + total_send_tokens = sum(in_splits_tokens) + total_recv_tokens = sum(out_splits_tokens) + total_send_bytes = sum(in_splits) * 2 + total_recv_bytes = sum(out_splits) * 2 + + target_send_mb = TARGET_SEND_TOKENS * EP_HIDDEN * 2 / 1e6 + target_recv_tokens = world_size * EP_NUM_TOKENS + target_recv_mb = target_recv_tokens * EP_HIDDEN * 2 / 1e6 + + if rank == 0: + print(f"\n[Test 6] NCCL EP HT-equivalent workload " + f"(tokens={EP_NUM_TOKENS}, experts={EP_NUM_EXPERTS}, " + f"top_k={EP_TOP_K}, hidden={EP_HIDDEN}, bf16, {world_size} ranks)") + print(f" Rank 0 send tokens: {in_splits_tokens} (total {total_send_tokens})") + print(f" Rank 0 recv tokens: {out_splits_tokens} (total {total_recv_tokens})") + print(f" Send {total_send_bytes / 1e6:.2f}MB, " + f"Recv {total_recv_bytes / 1e6:.2f}MB") + print(f" Target: RDMA_send={target_send_mb:.2f} MB " + f"({TARGET_SEND_TOKENS} tokens), " + f"total_recv={target_recv_mb:.2f} MB " + f"({target_recv_tokens} tokens)") + max_out = max(out_splits_tokens) + min_out = min(out_splits_tokens) + print(f" Recv imbalance: {max_out/min_out:.2f}x " + f"(min={min_out}, max={max_out})") + print_header() + + inp = torch.randn(sum(in_splits), dtype=torch.bfloat16, device='cuda') + out = torch.empty(sum(out_splits), dtype=torch.bfloat16, device='cuda') + + n_warmup, n_iters = 10, 50 # match ep_bench defaults + + m_lat, m_bw = 
bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + if use_torch_baseline: + t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters) + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw, t_lat, t_bw) + else: + print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw) + else: + if rank == 0: + print("\n[Test 6] Skipped (NCCL EP HT-equivalent requires >= 2 ranks)") + # Cleanup dist.barrier() if rank == 0: diff --git a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu index ca945361..4a57d30d 100644 --- a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu +++ b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu @@ -13,18 +13,11 @@ #include #include -#include #include "debug.h" namespace mscclpp { namespace collective { -#if defined(__HIP_PLATFORM_AMD__) -#define ALLTOALLV_WARP_SIZE 64 -#else -#define ALLTOALLV_WARP_SIZE 32 -#endif - using MultiNodeMode = AlltoallvFullmesh::MultiNodeMode; // Context to hold all necessary state for alltoallv execution @@ -100,24 +93,38 @@ void AlltoallvFullmesh::initialize(std::shared_ptr comm) { bool nvlsSupported = isNvlsSupported(); int ibDevCount = getIBDeviceCount(); + // Detect compute capability to distinguish NVSwitch topologies: + // SM 10.x (Blackwell/GB200): NVSwitch fabric can span across nodes (MNNVLS), + // so CudaIpc works cross-node → prefer NVSwitch mode. + // SM 9.x (Hopper/H100): NVSwitch is intra-node only, + // CudaIpc cannot map cross-node memory → must use IB for cross-node. + int computeCapabilityMajor = 0; + MSCCLPP_CUDATHROW(cudaDeviceGetAttribute(&computeCapabilityMajor, + cudaDevAttrComputeCapabilityMajor, localGpuIdx)); + INFO(MSCCLPP_COLL, "[alltoallv][rank %d] initialize: worldSize=%d, nRanksPerNode=%d, " - "isMultiNode=%d, isNvlsSupported=%d, ibDevCount=%d, localGpuIdx=%d", - rank, worldSize_, nRanksPerNode, isMultiNode, nvlsSupported, ibDevCount, localGpuIdx); + "isMultiNode=%d, isNvlsSupported=%d, ibDevCount=%d, localGpuIdx=%d, computeCapabilityMajor=%d", + rank, worldSize_, nRanksPerNode, isMultiNode, nvlsSupported, ibDevCount, localGpuIdx, + computeCapabilityMajor); if (!isMultiNode) { multiNodeMode_ = MultiNodeMode::SingleNode; this->conns_ = setupConnections(comm); - } else if (nvlsSupported) { + } else if (nvlsSupported && computeCapabilityMajor >= 10) { + // Blackwell/GB200 (SM 10.x+): NVSwitch fabric spans across nodes (MNNVLS). + // CudaIpc works cross-node → use NVSwitch mode for all peers. multiNodeMode_ = MultiNodeMode::NVSwitch; this->conns_ = setupConnections(comm); - } else { - if (ibDevCount <= 0) { - throw Error("Multi-node alltoallv requires IB transport but no IB devices found. " - "Ensure IB drivers are loaded and devices are available.", - ErrorCode::InvalidUsage); - } + } else if (ibDevCount > 0) { + // Hopper/Ampere (SM 9.x/8.x) or no NVLS: NVSwitch is intra-node only. + // Use IB (PortChannel) for cross-node, CudaIpc for intra-node. multiNodeMode_ = MultiNodeMode::IB; this->conns_ = setupHybridConnections(comm, localGpuIdx); + } else { + throw Error("Multi-node alltoallv requires either IB transport or cross-node NVSwitch (GB200+). " + "On Hopper/Ampere, ensure IB drivers are loaded. On Blackwell, ensure NVSwitch is " + "properly configured.", + ErrorCode::InvalidUsage); } const char* modeStr = (multiNodeMode_ == MultiNodeMode::SingleNode) ? 
"SingleNode" : @@ -179,12 +186,16 @@ CommResult AlltoallvFullmesh::alltoallvKernelFunc( } if (algoCtx->mode == MultiNodeMode::IB) { - // ── IB mode: PortChannel kernel for ALL peers ────────────────────── - // PortChannel handles both CudaIpc (intra) and IB (inter) connections - // via the ProxyService proxy thread. + // ── IB mode: Hybrid kernel ───────────────────────────────────────── + // MemoryChannel (direct NVLink) for intra-node peers, + // PortChannel (CPU proxy → RDMA) for inter-node peers. int numBlocks = nPeers; - alltoallvPortChannelKernel<<>>( + alltoallvHybridKernel<<>>( + algoCtx->memoryChannelDeviceHandles.get(), algoCtx->portChannelDeviceHandles.get(), + algoCtx->d_peerIsLocal.get(), + algoCtx->d_peerToPortChannelIdx.get(), + algoCtx->deviceSyncer.get(), rank, worldSize, sendBuff, recvBuff, d_sendCounts, d_sendDispls, @@ -294,25 +305,54 @@ std::shared_ptr AlltoallvFullmesh::initAlltoallvContext( ctx->registeredMemories.push_back(outputBufRegMem); } else if (ctx->mode == MultiNodeMode::IB) { - // ── IB: PortChannel for ALL peers (CudaIpc intra + IB inter connections) + // ── IB hybrid: MemoryChannel (intra-node) + PortChannel (inter-node) ── TransportFlags allTransports = Transport::CudaIpc | getIBTransportForGpu(localGpuIdx); RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports); RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports); std::vector remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem); - INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB: input=%p (%zu B), output=%p (%zu B), remotes=%zu", + INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB hybrid: input=%p (%zu B), output=%p (%zu B), remotes=%zu", rank, input, inputSize, output, outputSize, remoteOutputMemories.size()); - for (size_t i = 0; i < remoteOutputMemories.size(); ++i) { - INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB: remoteOutput[%zu] data=%p, size=%zu", - rank, i, remoteOutputMemories[i].data(), remoteOutputMemories[i].size()); - } + // Build peer locality map and per-type channel arrays + int nPeers = ctx->worldSize - 1; + int thisNode = rank / ctx->nRanksPerNode; + std::vector peerIsLocal(nPeers, 0); + std::vector peerToPortChIdx(nPeers, -1); + int portChCount = 0; + for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) { + int peer = peerIdx < rank ? 
peerIdx : peerIdx + 1; + if (peer / ctx->nRanksPerNode == thisNode) { + peerIsLocal[peerIdx] = 1; + } else { + peerToPortChIdx[peerIdx] = portChCount++; + } + } + INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB hybrid: nPeers=%d, localPeers=%d, remotePeers=%d", + rank, nPeers, nPeers - portChCount, portChCount); + + // Copy locality arrays to GPU + ctx->d_peerIsLocal = mscclpp::detail::gpuCallocShared(nPeers); + ctx->d_peerToPortChannelIdx = mscclpp::detail::gpuCallocShared(nPeers); + mscclpp::gpuMemcpy(ctx->d_peerIsLocal.get(), peerIsLocal.data(), nPeers, cudaMemcpyHostToDevice); + mscclpp::gpuMemcpy(ctx->d_peerToPortChannelIdx.get(), peerToPortChIdx.data(), nPeers, cudaMemcpyHostToDevice); + + // MemoryChannel for intra-node CudaIpc connections (direct NVLink put) + constexpr int nChannelsPerConnection = 1; + ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); + ctx->memoryChannels = setupMemoryChannels( + this->conns_, ctx->memorySemaphores, remoteOutputMemories, inputBufRegMem, nChannelsPerConnection); + ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels); + INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB hybrid: %zu memoryChannels (intra-node)", + rank, ctx->memoryChannels.size()); + + // PortChannel for inter-node IB connections only (CPU proxy → RDMA) ctx->proxyService = std::make_shared(); - ctx->portChannels = setupAllPortChannels( + ctx->portChannels = setupPortChannels( ctx->proxyService, *comm, this->conns_, remoteOutputMemories, inputBufRegMem); ctx->portChannelDeviceHandles = setupPortChannelDeviceHandles(ctx->portChannels); ctx->proxyService->startProxy(true); - INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB: %zu portChannels created, proxy started", + INFO(MSCCLPP_COLL, "[alltoallv][rank %d] IB hybrid: %zu portChannels (inter-node), proxy started", rank, ctx->portChannels.size()); ctx->registeredMemories = std::move(remoteOutputMemories); @@ -350,7 +390,5 @@ AlgorithmCtxKey AlltoallvFullmesh::generateAlltoallvContextKey( return {(void*)input, output, inputSize, outputSize, 0}; } -#undef ALLTOALLV_WARP_SIZE - } // namespace collective } // namespace mscclpp diff --git a/src/ext/collectives/collective_utils.cc b/src/ext/collectives/collective_utils.cc index 270223b3..90a3530c 100644 --- a/src/ext/collectives/collective_utils.cc +++ b/src/ext/collectives/collective_utils.cc @@ -4,7 +4,6 @@ #include "collective_utils.hpp" #include -#include #include #include #include @@ -124,26 +123,6 @@ std::vector setupPortChannels( return channels; } -std::vector setupAllPortChannels( - std::shared_ptr proxyService, - mscclpp::Communicator& comm, - const std::vector& connections, - const std::vector& remoteMemories, - mscclpp::RegisteredMemory localMemory) { - std::vector channels; - mscclpp::MemoryId srcMemId = proxyService->addMemory(localMemory); - for (size_t cid = 0; cid < connections.size(); ++cid) { - // Create PortChannel for EVERY connection (CudaIpc and IB alike). 
- // The ProxyService proxy thread handles both connection types: - // - CudaIpc: cudaMemcpyD2D via IPC-mapped pointer - // - IB: RDMA write via ibv_post_send - mscclpp::SemaphoreId semId = proxyService->buildAndAddSemaphore(comm, connections[cid]); - mscclpp::MemoryId dstMemId = proxyService->addMemory(remoteMemories[cid]); - channels.emplace_back(proxyService->portChannel(semId, dstMemId, srcMemId)); - } - return channels; -} - std::shared_ptr setupPortChannelDeviceHandles( const std::vector& portChannels) { if (portChannels.empty()) return nullptr; diff --git a/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp b/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp index e00773f0..8fffab74 100644 --- a/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp +++ b/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp @@ -11,21 +11,6 @@ namespace mscclpp { namespace collective { -#if defined(__HIP_PLATFORM_AMD__) -#define ALLTOALLV_WARP_SIZE 64 -#else -#define ALLTOALLV_WARP_SIZE 32 -#endif - -// Chunk size for pipelined transfers (1MB) -// Large enough to amortize overhead, small enough for good memory patterns -constexpr size_t ALLTOALLV_CHUNK_SIZE = 1 << 20; - -// Default number of blocks for multi-block kernels. -// Tuned for H100 (132 SMs). Enough to saturate NVLink bandwidth without -// excessive DeviceSyncer overhead. -constexpr int ALLTOALLV_DEFAULT_NBLOCKS = 24; - // Default blocks per peer for the peer-parallel kernel. // Controls how many thread blocks cooperate on each peer's data transfer. constexpr int ALLTOALLV_DEFAULT_BLOCKS_PER_PEER = 16; @@ -239,352 +224,7 @@ __global__ void __launch_bounds__(1024) } } -/** - * Legacy multi-block AllToAllV kernel (sequential peers). - * - * All thread blocks cooperate on each peer's data transfer using global thread IDs. - * Peers are processed sequentially. Kept for comparison; prefer alltoallvPeerParallelKernel. - * - * Launch config: <<>> - */ -__global__ void __launch_bounds__(1024) - alltoallvMultiBlockKernel(DeviceHandle* memoryChannels, - DeviceSyncer* syncer, - int rank, - int worldSize, - const void* sendBuff, - void* recvBuff, - const size_t* sendCounts, - const size_t* sendDispls, - const size_t* recvCounts, - const size_t* recvDispls, - const size_t* remoteRecvDispls) { - const int gtid = threadIdx.x + blockIdx.x * blockDim.x; - const int nThreads = blockDim.x * gridDim.x; - const int nPeers = worldSize - 1; - // Phase 1: Local copy — all threads across all blocks cooperate - if (sendCounts[rank] > 0) { - mscclpp::copy((char*)recvBuff + recvDispls[rank], - (void*)((const char*)sendBuff + sendDispls[rank]), - sendCounts[rank], gtid, nThreads); - } - // Phase 2: Remote puts — all blocks cooperate on each peer's transfer - for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) { - int peer = peerIdx < rank ? peerIdx : peerIdx + 1; - int chanIdx = peerIdx; - - if (sendCounts[peer] > 0) { - memoryChannels[chanIdx].put( - remoteRecvDispls[peer], - sendDispls[peer], - sendCounts[peer], - gtid, - nThreads - ); - } - } - - // Phase 3: Grid-wide barrier - syncer->sync(gridDim.x); - - // Phase 4: Signal all peers, then wait (single thread) - if (gtid == 0) { - for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) { - memoryChannels[peerIdx].signal(); - } - for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) { - int peer = peerIdx < rank ? peerIdx : peerIdx + 1; - if (recvCounts[peer] > 0) { - memoryChannels[peerIdx].wait(); - } - } - } -} - -/** - * High-performance AllToAllV kernel using maximum thread parallelism. 
- * - * Processes each peer sequentially but uses ALL block threads (1024) for each - * data transfer to maximize copy bandwidth. This provides much better performance - * than the warp-per-peer approach for large message sizes. - * - * Launch config: <<<1, 1024>>> for maximum bandwidth within a single block. - * - * @param memoryChannels Array of MemoryChannel handles for each peer (worldSize-1 channels) - * @param rank Current rank - * @param worldSize Total number of ranks - * @param sendBuff Source buffer containing data to send - * @param recvBuff Destination buffer for received data - * @param sendCounts Array of send counts for each rank (in bytes) - * @param sendDispls Array of send displacements for each rank (in bytes) - * @param recvCounts Array of receive counts for each rank (in bytes) - * @param recvDispls Array of receive displacements for each rank (in bytes) - */ -__global__ void __launch_bounds__(1024) - alltoallvKernel(DeviceHandle* memoryChannels, - int rank, - int worldSize, - const void* sendBuff, - void* recvBuff, - const size_t* sendCounts, - const size_t* sendDispls, - const size_t* recvCounts, - const size_t* recvDispls, - const size_t* remoteRecvDispls) { - int tid = threadIdx.x; - int nThreads = blockDim.x; - int nPeers = worldSize - 1; - - // Step 1: Copy local data using ALL threads for maximum bandwidth - if (sendCounts[rank] > 0) { - mscclpp::copy((char*)recvBuff + recvDispls[rank], - (void*)((const char*)sendBuff + sendDispls[rank]), - sendCounts[rank], tid, nThreads); - } - __syncthreads(); - - // Step 2: Process each peer sequentially, but use ALL threads for each transfer - // This maximizes bandwidth for each transfer compared to warp-per-peer approach - for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) { - int peer = peerIdx < rank ? peerIdx : peerIdx + 1; - int chanIdx = peerIdx; - - if (sendCounts[peer] > 0) { - // Use all threads for maximum copy throughput - memoryChannels[chanIdx].put( - remoteRecvDispls[peer], // dst offset in peer's buffer (peer's recvDispls[rank]) - sendDispls[peer], // src offset in our buffer - sendCounts[peer], // size - tid, // thread id - nThreads // total threads - ); - } - __syncthreads(); - - // Only one thread signals per peer - if (tid == 0) { - memoryChannels[chanIdx].signal(); - } - __syncthreads(); - - // Wait for incoming data from this peer - if (tid == 0 && recvCounts[peer] > 0) { - memoryChannels[chanIdx].wait(); - } - __syncthreads(); - } -} - -/** - * Pipelined AllToAllV kernel for imbalanced workloads. - * - * For large messages, breaks transfers into chunks to improve memory access - * patterns, but avoids excessive signaling overhead by signaling only once - * per peer after all chunks are sent. - * - * Optimized for MoE workloads where message sizes can vary by 100x+ between ranks. 
- * - * Launch config: <<<1, 1024>>> - */ -__global__ void __launch_bounds__(1024) - alltoallvPipelinedKernel(DeviceHandle* memoryChannels, - int rank, - int worldSize, - const void* sendBuff, - void* recvBuff, - const size_t* sendCounts, - const size_t* sendDispls, - const size_t* recvCounts, - const size_t* recvDispls, - const size_t* remoteRecvDispls) { - int tid = threadIdx.x; - int nThreads = blockDim.x; - int nPeers = worldSize - 1; - - // Step 1: Copy local data - if (sendCounts[rank] > 0) { - mscclpp::copy((char*)recvBuff + recvDispls[rank], - (void*)((const char*)sendBuff + sendDispls[rank]), - sendCounts[rank], tid, nThreads); - } - __syncthreads(); - - // Step 2: Process each peer - send all data in chunks, then signal once - for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) { - int peer = peerIdx < rank ? peerIdx : peerIdx + 1; - int chanIdx = peerIdx; - - size_t sendSize = sendCounts[peer]; - size_t recvSize = recvCounts[peer]; - size_t dstOffset = remoteRecvDispls[peer]; // peer's recvDispls[rank] - size_t srcOffset = sendDispls[peer]; - - // Send data in chunks for better memory access patterns - // But only signal ONCE after all chunks are sent (avoids signaling overhead) - if (sendSize > 0) { - for (size_t offset = 0; offset < sendSize; offset += ALLTOALLV_CHUNK_SIZE) { - size_t chunkSize = (sendSize - offset < ALLTOALLV_CHUNK_SIZE) - ? (sendSize - offset) : ALLTOALLV_CHUNK_SIZE; - memoryChannels[chanIdx].put( - dstOffset + offset, - srcOffset + offset, - chunkSize, - tid, - nThreads - ); - __syncthreads(); - } - } - - // Signal ONCE after all data is sent - if (tid == 0 && sendSize > 0) { - memoryChannels[chanIdx].signal(); - } - __syncthreads(); - - // Wait ONCE for all peer's data - if (tid == 0 && recvSize > 0) { - memoryChannels[chanIdx].wait(); - } - __syncthreads(); - } -} - -/** - * Ring-based AllToAllV kernel with maximum thread parallelism. - * - * Uses step-by-step ring pattern with ALL threads for maximum bandwidth. - * Each step processes one peer pair, with correct semaphore handling. - */ -__global__ void __launch_bounds__(1024) - alltoallvRingKernel(DeviceHandle* memoryChannels, - int rank, - int worldSize, - const void* sendBuff, - void* recvBuff, - const size_t* sendCounts, - const size_t* sendDispls, - const size_t* recvCounts, - const size_t* recvDispls, - const size_t* remoteRecvDispls) { - int tid = threadIdx.x; - int nThreads = blockDim.x; - - // Copy local data first using ALL threads - if (sendCounts[rank] > 0) { - mscclpp::copy((char*)recvBuff + recvDispls[rank], - (void*)((const char*)sendBuff + sendDispls[rank]), - sendCounts[rank], tid, nThreads); - } - __syncthreads(); - - // Ring-based exchange - process each peer sequentially - // Key fix: use the SAME channel for both signal and wait (peer-pair symmetry) - for (int step = 1; step < worldSize; step++) { - int sendPeer = (rank + step) % worldSize; - int chanIdx = sendPeer < rank ? 
sendPeer : sendPeer - 1; - - // Send data to sendPeer using ALL threads - if (sendCounts[sendPeer] > 0) { - memoryChannels[chanIdx].put( - remoteRecvDispls[sendPeer], // dst offset in peer's buffer (peer's recvDispls[rank]) - sendDispls[sendPeer], - sendCounts[sendPeer], - tid, - nThreads - ); - } - __syncthreads(); - - // Signal completion on the SAME channel we'll wait on - if (tid == 0) { - memoryChannels[chanIdx].signal(); - } - __syncthreads(); - - // Wait for peer's data on the SAME channel (correct semaphore pairing) - if (tid == 0 && recvCounts[sendPeer] > 0) { - memoryChannels[chanIdx].wait(); - } - __syncthreads(); - } -} - -/** - * PortChannel-only AllToAllV kernel for multi-node. - * - * Uses PortChannel (proxy-based) for ALL peers — both intra-node and inter-node. - * This follows the proven pattern from allgather_test_cpp.cu which works reliably - * on GB200 multi-node NVSwitch systems. - * - * For intra-node CudaIpc connections, the proxy performs cudaMemcpyD2D. - * For inter-node IB connections, the proxy performs RDMA writes. - * - * Each block handles one peer. Thread 0 pushes a put descriptor to the FIFO - * (single-threaded), which triggers the proxy to perform the data transfer. - * - * Launch config: <<>> - */ -__global__ void __launch_bounds__(1024) - alltoallvPortChannelKernel(PortChannelDeviceHandle* portChannels, - int rank, - int worldSize, - const void* sendBuff, - void* recvBuff, - const size_t* sendCounts, - const size_t* sendDispls, - const size_t* recvCounts, - const size_t* recvDispls, - const size_t* remoteRecvDispls) { - const int nPeers = worldSize - 1; - - // Handle trivial case (single rank) - if (nPeers == 0) { - const int gtid = threadIdx.x + blockIdx.x * blockDim.x; - const int nThreads = blockDim.x * gridDim.x; - if (sendCounts[rank] > 0) { - mscclpp::copy((char*)recvBuff + recvDispls[rank], - (void*)((const char*)sendBuff + sendDispls[rank]), - sendCounts[rank], gtid, nThreads); - } - return; - } - - // Phase 1: Local copy — all blocks cooperate using global thread IDs - const int gtid = threadIdx.x + blockIdx.x * blockDim.x; - const int nThreads = blockDim.x * gridDim.x; - if (sendCounts[rank] > 0) { - mscclpp::copy((char*)recvBuff + recvDispls[rank], - (void*)((const char*)sendBuff + sendDispls[rank]), - sendCounts[rank], gtid, nThreads); - } - - // Phase 2: Per-peer data transfer via PortChannel (proxy-based). - // Each block handles one peer: blockIdx.x == peerIdx. - const int peerIdx = blockIdx.x; - if (peerIdx >= nPeers) return; - - const int peer = peerIdx < rank ? peerIdx : peerIdx + 1; - - // Thread 0 pushes a put+signal+flush descriptor to the proxy FIFO. - // The proxy thread performs the actual data transfer (cudaMemcpy or RDMA). 
- if (threadIdx.x == 0 && sendCounts[peer] > 0) { - portChannels[peerIdx].putWithSignalAndFlush( - remoteRecvDispls[peer], // dst offset in peer's output buffer - sendDispls[peer], // src offset in our input buffer - sendCounts[peer] // bytes to transfer - ); - } - __syncthreads(); - - // Wait for incoming data from this peer - if (threadIdx.x == 0 && recvCounts[peer] > 0) { - portChannels[peerIdx].wait(); - } -} - -#undef ALLTOALLV_WARP_SIZE } // namespace collective } // namespace mscclpp \ No newline at end of file diff --git a/src/ext/collectives/include/collective_utils.hpp b/src/ext/collectives/include/collective_utils.hpp index 02c85096..97497eea 100644 --- a/src/ext/collectives/include/collective_utils.hpp +++ b/src/ext/collectives/include/collective_utils.hpp @@ -51,12 +51,6 @@ std::vector setupConnections(std::shared_ptr comm); /// @return Vector of connections (one per peer) std::vector setupHybridConnections(std::shared_ptr comm, int localGpuIdx); -/// Check if a connection is intra-node (CudaIpc transport). -/// @param conn The connection to check -/// @return true if the connection uses CudaIpc transport -inline bool isIntraNodeConnection(const Connection& conn) { - return conn.transport() == Transport::CudaIpc; -} /// Get the IB transport for a given local GPU index. /// @param localGpuIdx Local GPU index (0-7) @@ -82,20 +76,6 @@ std::vector setupPortChannels( /// This follows the proven pattern from allgather_test_cpp.cu: /// - CudaIpc connections: proxy does cudaMemcpyD2D /// - IB connections: proxy does RDMA write -/// Creates one PortChannel per peer (dense indexing by peerIdx). -/// @param proxyService The ProxyService managing transfers -/// @param comm The communicator -/// @param connections All connections (mixed CudaIpc + IB) -/// @param remoteMemories Remote registered memories (one per peer) -/// @param localMemory Local registered memory -/// @return Vector of PortChannels (one per peer, in connection order) -std::vector setupAllPortChannels( - std::shared_ptr proxyService, - Communicator& comm, - const std::vector& connections, - const std::vector& remoteMemories, - RegisteredMemory localMemory); - /// Setup PortChannel device handles (GPU-allocated array). 
std::shared_ptr setupPortChannelDeviceHandles( const std::vector& portChannels); diff --git a/test/mscclpp-test/alltoallv_test.cu b/test/mscclpp-test/alltoallv_test.cu index a813e703..bebfcf53 100644 --- a/test/mscclpp-test/alltoallv_test.cu +++ b/test/mscclpp-test/alltoallv_test.cu @@ -65,44 +65,6 @@ void AllToAllVTestColl::runColl(const TestArgs& args, cudaStream_t stream) { const int nThreads = 1024; if (kernelNum == 0) { - // Use high-throughput kernel with all threads participating in each transfer - mscclpp::collective::alltoallvKernel<<<1, nThreads, 0, stream>>>( - d_memoryChannels, - rank, worldSize, - localSendBuffV, localRecvBuffV, - d_sendCounts, d_sendDispls, - d_recvCounts, d_recvDispls, - d_remoteRecvDispls); - } else if (kernelNum == 1) { - // Use ring-based kernel for larger world sizes - mscclpp::collective::alltoallvRingKernel<<<1, nThreads, 0, stream>>>( - d_memoryChannels, - rank, worldSize, - localSendBuffV, localRecvBuffV, - d_sendCounts, d_sendDispls, - d_recvCounts, d_recvDispls, - d_remoteRecvDispls); - } else if (kernelNum == 2) { - // Use pipelined kernel for imbalanced workloads (MoE) - mscclpp::collective::alltoallvPipelinedKernel<<<1, nThreads, 0, stream>>>( - d_memoryChannels, - rank, worldSize, - localSendBuffV, localRecvBuffV, - d_sendCounts, d_sendDispls, - d_recvCounts, d_recvDispls, - d_remoteRecvDispls); - } else if (kernelNum == 3) { - // Use legacy multi-block kernel (sequential peers) - const int nBlocks = mscclpp::collective::ALLTOALLV_DEFAULT_NBLOCKS; - mscclpp::collective::alltoallvMultiBlockKernel<<>>( - d_memoryChannels, - d_deviceSyncer, - rank, worldSize, - localSendBuffV, localRecvBuffV, - d_sendCounts, d_sendDispls, - d_recvCounts, d_recvDispls, - d_remoteRecvDispls); - } else if (kernelNum == 4) { // Peer-parallel kernel: small messages (1 block/peer, no barrier) const int nPeers = worldSize - 1; const int nBlocks = (nPeers > 0) ? nPeers : 1; @@ -114,7 +76,7 @@ void AllToAllVTestColl::runColl(const TestArgs& args, cudaStream_t stream) { d_sendCounts, d_sendDispls, d_recvCounts, d_recvDispls, d_remoteRecvDispls); - } else if (kernelNum == 5) { + } else if (kernelNum == 1) { // Peer-parallel kernel: large messages (multiple blocks/peer, barrier) const int nPeers = worldSize - 1; const int blocksPerPeer = mscclpp::collective::ALLTOALLV_DEFAULT_BLOCKS_PER_PEER; @@ -220,12 +182,8 @@ void AllToAllVTestColl::setupCollTest(size_t size) { std::vector AllToAllVTestColl::getKernelRestrictions() { return { - {0, "alltoallvKernel", true, 1, 4 * worldSize_}, - {1, "alltoallvRingKernel", true, 1, 4 * worldSize_}, - {2, "alltoallvPipelinedKernel", true, 1, 4 * worldSize_}, - {3, "alltoallvMultiBlockKernel", true, 1, 4 * worldSize_}, - {4, "alltoallvPeerParallel(small)", true, 1, 4 * worldSize_}, - {5, "alltoallvPeerParallel(large)", true, 1, 4 * worldSize_} + {0, "alltoallvPeerParallel(small)", true, 1, 4 * worldSize_}, + {1, "alltoallvPeerParallel(large)", true, 1, 4 * worldSize_} }; }
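
Usage sketch (illustrative, not part of the diff above): the Python wrapper touched in
python/mscclpp/ext/alltoallv_single.py is driven the same way python/test/test_alltoallv_mscclpp.py
drives it. The sketch assumes the import path mscclpp.ext.alltoallv_single and takes an
already-bootstrapped mscclpp Communicator as an argument instead of reproducing the test's
TcpBootstrap setup; the dtype and chunk size are arbitrary choices for illustration.

import torch
from mscclpp.ext.alltoallv_single import MscclppAlltoAllV

def run_equal_split_alltoallv(comm, rank: int, world_size: int, chunk_size: int = 1024) -> torch.Tensor:
    """Equal-split all-to-all through the wrapper from this patch.

    `comm` is assumed to be an already-bootstrapped mscclpp Communicator,
    created as in python/test/test_alltoallv_mscclpp.py.
    """
    alltoallv = MscclppAlltoAllV(communicator=comm)
    # Rank r contributes values [r*world_size*chunk_size, (r+1)*world_size*chunk_size),
    # i.e. chunk_size elements destined for each peer (equal splits).
    inp = torch.arange(
        rank * world_size * chunk_size,
        (rank + 1) * world_size * chunk_size,
        dtype=torch.float32,
        device="cuda",
    )
    out = alltoallv.all_to_all_single(inp)
    torch.cuda.synchronize()
    return out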