From 520c890df57203f3bcbbae766520c15a2f4e4076 Mon Sep 17 00:00:00 2001
From: Qinghua Zhou
Date: Thu, 2 Apr 2026 04:39:48 +0000
Subject: [PATCH] Add debug variable MSCCLPP_DEBUG_ALLTOALLV to gate debug prints

---
 python/mscclpp/ext/alltoallv_single.py | 28 ++++++++++++++++++----------
 python/test/test_alltoallv_mscclpp.py  | 36 ++++++++++++++++++++----------------
 2 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/python/mscclpp/ext/alltoallv_single.py b/python/mscclpp/ext/alltoallv_single.py
index e84171ee..4387c3a3 100644
--- a/python/mscclpp/ext/alltoallv_single.py
+++ b/python/mscclpp/ext/alltoallv_single.py
@@ -12,6 +12,7 @@ via the NativeAlgorithm framework with size-adaptive algorithm selection.
 """
 from __future__ import annotations
 
+import os
 import torch
 import torch.distributed as dist
 from typing import Optional, List, Tuple
@@ -23,6 +24,8 @@ from mscclpp._mscclpp import (
 )
 from mscclpp.ext.algorithm_collection_builder import AlgorithmCollectionBuilder
 
+_DEBUG = os.environ.get("MSCCLPP_DEBUG_ALLTOALLV", "0") == "1"
+
 __all__ = ["MscclppAlltoAllV", "all_to_all_single"]
 
 
@@ -239,8 +242,8 @@ class MscclppAlltoAllV:
         # Fast path: skip GPU copies + bootstrap exchange if split sizes unchanged
         splits_key = (tuple(send_counts_bytes), tuple(recv_counts_bytes))
         if splits_key != self._cached_splits_key:
-            import sys as _sys
-            print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True)
+            if _DEBUG:
+                print(f" [rank {self._rank}] alltoallv: splits changed, doing bootstrap exchange", flush=True)
             # Clear cached contexts to free RegisteredMemory for old (possibly freed) tensors.
             # Without this, stale CUDA IPC handles accumulate and eventually SIGSEGV.
             if hasattr(self._algo, 'reset'):
@@ -252,9 +255,11 @@ class MscclppAlltoAllV:
             self._d_recv_displs.copy_(torch.tensor(recv_displs_bytes, dtype=torch.int64))
 
             # Exchange recv displacements with peers via bootstrap
-            print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True)
+            if _DEBUG:
+                print(f" [rank {self._rank}] alltoallv: starting _exchange_recv_displs", flush=True)
             remote_recv_displs = self._exchange_recv_displs(recv_displs_bytes)
-            print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True)
+            if _DEBUG:
+                print(f" [rank {self._rank}] alltoallv: _exchange_recv_displs done", flush=True)
             self._d_remote_recv_displs.copy_(torch.tensor(remote_recv_displs, dtype=torch.int64))
 
             # Cache for subsequent calls
@@ -267,9 +272,11 @@ class MscclppAlltoAllV:
            # bootstrap operations (comm->connect, setupRemoteMemories).
            # Without this barrier, fast ranks' bootstrap messages from
            # initialize() can collide with slow ranks still in _exchange_recv_displs.
-            print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True)
+            if _DEBUG:
+                print(f" [rank {self._rank}] alltoallv: waiting on bootstrap barrier", flush=True)
             self._comm.bootstrap().barrier()
-            print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True)
+            if _DEBUG:
+                print(f" [rank {self._rank}] alltoallv: bootstrap barrier done", flush=True)
 
         # Get stream
         if stream is None:
@@ -288,7 +295,6 @@ class MscclppAlltoAllV:
         output_alloc_size = output.nelement() * output.element_size()
 
         # Execute the optimized kernel
-        import sys as _sys
         # Clear any stale CUDA errors before executing (the C++ code checks
         # cudaGetLastError() after the kernel and returns INTERNAL_ERROR if any
         # previous error was pending).
@@ -298,11 +304,12 @@ class MscclppAlltoAllV:
         try:
             _cudart = ctypes.CDLL("libcudart.so")
             _last_err = _cudart.cudaGetLastError()
-            if _last_err != 0:
+            if _last_err != 0 and _DEBUG:
                 print(f" [rank {self._rank}] WARNING: cleared stale CUDA error code {_last_err} before execute", flush=True)
         except Exception:
             pass
-        print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True)
+        if _DEBUG:
+            print(f" [rank {self._rank}] alltoallv: calling algo.execute(input_alloc={input_alloc_size}, output_alloc={output_alloc_size})", flush=True)
         result = self._algo.execute(
             self._comm,
             input.data_ptr(),
@@ -317,7 +324,8 @@ class MscclppAlltoAllV:
             0,  # nthreads_per_block (auto)
             self._extras,
         )
-        print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True)
+        if _DEBUG:
+            print(f" [rank {self._rank}] alltoallv: algo.execute returned {result}", flush=True)
 
         from mscclpp._mscclpp import CommResult
         if result != CommResult.COMM_SUCCESS:
diff --git a/python/test/test_alltoallv_mscclpp.py b/python/test/test_alltoallv_mscclpp.py
index 96695f94..a9ebb6fd 100644
--- a/python/test/test_alltoallv_mscclpp.py
+++ b/python/test/test_alltoallv_mscclpp.py
@@ -14,6 +14,8 @@ import torch.distributed as dist
 import os
 import sys
 import time
+
+_DEBUG = os.environ.get("MSCCLPP_DEBUG_ALLTOALLV", "0") == "1"
 import random
 import socket
 import struct
@@ -166,7 +168,7 @@ def main():
     except Exception:
         ib_devices = []
 
-    if rank == 0:
+    if rank == 0 and _DEBUG:
         print(f" Hostname: {hostname}")
         print(f" nRanksPerNode: {n_ranks_per_node}, isMultiNode: {is_multi_node}")
         print(f" IB devices: {ib_devices if ib_devices else 'NONE FOUND'}")
@@ -174,7 +176,7 @@ def main():
         if is_multi_node and not ib_devices:
             print(f" WARNING: Multi-node detected but no IB devices! Cross-node will fail.")
     # Also print from rank n_ranks_per_node (first rank on node 1) for comparison
-    if is_multi_node and rank == n_ranks_per_node:
+    if is_multi_node and rank == n_ranks_per_node and _DEBUG:
         print(f" [Node 1] Hostname: {hostname}, rank={rank}")
         print(f" [Node 1] IB devices: {ib_devices if ib_devices else 'NONE FOUND'}")
     # ── End diagnostics ────────────────────────────────────────────────
@@ -184,7 +186,7 @@ def main():
     # Create MscclppAlltoAllV with existing communicator
     alltoallv = MscclppAlltoAllV(communicator=comm)
 
-    if rank == 0:
+    if rank == 0 and _DEBUG:
         print(f"MscclppAlltoAllV initialized")
         print(f"Algorithm: {alltoallv._algo.name}")
 
@@ -201,14 +203,15 @@ def main():
     )
 
     # ── DEBUG: print tensor sizes before all_to_all_single ──
-    print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, "
-          f"dtype={input_data.dtype}, device={input_data.device}, "
-          f"storage_size={input_data.untyped_storage().size()}, "
-          f"data_ptr=0x{input_data.data_ptr():x}")
-    print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, "
-          f"expected_total_elems={world_size * chunk_size}, "
-          f"scratch_buffer_size={alltoallv._scratch_size}")
-    sys.stdout.flush()
+    if _DEBUG:
+        print(f" [rank {rank}] input_data: numel={input_data.numel()}, shape={input_data.shape}, "
+              f"dtype={input_data.dtype}, device={input_data.device}, "
+              f"storage_size={input_data.untyped_storage().size()}, "
+              f"data_ptr=0x{input_data.data_ptr():x}")
+        print(f" [rank {rank}] world_size={world_size}, chunk_size={chunk_size}, "
+              f"expected_total_elems={world_size * chunk_size}, "
+              f"scratch_buffer_size={alltoallv._scratch_size}")
+        sys.stdout.flush()
 
     dist.barrier()
     try:
@@ -224,11 +227,12 @@ def main():
         raise
 
     # ── DEBUG: print output tensor sizes ──
-    print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, "
-          f"dtype={output.dtype}, device={output.device}, "
-          f"storage_size={output.untyped_storage().size()}, "
-          f"data_ptr=0x{output.data_ptr():x}")
-    sys.stdout.flush()
+    if _DEBUG:
+        print(f" [rank {rank}] output: numel={output.numel()}, shape={output.shape}, "
+              f"dtype={output.dtype}, device={output.device}, "
+              f"storage_size={output.untyped_storage().size()}, "
+              f"data_ptr=0x{output.data_ptr():x}")
+        sys.stdout.flush()
 
     # Verify: each chunk should come from different ranks
     try:
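
Note on enabling the flag: _DEBUG is evaluated once, at module import time, so MSCCLPP_DEBUG_ALLTOALLV must already be set in the environment when alltoallv_single.py (or the test script) is first imported. Setting it in the shell that launches the ranks works; a minimal sketch of doing it from a hypothetical driver script instead is shown below (the import path mscclpp.ext.alltoallv_single is assumed from the file layout in this patch):

    import os

    # The module-level _DEBUG flag is read once at import time, so the
    # environment variable has to be set before the first import.
    os.environ["MSCCLPP_DEBUG_ALLTOALLV"] = "1"

    from mscclpp.ext.alltoallv_single import MscclppAlltoAllV  # _DEBUG is now True inside the module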