Add debug variable MSCCLPP_DEBUG_ALLTOALLV to gate debug prints

Qinghua Zhou
2026-04-02 04:39:48 +00:00
parent 36940dbacf
commit 520c890df5
2 changed files with 38 additions and 26 deletions

View File

@@ -12,6 +12,7 @@ via the NativeAlgorithm framework with size-adaptive algorithm selection.
"""
from __future__ import annotations
import os
import torch
import torch.distributed as dist
from typing import Optional, List, Tuple
@@ -23,6 +24,8 @@ from mscclpp._mscclpp import (
)
from mscclpp.ext.algorithm_collection_builder import AlgorithmCollectionBuilder
_DEBUG = os.environ.get("MSCCLPP_DEBUG_ALLTOALLV", "0") == "1"
__all__ = ["MscclppAlltoAllV", "all_to_all_single"]
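Each debug message below is wrapped in an explicit `if _DEBUG:` guard. A tiny helper, hypothetical and not part of this commit, would collapse those two-line guards back to a single call per site:

    def _dprint(msg: str) -> None:
        # No-op unless MSCCLPP_DEBUG_ALLTOALLV=1; flush so per-rank lines interleave promptly.
        if _DEBUG:
            print(msg, flush=True)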
@@ -239,8 +242,8 @@ class MscclppAlltoAllV:
# Fast path: skip GPU copies + bootstrap exchange if split sizes unchanged
splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes))
if splits_key != self._cached_splits_key:
import sys as _sys
print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True)
# Clear cached contexts to free RegisteredMemory for old (possibly freed) tensors.
# Without this, stale CUDA IPC handles accumulate and eventually SIGSEGV.
if hasattr(self._algo, 'reset'):
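Taken together, this hunk implements a cache keyed on the split sizes: only a change in the splits pays for the reset, the device copies, and the bootstrap exchange. A stripped-down sketch of the pattern (names taken from the diff, surrounding details omitted):

    splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes))
    if splits_key != self._cached_splits_key:
        # Splits changed: drop contexts registered for old (possibly freed) tensors
        # so stale CUDA IPC handles cannot accumulate, then redo the setup work.
        if hasattr(self._algo, "reset"):
            self._algo.reset()
        # ... copy displacements to device and exchange them with peers ...
        self._cached_splits_key = splits_key
    # Unchanged splits: reuse all cached state and go straight to the kernel launch.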
@@ -252,9 +255,11 @@ class MscclppAlltoAllV:
self._d_recv_displs.copy_(torch.tensor(recv_displs_bytes, dtype=torch.int64))
# Exchange recv displacements with peers via bootstrap
print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True)
remote_recv_displs = self._exchange_recv_displs(recv_displs_bytes)
print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True)
self._d_remote_recv_displs.copy_(torch.tensor(remote_recv_displs, dtype=torch.int64))
# Cache for subsequent calls
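Conceptually, _exchange_recv_displs is an all-gather of each rank's receive-displacement table, so every sender knows the offset at which each receiver expects its data. Expressed with torch.distributed instead of the bootstrap (a sketch of the idea only, not the actual implementation; world_size stands for the communicator size):

    local = torch.tensor(recv_displs_bytes, dtype=torch.int64)
    gathered = [torch.empty_like(local) for _ in range(world_size)]
    dist.all_gather(gathered, local)   # gathered[r] is rank r's receive displacements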
@@ -267,9 +272,11 @@ class MscclppAlltoAllV:
# bootstrap operations (comm->connect, setupRemoteMemories).
# Without this barrier, fast ranks' bootstrap messages from
# initialize() can collide with slow ranks still in _exchange_recv_displs.
print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True)
self._comm.bootstrap().barrier()
print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True)
# Get stream
if stream is None:
@@ -288,7 +295,6 @@ class MscclppAlltoAllV:
output_alloc_size = output.nelement() * output.element_size()
# Execute the optimized kernel
import sys as _sys
# Clear any stale CUDA errors before executing (the C++ code checks
# cudaGetLastError() after the kernel and returns INTERNAL_ERROR if any
# previous error was pending).
@@ -298,11 +304,12 @@ class MscclppAlltoAllV:
try:
_cudart = ctypes.CDLL("libcudart.so")
_last_err = _cudart.cudaGetLastError()
if _last_err != 0:
if _last_err != 0 and _DEBUG:
print(f" [rank {self._rank}] WARNING: cleared stale CUDA error code {_last_err} before execute", flush=True)
except Exception:
pass
print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True)
result = self._algo.execute(
self._comm,
input.data_ptr(),
@@ -317,7 +324,8 @@ class MscclppAlltoAllV:
0, # nthreads_per_block (auto)
self._extras,
)
print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True)
if _DEBUG:
print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True)
from mscclpp._mscclpp import CommResult
if result != CommResult.COMM_SUCCESS:

View File

@@ -14,6 +14,8 @@ import torch.distributed as dist
import os
import sys
import time
_DEBUG = os.environ.get("MSCCLPP_DEBUG_ALLTOALLV", "0") == "1"
import random
import socket
import struct
@@ -166,7 +168,7 @@ def main():
except Exception:
ib_devices = []
if rank == 0:
if rank == 0 and _DEBUG:
print(f" Hostname: {hostname}")
print(f" nRanksPerNode: {n_ranks_per_node}, isMultiNode: {is_multi_node}")
print(f" IB devices: {ib_devices if ib_devices else 'NONE FOUND'}")
@@ -174,7 +176,7 @@ def main():
if is_multi_node and not ib_devices:
print(f" WARNING: Multi-node detected but no IB devices! Cross-node will fail.")
# Also print from rank n_ranks_per_node (first rank on node 1) for comparison
if is_multi_node and rank == n_ranks_per_node:
if is_multi_node and rank == n_ranks_per_node and _DEBUG:
print(f" [Node 1] Hostname: {hostname}, rank={rank}")
print(f" [Node 1] IB devices: {ib_devices if ib_devices else 'NONE FOUND'}")
# ── End diagnostics ────────────────────────────────────────────────
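The diagnostics assume an ib_devices list was populated just above, inside the try/except whose fallback is visible in this hunk. On Linux, RDMA-capable NICs are listed under /sys/class/infiniband, so a typical probe looks roughly like the following (a sketch; the script's actual detection may differ):

    try:
        # Each entry (e.g. mlx5_0) is an RDMA device usable for cross-node traffic.
        ib_devices = sorted(os.listdir("/sys/class/infiniband"))
    except Exception:
        ib_devices = []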
@@ -184,7 +186,7 @@ def main():
# Create MscclppAlltoAllV with existing communicator
alltoallv = MscclppAlltoAllV(communicator=comm)
if rank == 0:
if rank == 0 and _DEBUG:
print(f"MscclppAlltoAllV initialized")
print(f"Algorithm: {alltoallv._algo.name}")
@@ -201,14 +203,15 @@ def main():
)
# ── DEBUG: print tensor sizes before all_to_all_single ──
print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, "
f"dtype={input_data.dtype}, device={input_data.device}, "
f"storage_size={input_data.untyped_storage().size()}, "
f"data_ptr=0x{input_data.data_ptr():x}")
print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, "
f"expected_total_elems={world_size * chunk_size}, "
f"scratch_buffer_size={alltoallv._scratch_size}")
sys.stdout.flush()
if _DEBUG:
print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, "
f"dtype={input_data.dtype}, device={input_data.device}, "
f"storage_size={input_data.untyped_storage().size()}, "
f"data_ptr=0x{input_data.data_ptr():x}")
print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, "
f"expected_total_elems={world_size * chunk_size}, "
f"scratch_buffer_size={alltoallv._scratch_size}")
sys.stdout.flush()
dist.barrier()
try:
@@ -224,11 +227,12 @@ def main():
raise
# ── DEBUG: print output tensor sizes ──
print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, "
f"dtype={output.dtype}, device={output.device}, "
f"storage_size={output.untyped_storage().size()}, "
f"data_ptr=0x{output.data_ptr():x}")
sys.stdout.flush()
if _DEBUG:
print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, "
f"dtype={output.dtype}, device={output.device}, "
f"storage_size={output.untyped_storage().size()}, "
f"data_ptr=0x{output.data_ptr():x}")
sys.stdout.flush()
# Verify: each chunk should come from different ranks
try: