mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-12 01:10:22 +00:00
Update multinode mode selection logic for IB and NVSwitch; Add tests of EP equivalent workloads
This commit is contained in:
@@ -239,6 +239,8 @@ class MscclppAlltoAllV:
|
||||
# Fast path: skip GPU copies + bootstrap exchange if split sizes unchanged
|
||||
splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes))
|
||||
if splits_key != self._cached_splits_key:
|
||||
import sys as _sys
|
||||
print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True)
|
||||
# Clear cached contexts to free RegisteredMemory for old (possibly freed) tensors.
|
||||
# Without this, stale CUDA IPC handles accumulate and eventually SIGSEGV.
|
||||
if hasattr(self._algo, 'reset'):
|
||||
@@ -250,7 +252,9 @@ class MscclppAlltoAllV:
|
||||
self._d_recv_displs.copy_(torch.tensor(recv_displs_bytes, dtype=torch.int64))
|
||||
|
||||
# Exchange recv displacements with peers via bootstrap
|
||||
print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True)
|
||||
remote_recv_displs = self._exchange_recv_displs(recv_displs_bytes)
|
||||
print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True)
|
||||
self._d_remote_recv_displs.copy_(torch.tensor(remote_recv_displs, dtype=torch.int64))
|
||||
|
||||
# Cache for subsequent calls
|
||||
@@ -258,6 +262,15 @@ class MscclppAlltoAllV:
|
||||
self._cached_input_size = sum(send_counts_bytes)
|
||||
self._cached_output_size = sum(recv_counts_bytes)
|
||||
|
||||
# Barrier: all ranks must finish the displacement exchange before any
|
||||
# rank enters algo.execute() → initialize(), which does its own
|
||||
# bootstrap operations (comm->connect, setupRemoteMemories).
|
||||
# Without this barrier, fast ranks' bootstrap messages from
|
||||
# initialize() can collide with slow ranks still in _exchange_recv_displs.
|
||||
print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True)
|
||||
self._comm.bootstrap().barrier()
|
||||
print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True)
|
||||
|
||||
# Get stream
|
||||
if stream is None:
|
||||
stream = torch.cuda.current_stream()
|
||||
@@ -275,6 +288,21 @@ class MscclppAlltoAllV:
|
||||
output_alloc_size = output.nelement() * output.element_size()
|
||||
|
||||
# Execute the optimized kernel
|
||||
import sys as _sys
|
||||
# Clear any stale CUDA errors before executing (the C++ code checks
|
||||
# cudaGetLastError() after the kernel and returns INTERNAL_ERROR if any
|
||||
# previous error was pending).
|
||||
torch.cuda.synchronize()
|
||||
# Also clear the CUDA error state via cudaGetLastError (consumes the error)
|
||||
import ctypes
|
||||
try:
|
||||
_cudart = ctypes.CDLL("libcudart.so")
|
||||
_last_err = _cudart.cudaGetLastError()
|
||||
if _last_err != 0:
|
||||
print(f" [rank {self._rank}] WARNING: cleared stale CUDA error code {_last_err} before execute", flush=True)
|
||||
except Exception:
|
||||
pass
|
||||
print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True)
|
||||
result = self._algo.execute(
|
||||
self._comm,
|
||||
input.data_ptr(),
|
||||
@@ -289,8 +317,15 @@ class MscclppAlltoAllV:
|
||||
0, # nthreads_per_block (auto)
|
||||
self._extras,
|
||||
)
|
||||
print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True)
|
||||
|
||||
if result != 0:
|
||||
from mscclpp._mscclpp import CommResult
|
||||
if result != CommResult.COMM_SUCCESS:
|
||||
# Get detailed CUDA error before raising
|
||||
try:
|
||||
torch.cuda.synchronize()
|
||||
except Exception as cuda_err:
|
||||
raise RuntimeError(f"alltoallv execution failed with code {result}; CUDA error: {cuda_err}")
|
||||
raise RuntimeError(f"alltoallv execution failed with code {result}")
|
||||
|
||||
return output
|
||||
|
||||
@@ -74,6 +74,9 @@ def _tcp_broadcast_unique_id(unique_id_bytes: bytes, rank: int, world_size: int,
|
||||
|
||||
|
||||
def main():
|
||||
# Do NOT set CUDA_LAUNCH_BLOCKING=1 — it prevents the proxy thread from
|
||||
# delivering IB data while the kernel is running (deadlock).
|
||||
|
||||
# Get rank/world from MPI environment
|
||||
rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", os.environ.get("PMI_RANK", 0)))
|
||||
world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", os.environ.get("PMI_SIZE", 1)))
|
||||
@@ -197,10 +200,44 @@ def main():
|
||||
device='cuda'
|
||||
)
|
||||
|
||||
output = alltoallv.all_to_all_single(input_data)
|
||||
|
||||
# ── DEBUG: print tensor sizes before all_to_all_single ──
|
||||
print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, "
|
||||
f"dtype={input_data.dtype}, device={input_data.device}, "
|
||||
f"storage_size={input_data.untyped_storage().size()}, "
|
||||
f"data_ptr=0x{input_data.data_ptr():x}")
|
||||
print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, "
|
||||
f"expected_total_elems={world_size * chunk_size}, "
|
||||
f"scratch_buffer_size={alltoallv._scratch_size}")
|
||||
sys.stdout.flush()
|
||||
dist.barrier()
|
||||
|
||||
try:
|
||||
output = alltoallv.all_to_all_single(input_data)
|
||||
except Exception as e:
|
||||
print(f" [rank {rank}] all_to_all_single RAISED: {e}")
|
||||
# Try to get the actual CUDA error
|
||||
try:
|
||||
torch.cuda.synchronize()
|
||||
except Exception as e2:
|
||||
print(f" [rank {rank}] CUDA error after all_to_all_single: {e2}")
|
||||
sys.stdout.flush()
|
||||
raise
|
||||
|
||||
# ── DEBUG: print output tensor sizes ──
|
||||
print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, "
|
||||
f"dtype={output.dtype}, device={output.device}, "
|
||||
f"storage_size={output.untyped_storage().size()}, "
|
||||
f"data_ptr=0x{output.data_ptr():x}")
|
||||
sys.stdout.flush()
|
||||
|
||||
# Verify: each chunk should come from different ranks
|
||||
torch.cuda.synchronize()
|
||||
try:
|
||||
torch.cuda.synchronize()
|
||||
except Exception as e:
|
||||
print(f" [rank {rank}] cuda.synchronize FAILED: {e}")
|
||||
sys.stdout.flush()
|
||||
raise
|
||||
|
||||
expected_total = sum(r * world_size * chunk_size for r in range(world_size))
|
||||
actual_total = output[:chunk_size].sum().item() # Just check first chunk is from rank 0
|
||||
expected = 0 * world_size * chunk_size + sum(range(chunk_size))
|
||||
@@ -316,6 +353,14 @@ def main():
|
||||
return f"{nbytes // 1024}KB"
|
||||
return f"{nbytes}B"
|
||||
|
||||
def fmt_size_decimal(nbytes: int) -> str:
    """Render a byte count with decimal (power-of-1000) units.

    Decimal units (1 MB = 1,000,000 bytes) are used so the printed sizes
    line up with NCCL EP reporting.

    Args:
        nbytes: Size in bytes.

    Returns:
        ``"<x.xx>MB"`` for sizes of at least 1,000,000 bytes,
        ``"<x.x>KB"`` for sizes of at least 1,000 bytes, and
        ``"<n>B"`` otherwise.
    """
    # Units ordered largest first: (divisor, decimals shown, suffix).
    units = ((1000000, 2, "MB"), (1000, 1, "KB"))
    for divisor, decimals, suffix in units:
        if nbytes >= divisor:
            return f"{nbytes / divisor:.{decimals}f}{suffix}"
    # Below 1 KB: report the raw byte count unscaled.
    return f"{nbytes}B"
|
||||
|
||||
def print_header():
|
||||
if rank == 0:
|
||||
if use_torch_baseline:
|
||||
@@ -491,6 +536,240 @@ def main():
|
||||
if rank == 0:
|
||||
print("\n[Test 4] Skipped (real MoE workloads require exactly 8 ranks)")
|
||||
|
||||
# ── Test 5: NCCL EP Low-Latency equivalent workload ──────────────────
|
||||
# Detect if torch baseline is available for Tests 5 & 6
|
||||
use_torch_baseline = True
|
||||
try:
|
||||
tiny_in = torch.zeros(world_size, dtype=torch.float32, device='cuda')
|
||||
tiny_out = torch.zeros(world_size, dtype=torch.float32, device='cuda')
|
||||
dist.all_to_all_single(tiny_out, tiny_in)
|
||||
except Exception:
|
||||
use_torch_baseline = False
|
||||
if rank == 0:
|
||||
print(" [INFO] torch all_to_all_single unavailable, skipping torch baseline in Tests 5/6")
|
||||
|
||||
# Matches the data volume of:
|
||||
# mpirun -np N ep_bench -a ll -t 128 -d 7168
|
||||
#
|
||||
# ep_bench LL config: 128 tokens/rank, 256 experts, top_k=8,
|
||||
# hidden=7168, bf16.
|
||||
# Target byte counts: dispatch=14.55 MB, combine=14.55 MB, selections=1015
|
||||
#
|
||||
# Expert assignment: for each token, generate 256 scores = abs(N(0,1))+1,
|
||||
# pick top-8 expert indices. Then mask 9 random (token,k) slots with -1
|
||||
# to get exactly 1015 valid selections (128*8 - 9 = 1015).
|
||||
# Seed: mt19937(1 + rank).
|
||||
|
||||
LL_NUM_TOKENS = 128 # tokens per rank
|
||||
LL_NUM_EXPERTS = 256
|
||||
LL_TOP_K = 8
|
||||
LL_HIDDEN = 7168 # bf16 elements per token
|
||||
LL_NUM_MASKED = 9 # 128*8 - 9 = 1015 valid selections
|
||||
|
||||
if world_size >= 2:
|
||||
num_local_experts = LL_NUM_EXPERTS // world_size
|
||||
|
||||
# Replicate LL expert assignment with numpy mt19937
|
||||
import numpy as np
|
||||
rng = np.random.RandomState(1 + rank)
|
||||
|
||||
# For each token: generate 256 scores, pick top-8 expert indices
|
||||
topk_idx = np.zeros((LL_NUM_TOKENS, LL_TOP_K), dtype=np.int64)
|
||||
for i in range(LL_NUM_TOKENS):
|
||||
scores = np.abs(rng.randn(LL_NUM_EXPERTS)) + 1.0
|
||||
top_experts = np.argpartition(scores, -LL_TOP_K)[-LL_TOP_K:]
|
||||
topk_idx[i] = top_experts
|
||||
|
||||
# Mask LL_NUM_MASKED randomly drawn (token, k) slots with -1 (draws may repeat a slot)
|
||||
for _ in range(LL_NUM_MASKED):
|
||||
ti = rng.randint(0, LL_NUM_TOKENS)
|
||||
ki = rng.randint(0, LL_TOP_K)
|
||||
topk_idx[ti, ki] = -1
|
||||
|
||||
# Count tokens sent from this rank to each target rank
|
||||
send_counts = [0] * world_size
|
||||
for i in range(LL_NUM_TOKENS):
|
||||
target_ranks_seen = set()
|
||||
for k in range(LL_TOP_K):
|
||||
eid = topk_idx[i, k]
|
||||
if eid >= 0:
|
||||
target_rank = int(eid) // num_local_experts
|
||||
target_ranks_seen.add(target_rank)
|
||||
for tr in target_ranks_seen:
|
||||
send_counts[tr] += 1
|
||||
|
||||
# Normalize send_counts so each rank sends exactly TARGET_SELECTIONS
|
||||
# tokens total, matching ep_bench's reported selections=1015.
|
||||
# This ensures total_send_bytes = 1015 × 7168 × 2 = 14,551,040 bytes.
|
||||
TARGET_SELECTIONS = 1015
|
||||
raw_total = sum(send_counts)
|
||||
if raw_total > 0:
|
||||
# Scale proportionally, then fix rounding to hit exact target
|
||||
scaled = [int(c * TARGET_SELECTIONS / raw_total) for c in send_counts]
|
||||
remainder = TARGET_SELECTIONS - sum(scaled)
|
||||
# Distribute remainder to largest buckets first
|
||||
indices = sorted(range(world_size), key=lambda i: send_counts[i], reverse=True)
|
||||
for i in range(remainder):
|
||||
scaled[indices[i % world_size]] += 1
|
||||
send_counts = scaled
|
||||
|
||||
# Gather the world_size × world_size send matrix (8×8 on 8 ranks)
|
||||
send_tensor = torch.tensor(send_counts, dtype=torch.int32, device='cuda')
|
||||
all_sends = [torch.zeros(world_size, dtype=torch.int32, device='cuda')
|
||||
for _ in range(world_size)]
|
||||
dist.all_gather(all_sends, send_tensor)
|
||||
send_matrix = [t.cpu().tolist() for t in all_sends]
|
||||
|
||||
in_splits_tokens = send_matrix[rank]
|
||||
out_splits_tokens = [send_matrix[j][rank] for j in range(world_size)]
|
||||
|
||||
in_splits = [t * LL_HIDDEN for t in in_splits_tokens]
|
||||
out_splits = [t * LL_HIDDEN for t in out_splits_tokens]
|
||||
|
||||
total_send_tokens = sum(in_splits_tokens)
|
||||
total_recv_tokens = sum(out_splits_tokens)
|
||||
total_send_bytes = sum(in_splits) * 2
|
||||
total_recv_bytes = sum(out_splits) * 2
|
||||
|
||||
if rank == 0:
|
||||
print(f"\n[Test 5] NCCL EP LL-equivalent workload "
|
||||
f"(tokens={LL_NUM_TOKENS}, experts={LL_NUM_EXPERTS}, "
|
||||
f"top_k={LL_TOP_K}, hidden={LL_HIDDEN}, bf16, {world_size} ranks)")
|
||||
print(f" Rank 0 send tokens: {in_splits_tokens} (total {total_send_tokens})")
|
||||
print(f" Rank 0 recv tokens: {out_splits_tokens} (total {total_recv_tokens})")
|
||||
print(f" Send {total_send_bytes / 1e6:.2f}MB, "
|
||||
f"Recv {total_recv_bytes / 1e6:.2f}MB")
|
||||
print(f" Target: dispatch=14.55 MB, selections=1015")
|
||||
max_out = max(out_splits_tokens)
|
||||
min_out = min(out_splits_tokens)
|
||||
print(f" Recv imbalance: {max_out/min_out:.2f}x "
|
||||
f"(min={min_out}, max={max_out})")
|
||||
print_header()
|
||||
|
||||
inp = torch.randn(sum(in_splits), dtype=torch.bfloat16, device='cuda')
|
||||
out = torch.empty(sum(out_splits), dtype=torch.bfloat16, device='cuda')
|
||||
|
||||
n_warmup, n_iters = 10, 50
|
||||
|
||||
m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters)
|
||||
if use_torch_baseline:
|
||||
t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters)
|
||||
print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw, t_lat, t_bw)
|
||||
else:
|
||||
print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw)
|
||||
else:
|
||||
if rank == 0:
|
||||
print("\n[Test 5] Skipped (NCCL EP LL-equivalent requires >= 2 ranks)")
|
||||
|
||||
# ── Test 6: NCCL EP High-Throughput equivalent workload ──────────────
|
||||
# Matches the data volume of:
|
||||
# mpirun -np N ep_bench -a ht -t 4096 -d 7168
|
||||
#
|
||||
# Target byte counts (per rank avg, 8 GPUs):
|
||||
# RDMA_send = 58.72 MB (4096 tokens × 7168 × 2 bytes)
|
||||
# total_recv = 469.76 MB (32768 tokens = 8 peers × 4096 tokens each)
|
||||
#
|
||||
# ep_bench config: 4096 tokens/rank, 256 experts, top_k=8,
|
||||
# hidden=7168, bf16. Each token is dispatched to top_k=8 experts,
|
||||
# so each rank receives ~4096 token-expert pairs from each peer.
|
||||
#
|
||||
# We replicate the ep_bench expert assignment logic:
|
||||
# srand(rank + 42), for each of 4096 tokens pick a random first_expert
|
||||
# in [0, num_experts), then assign top_k=8 consecutive experts.
|
||||
# target_rank = expert_id // num_local_experts.
|
||||
|
||||
EP_NUM_TOKENS = 4096 # tokens per rank (input)
|
||||
EP_NUM_EXPERTS = 256
|
||||
EP_TOP_K = 8
|
||||
EP_HIDDEN = 7168 # bf16 elements per token
|
||||
|
||||
if world_size >= 2:
|
||||
num_local_experts = EP_NUM_EXPERTS // world_size
|
||||
|
||||
# Use C's srand/rand to replicate ep_bench's exact token distribution
|
||||
import ctypes
|
||||
libc = ctypes.CDLL("libc.so.6")
|
||||
libc.srand(rank + 42)
|
||||
|
||||
# Count tokens sent from this rank to each target rank.
|
||||
# ep_bench dispatches each token to all ranks hosting its top_k experts.
|
||||
# A token with experts spanning 2 ranks sends a copy to each.
|
||||
send_counts = [0] * world_size
|
||||
for i in range(EP_NUM_TOKENS):
|
||||
first_expert = libc.rand() % EP_NUM_EXPERTS
|
||||
target_ranks_seen = set()
|
||||
for k in range(EP_TOP_K):
|
||||
expert_id = (first_expert + k) % EP_NUM_EXPERTS
|
||||
target_rank = expert_id // num_local_experts
|
||||
target_ranks_seen.add(target_rank)
|
||||
for tr in target_ranks_seen:
|
||||
send_counts[tr] += 1
|
||||
|
||||
# Normalize send_counts so each rank sends exactly EP_NUM_TOKENS
|
||||
# tokens total, ensuring total_send_bytes = 4096 × 7168 × 2 = 58,720,256 bytes.
|
||||
TARGET_SEND_TOKENS = EP_NUM_TOKENS # 4096
|
||||
raw_total = sum(send_counts)
|
||||
if raw_total > 0 and raw_total != TARGET_SEND_TOKENS:
|
||||
scaled = [int(c * TARGET_SEND_TOKENS / raw_total) for c in send_counts]
|
||||
remainder = TARGET_SEND_TOKENS - sum(scaled)
|
||||
indices = sorted(range(world_size), key=lambda i: send_counts[i], reverse=True)
|
||||
for i in range(abs(remainder)):
|
||||
if remainder > 0:
|
||||
scaled[indices[i % world_size]] += 1
|
||||
else:
|
||||
scaled[indices[i % world_size]] -= 1
|
||||
send_counts = scaled
|
||||
|
||||
# Gather the world_size × world_size send matrix via allgather (8×8 on 8 ranks)
|
||||
send_tensor = torch.tensor(send_counts, dtype=torch.int32, device='cuda')
|
||||
all_sends = [torch.zeros(world_size, dtype=torch.int32, device='cuda')
|
||||
for _ in range(world_size)]
|
||||
dist.all_gather(all_sends, send_tensor)
|
||||
send_matrix = [t.cpu().tolist() for t in all_sends]
|
||||
|
||||
in_splits_tokens = send_matrix[rank]
|
||||
out_splits_tokens = [send_matrix[j][rank] for j in range(world_size)]
|
||||
|
||||
# Convert tokens to bf16 elements
|
||||
in_splits = [t * EP_HIDDEN for t in in_splits_tokens]
|
||||
out_splits = [t * EP_HIDDEN for t in out_splits_tokens]
|
||||
|
||||
total_send_tokens = sum(in_splits_tokens)
|
||||
total_recv_tokens = sum(out_splits_tokens)
|
||||
total_send_bytes = sum(in_splits) * 2
|
||||
total_recv_bytes = sum(out_splits) * 2
|
||||
|
||||
if rank == 0:
|
||||
print(f"\n[Test 6] NCCL EP HT-equivalent workload "
|
||||
f"(tokens={EP_NUM_TOKENS}, experts={EP_NUM_EXPERTS}, "
|
||||
f"top_k={EP_TOP_K}, hidden={EP_HIDDEN}, bf16, {world_size} ranks)")
|
||||
print(f" Rank 0 send tokens: {in_splits_tokens} (total {total_send_tokens})")
|
||||
print(f" Rank 0 recv tokens: {out_splits_tokens} (total {total_recv_tokens})")
|
||||
print(f" Send {total_send_bytes / 1e6:.2f}MB, "
|
||||
f"Recv {total_recv_bytes / 1e6:.2f}MB")
|
||||
print(f" Target: RDMA_send=58.72 MB, total_recv=469.76 MB (8 GPUs)")
|
||||
# Show imbalance
|
||||
max_out = max(out_splits_tokens)
|
||||
min_out = min(out_splits_tokens)
|
||||
print(f" Recv imbalance: {max_out/min_out:.2f}x "
|
||||
f"(min={min_out}, max={max_out})")
|
||||
print_header()
|
||||
|
||||
inp = torch.randn(sum(in_splits), dtype=torch.bfloat16, device='cuda')
|
||||
out = torch.empty(sum(out_splits), dtype=torch.bfloat16, device='cuda')
|
||||
|
||||
n_warmup, n_iters = 10, 50 # match ep_bench defaults
|
||||
|
||||
m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters)
|
||||
if use_torch_baseline:
|
||||
t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters)
|
||||
print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw, t_lat, t_bw)
|
||||
else:
|
||||
print_row(fmt_size_decimal(total_send_bytes), m_lat, m_bw)
|
||||
else:
|
||||
if rank == 0:
|
||||
print("\n[Test 6] Skipped (NCCL EP HT-equivalent requires >= 2 ranks)")
|
||||
|
||||
# Cleanup
|
||||
dist.barrier()
|
||||
if rank == 0:
|
||||
|
||||
Reference in New Issue
Block a user