Support cross-node CudaIPC

This commit is contained in:
Qinghua Zhou
2026-03-21 10:41:32 +00:00
parent 9ef1fb7cee
commit 7e1cb7b8cf
5 changed files with 100 additions and 19 deletions

View File

@@ -62,6 +62,7 @@ void register_algorithm(nb::module_& m) {
.def_prop_ro("buffer_mode", &Algorithm::bufferMode)
.def_prop_ro("constraint", &Algorithm::constraint)
.def_prop_ro("type", &Algorithm::type)
.def("reset", &Algorithm::reset)
.def(
"execute",
[](Algorithm& self, std::shared_ptr<Communicator> comm, uintptr_t input, uintptr_t output,

View File

@@ -239,6 +239,10 @@ class MscclppAlltoAllV:
# Fast path: skip GPU copies + bootstrap exchange if split sizes unchanged
splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes))
if splits_key != self._cached_splits_key:
# Clear cached contexts to free RegisteredMemory for old (possibly freed) tensors.
# Without this, stale CUDA IPC handles accumulate and eventually SIGSEGV.
if hasattr(self._algo, 'reset'):
self._algo.reset()
# Copy counts/displacements to GPU
self._d_send_counts.copy_(torch.tensor(send_counts_bytes, dtype=torch.int64))
self._d_send_displs.copy_(torch.tensor(send_displs_bytes, dtype=torch.int64))
@@ -259,16 +263,24 @@ class MscclppAlltoAllV:
stream = torch.cuda.current_stream()
cuda_stream = stream.cuda_stream
input_size = self._cached_input_size
output_size = self._cached_output_size
# Use the full underlying storage size (not just the view's active data)
# for the context key, so that reusing views of the same tensor with
# different split sizes doesn't create new contexts (which leak
# RegisteredMemory for stale buffers).
try:
input_alloc_size = input.untyped_storage().size()
output_alloc_size = output.untyped_storage().size()
except Exception:
input_alloc_size = input.nelement() * input.element_size()
output_alloc_size = output.nelement() * output.element_size()
# Execute the optimized kernel
result = self._algo.execute(
self._comm,
input.data_ptr(),
output.data_ptr(),
input_size,
output_size,
input_alloc_size,
output_alloc_size,
_torch_dtype_to_mscclpp(dtype),
ReduceOp.NOP,
cuda_stream,

View File

@@ -92,19 +92,31 @@ def main():
# otherwise gloo avoids IB configuration issues on some clusters.
# Set ALLTOALLV_BACKEND=nccl to enable torch baseline comparison.
backend = os.environ.get("ALLTOALLV_BACKEND", "gloo")
# For multi-node: detect a routable IP instead of 127.0.0.1
# For multi-node: MASTER_ADDR must be set to rank 0's routable IP.
# Single-node auto-detects; multi-node requires it from the launcher.
if "MASTER_ADDR" not in os.environ:
if rank == 0:
os.environ["MASTER_ADDR"] = _get_routable_ip()
else:
# Non-zero ranks: for multi-node runs, MASTER_ADDR must be set externally by the launcher
os.environ["MASTER_ADDR"] = "127.0.0.1"
# Check if we're single-node (all ranks on same host)
n_gpus = torch.cuda.device_count()
if world_size <= n_gpus:
# Likely single-node; 127.0.0.1 works
os.environ["MASTER_ADDR"] = "127.0.0.1"
else:
raise RuntimeError(
f"Rank {rank}: MASTER_ADDR not set for multi-node run "
f"(world_size={world_size} > local GPUs={n_gpus}). "
f"Set it in your launcher, e.g.:\n"
f" mpirun -x MASTER_ADDR=<node0_ip> -x MASTER_PORT=29500 ..."
)
os.environ.setdefault("MASTER_PORT", "29500")
os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)
if backend == "nccl":
dist.init_process_group(backend="nccl", rank=rank, world_size=world_size,
device_id=torch.device(f"cuda:{local_rank}"))
# Don't use device_id= eager init — it triggers an immediate NCCL allreduce
# that fails on some platforms (e.g. GB200 with NCCL 2.28.9).
dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
else:
dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
@@ -341,6 +353,13 @@ def main():
msg_sizes = [1 << s for s in range(10, 28) if s % 2 == 0]
msg_sizes.append(128 * 1024 * 1024)
# Pre-compute max split sizes across all sweep iterations to allocate
# fixed-size tensors. Reusing the same tensors keeps the NativeAlgorithm
# context key stable (same ptrs + sizes) and avoids the context cache
# leak that causes SIGSEGV when stale RegisteredMemory accumulates.
max_in_elems = 0
max_out_elems = 0
sweep_params = [] # (avg_msg_size, in_splits, out_splits)
for avg_msg_size in msg_sizes:
random.seed(12345)
avg_elems = avg_msg_size // 4
@@ -348,19 +367,27 @@ def main():
for i in range(world_size):
row = [max(1, int(avg_elems * (0.5 + random.random()))) for _ in range(world_size)]
send_matrix.append(row)
in_splits = send_matrix[rank]
out_splits = [send_matrix[j][rank] for j in range(world_size)]
max_in_elems = max(max_in_elems, sum(in_splits))
max_out_elems = max(max_out_elems, sum(out_splits))
sweep_params.append((avg_msg_size, in_splits, out_splits))
inp = torch.randn(sum(in_splits), dtype=torch.float32, device='cuda')
out = torch.empty(sum(out_splits), dtype=torch.float32, device='cuda')
# Allocate once at max size
inp = torch.randn(max_in_elems, dtype=torch.float32, device='cuda')
out = torch.empty(max_out_elems, dtype=torch.float32, device='cuda')
for avg_msg_size, in_splits, out_splits in sweep_params:
n_warmup = 3 if avg_msg_size >= 16 * 1024 * 1024 else 5
n_iters = 5 if avg_msg_size >= 64 * 1024 * 1024 else (10 if avg_msg_size >= 4 * 1024 * 1024 else 20)
m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp, out, in_splits, out_splits, n_warmup, n_iters)
# Use views into the fixed buffers (same data_ptr → same context key)
inp_view = inp[:sum(in_splits)]
out_view = out[:sum(out_splits)]
m_lat, m_bw = bench_alltoallv(mscclpp_fn, inp_view, out_view, in_splits, out_splits, n_warmup, n_iters)
if use_torch_baseline:
t_lat, t_bw = bench_alltoallv(torch_fn, inp, out, in_splits, out_splits, n_warmup, n_iters)
t_lat, t_bw = bench_alltoallv(torch_fn, inp_view, out_view, in_splits, out_splits, n_warmup, n_iters)
print_row(fmt_size(avg_msg_size), m_lat, m_bw, t_lat, t_bw)
else:
print_row(fmt_size(avg_msg_size), m_lat, m_bw)