From 39865c218bd19e18ef5d9b81f8ea56d0fe2ec0ba Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 20 Feb 2026 13:42:29 -0800 Subject: [PATCH 1/6] address flagBuffer ownership issue (#749) This pull request updates the handling of the default flag buffer in the C++ and Python bindings to ensure proper memory management when interfacing with Python. Make sure the buffer will not be deallocated when transfer ownership from cpp to python --- include/mscclpp/algorithm.hpp | 2 +- python/csrc/algorithm.cpp | 13 +++++++---- python/mscclpp/_core/algorithm.py | 17 ++++++++++---- .../ext/algorithm_collection_builder.py | 4 ++-- src/core/algorithm.cc | 23 +++++++++++-------- src/ext/nccl/nccl.cc | 2 +- 6 files changed, 39 insertions(+), 22 deletions(-) diff --git a/include/mscclpp/algorithm.hpp b/include/mscclpp/algorithm.hpp index 6cc05ad4..07149cab 100644 --- a/include/mscclpp/algorithm.hpp +++ b/include/mscclpp/algorithm.hpp @@ -366,7 +366,7 @@ class AlgorithmCollection { /// Get a default GPU flag buffer (allocated once and reused). /// @return A pair of (shared_ptr to the flag buffer, size in bytes). -std::pair, size_t> getDefaultFlagBuffer(); +std::pair, size_t> getFlagBuffer(); } // namespace mscclpp diff --git a/python/csrc/algorithm.cpp b/python/csrc/algorithm.cpp index c8365566..f0d8980d 100644 --- a/python/csrc/algorithm.cpp +++ b/python/csrc/algorithm.cpp @@ -116,10 +116,15 @@ void register_algorithm(nb::module_& m) { .def("buffer_mode", &CollectiveRequest::bufferMode); m.def( - "cpp_get_default_flag_buffer", + "cpp_get_flag_buffer", []() { - auto [buffer, size] = getDefaultFlagBuffer(); - return std::make_pair(reinterpret_cast(buffer.get()), size); + auto [buffer, size] = getFlagBuffer(); + uintptr_t ptr = reinterpret_cast(buffer.get()); + // Transfer shared_ptr ownership into a capsule so Python's GC manages the lifetime. + auto prevent = std::make_unique>(std::move(buffer)); + nb::capsule owner(prevent.get(), [](void* p) noexcept { delete static_cast*>(p); }); + prevent.release(); // capsule now owns the pointer + return nb::make_tuple(ptr, size, owner); }, - "Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size)."); + "Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size, owner)."); } \ No newline at end of file diff --git a/python/mscclpp/_core/algorithm.py b/python/mscclpp/_core/algorithm.py index c712bf88..9b870582 100644 --- a/python/mscclpp/_core/algorithm.py +++ b/python/mscclpp/_core/algorithm.py @@ -19,7 +19,7 @@ from mscclpp._mscclpp import ( CppReduceOp, CppAlgorithmBuilder, CppAlgorithmCollection, - cpp_get_default_flag_buffer, + cpp_get_flag_buffer, ) __all__ = ["Algorithm", "AlgorithmBuilder", "AlgorithmCollection"] @@ -241,15 +241,22 @@ class AlgorithmCollection: self._algorithms.append(algorithm) -def get_default_flag_buffer() -> cp.ndarray: +_flag_buffer_cache = None + + +def get_flag_buffer() -> cp.ndarray: """Get the default flag buffer for algorithm selection. This buffer is used internally by default algorithms to store selection flags. It is allocated as a shared GPU buffer and can be accessed from Python. + The result is cached so all callers share the same buffer. Returns: A CuPy array representing the flag buffer on the GPU. """ - buffer_ptr, buffer_size = cpp_get_default_flag_buffer() - memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, None), 0) - return cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr) + global _flag_buffer_cache + if _flag_buffer_cache is None: + buffer_ptr, buffer_size, owner = cpp_get_flag_buffer() + memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, owner), 0) + _flag_buffer_cache = cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr) + return _flag_buffer_cache diff --git a/python/mscclpp/ext/algorithm_collection_builder.py b/python/mscclpp/ext/algorithm_collection_builder.py index 80c68909..ddfb929f 100644 --- a/python/mscclpp/ext/algorithm_collection_builder.py +++ b/python/mscclpp/ext/algorithm_collection_builder.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import Union -from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection, get_default_flag_buffer +from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection, get_flag_buffer import atexit from mscclpp._mscclpp import CppAlgorithmCollectionBuilder @@ -58,7 +58,7 @@ class AlgorithmCollectionBuilder: rank: int, ) -> AlgorithmCollection: if self._flag_buffer is None: - self._flag_buffer = get_default_flag_buffer() + self._flag_buffer = get_flag_buffer() native_collection = self._builder.build_default_algorithms( int(scratch_buffer), scratch_buffer_size, self._flag_buffer.data.ptr, self._flag_buffer.nbytes, rank ) diff --git a/src/core/algorithm.cc b/src/core/algorithm.cc index 98ac5520..683d4ddd 100644 --- a/src/core/algorithm.cc +++ b/src/core/algorithm.cc @@ -199,18 +199,23 @@ std::shared_ptr DslAlgorithm::build() { return shared_from_this(); } // TODO: implement this void DslAlgorithm::reset() {} -static std::weak_ptr gDefaultFlagBuffer; +static uint32_t* gDefaultFlagBuffer = nullptr; +static std::weak_ptr gDefaultFlagBufferWeak; static size_t gDefaultFlagCount = 128; -std::pair, size_t> getDefaultFlagBuffer() { - std::shared_ptr flagBuffer = gDefaultFlagBuffer.lock(); - if (!flagBuffer) { - flagBuffer = mscclpp::detail::gpuCallocShared(gDefaultFlagCount); - std::vector initFlags(gDefaultFlagCount, 1); - mscclpp::gpuMemcpy(flagBuffer.get(), initFlags.data(), gDefaultFlagCount, cudaMemcpyHostToDevice); - gDefaultFlagBuffer = flagBuffer; +std::pair, size_t> getFlagBuffer() { + auto ptr = gDefaultFlagBufferWeak.lock(); + if (!ptr) { + if (!gDefaultFlagBuffer) { + // Intentionally never freed — CUDA driver reclaims GPU memory at process exit. + gDefaultFlagBuffer = static_cast(mscclpp::detail::gpuCalloc(gDefaultFlagCount * sizeof(uint32_t))); + std::vector initFlags(gDefaultFlagCount, 1); + mscclpp::gpuMemcpy(gDefaultFlagBuffer, initFlags.data(), gDefaultFlagCount, cudaMemcpyHostToDevice); + } + ptr = std::shared_ptr(gDefaultFlagBuffer, [](void*) {}); + gDefaultFlagBufferWeak = ptr; } - return {flagBuffer, gDefaultFlagCount * sizeof(uint32_t)}; + return {ptr, gDefaultFlagCount * sizeof(uint32_t)}; } } // namespace mscclpp \ No newline at end of file diff --git a/src/ext/nccl/nccl.cc b/src/ext/nccl/nccl.cc index bfde4786..afeb5bdb 100644 --- a/src/ext/nccl/nccl.cc +++ b/src/ext/nccl/nccl.cc @@ -294,7 +294,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI commPtr->scratchBuffer_ = mscclpp::GpuBuffer(commPtr->scratchBufferSize_).memory(); commPtr->executor = std::make_shared(mscclppComm, commPtr->scratchBuffer_); - auto [buffer, size] = mscclpp::getDefaultFlagBuffer(); + auto [buffer, size] = mscclpp::getFlagBuffer(); commPtr->flagBuffer_ = buffer; commPtr->flagBufferSize_ = size; From e2acf7f1c8a274a8ba71ae4b182bfb47010119ea Mon Sep 17 00:00:00 2001 From: Caio Rocha <164253795+caiomcbr@users.noreply.github.com> Date: Fri, 20 Feb 2026 16:04:12 -0800 Subject: [PATCH 2/6] Removing MPI Dependency (#743) --- .azure-pipelines/multi-nodes-test.yml | 2 +- .../templates/integration-test.yaml | 2 +- .azure-pipelines/templates/nccl-test.yaml | 2 +- .azure-pipelines/templates/ut-no-ib-env.yaml | 2 +- .azure-pipelines/templates/ut-npkit.yaml | 2 +- .azure-pipelines/templates/ut.yaml | 4 +- CMakeLists.txt | 2 +- python/mscclpp/_core/comm.py | 37 +++++++++++++------ 8 files changed, 34 insertions(+), 19 deletions(-) diff --git a/.azure-pipelines/multi-nodes-test.yml b/.azure-pipelines/multi-nodes-test.yml index 97a95c94..914c2317 100644 --- a/.azure-pipelines/multi-nodes-test.yml +++ b/.azure-pipelines/multi-nodes-test.yml @@ -44,7 +44,7 @@ jobs: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/integration-test.yaml b/.azure-pipelines/templates/integration-test.yaml index b9dac24b..99ed6d04 100644 --- a/.azure-pipelines/templates/integration-test.yaml +++ b/.azure-pipelines/templates/integration-test.yaml @@ -19,7 +19,7 @@ steps: targetType: inline script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/nccl-test.yaml b/.azure-pipelines/templates/nccl-test.yaml index 1ce37d20..56b75d3f 100644 --- a/.azure-pipelines/templates/nccl-test.yaml +++ b/.azure-pipelines/templates/nccl-test.yaml @@ -27,7 +27,7 @@ steps: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)/mscclpp' diff --git a/.azure-pipelines/templates/ut-no-ib-env.yaml b/.azure-pipelines/templates/ut-no-ib-env.yaml index aa21c407..e6576f6d 100644 --- a/.azure-pipelines/templates/ut-no-ib-env.yaml +++ b/.azure-pipelines/templates/ut-no-ib-env.yaml @@ -16,7 +16,7 @@ steps: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/ut-npkit.yaml b/.azure-pipelines/templates/ut-npkit.yaml index 0ab733c9..5c35317e 100644 --- a/.azure-pipelines/templates/ut-npkit.yaml +++ b/.azure-pipelines/templates/ut-npkit.yaml @@ -63,7 +63,7 @@ steps: set -e; \ cd /root/mscclpp; \ mkdir -p build && cd build; \ - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \ + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \ make -j"' kill $CHILD_PID workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/ut.yaml b/.azure-pipelines/templates/ut.yaml index 82ff4aac..2086fd0a 100644 --- a/.azure-pipelines/templates/ut.yaml +++ b/.azure-pipelines/templates/ut.yaml @@ -20,9 +20,9 @@ steps: script: | mkdir build && cd build if [ "${{ parameters.platform }}" == "rocm" ]; then - CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. else - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. fi make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/CMakeLists.txt b/CMakeLists.txt index 6288dbb0..e524b9ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,7 +47,7 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) # Options option(MSCCLPP_ENABLE_TRACE "Enable tracing" OFF) -option(MSCCLPP_BUILD_TESTS "Build tests" ON) +option(MSCCLPP_BUILD_TESTS "Build tests" OFF) option(MSCCLPP_BUILD_PYTHON_BINDINGS "Build Python bindings" ON) option(MSCCLPP_BUILD_EXT_NCCL "Build NCCL interfaces" ON) option(MSCCLPP_BUILD_EXT_COLLECTIVES "Build collective algorithms" ON) diff --git a/python/mscclpp/_core/comm.py b/python/mscclpp/_core/comm.py index f0c5c219..e74a0e6f 100644 --- a/python/mscclpp/_core/comm.py +++ b/python/mscclpp/_core/comm.py @@ -19,8 +19,8 @@ from mscclpp._mscclpp import ( CppTransport, CppTransportFlags, ) -import mpi4py import numpy as np +import pickle from mscclpp.utils import is_torch_tensor @@ -29,20 +29,35 @@ __all__ = ["CommGroup"] class CommGroup: def __init__( - self, mpi_comm: mpi4py.MPI.Comm = None, interfaceIpPortTrio: str = "", rank: int = None, size: int = None + self, + mpi_comm: "mpi4py.MPI.Comm" = None, + torch_group: "dist.ProcessGroup" = None, + interfaceIpPortTrio: str = "", + rank: int = None, + size: int = None, ): - if interfaceIpPortTrio == "": - self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size) + if interfaceIpPortTrio == "" and (mpi_comm is not None or torch_group is not None): uniq_id = None - if mpi_comm.rank == 0: - # similar to NCCL's unique id + self.bootstrap = CppTcpBootstrap.create(rank, size) + if rank == 0: uniq_id = self.bootstrap.create_unique_id() - uniq_id_global = mpi_comm.bcast(uniq_id, 0) + if mpi_comm is not None: + import mpi4py + + uniq_id_global = mpi_comm.bcast(uniq_id, 0) + else: + import torch + import torch.distributed as dist + + if rank == 0: + uniq_id_global = uniq_id + pickled_data = pickle.dumps(uniq_id) + data_tensor = torch.frombuffer(bytearray(pickled_data), dtype=torch.uint8).clone() + else: + data_tensor = torch.zeros(256, dtype=torch.uint8) + dist.broadcast(data_tensor, src=0, group=torch_group) + uniq_id_global = pickle.loads(data_tensor.numpy().tobytes()) self.bootstrap.initialize(uniq_id_global) - elif mpi_comm: - # use this instead - self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size) - self.bootstrap.initialize(interfaceIpPortTrio) elif not interfaceIpPortTrio == "": assert rank >= 0 and size >= 1 self.bootstrap = CppTcpBootstrap.create(rank, size) From 3962574bcb73a6c8bb6d0c2449885b218bbe5e72 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 20 Feb 2026 16:11:16 -0800 Subject: [PATCH 3/6] Address installation issue in some env (#750) This pull request updates the way the `nlohmann/json` library is fetched and upgrades it to a newer version in both the main build and test configuration files. Addressed installation issue in some env --- CMakeLists.txt | 6 ++++-- test/mscclpp-test/CMakeLists.txt | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e524b9ab..8d02abd8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -169,9 +169,11 @@ if(MSCCLPP_USE_IB) endif() find_package(NUMA REQUIRED) find_package(Threads REQUIRED) - include(FetchContent) -FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz) +FetchContent_Declare(json + GIT_REPOSITORY https://github.com/nlohmann/json.git + GIT_TAG v3.12.0 +) FetchContent_MakeAvailable(json) if("${INSTALL_PREFIX}" STREQUAL "") diff --git a/test/mscclpp-test/CMakeLists.txt b/test/mscclpp-test/CMakeLists.txt index eb2b26ca..241b7e02 100644 --- a/test/mscclpp-test/CMakeLists.txt +++ b/test/mscclpp-test/CMakeLists.txt @@ -1,7 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.2/json.tar.xz) +FetchContent_Declare(json + GIT_REPOSITORY https://github.com/nlohmann/json.git + GIT_TAG v3.12.0 +) FetchContent_MakeAvailable(json) function(add_mscclpp_test_executable name sources) From 2a6f1c11927389bcee3398e0a43384aa3eb98e5e Mon Sep 17 00:00:00 2001 From: mahdiehghazim Date: Fri, 20 Feb 2026 22:46:32 -0500 Subject: [PATCH 4/6] Mahdieh/switchchannel test clean (#751) This PR adds an example code for switch channel testing. It validates switch channel on single node and multi node environments. We need to add the description of the algorithms and the explanation of the code under doc. example outputs: rank0: ./bidir_switch_channel 10.0.5.233:45571 0 0 Rank 0 (GPU 0): Preparing for tests ... Rank 0 (GPU 0): bytes 4096, elapsed 0.0062328 ms/iter, BW 0.657169 GB/s Rank 0 (GPU 0): bytes 4.1943e+06, elapsed 0.0164577 ms/iter, BW 254.854 GB/s Rank 0 (GPU 0): bytes 1.34218e+08, elapsed 0.33628 ms/iter, BW 399.125 GB/s Rank 0: Succeed! rank1: ./bidir_switch_channel 10.0.5.233:45571 1 0 Rank 1 (GPU 0): Preparing for tests ... Rank 1: Succeed! --- examples/tutorials/05-switch-channel/Makefile | 15 ++ .../05-switch-channel/bidir_switch_channel.cu | 177 ++++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 examples/tutorials/05-switch-channel/Makefile create mode 100644 examples/tutorials/05-switch-channel/bidir_switch_channel.cu diff --git a/examples/tutorials/05-switch-channel/Makefile b/examples/tutorials/05-switch-channel/Makefile new file mode 100644 index 00000000..1a211f64 --- /dev/null +++ b/examples/tutorials/05-switch-channel/Makefile @@ -0,0 +1,15 @@ +CUDA_HOME ?= /usr/local/cuda + +COMPILER := $(CUDA_HOME)/bin/nvcc +ARCH_FLAG := -arch=native + +TARGET = bidir_switch_channel +SRC = bidir_switch_channel.cu + +all: $(TARGET) + +$(TARGET): $(SRC) + $(COMPILER) $(ARCH_FLAG) -o $@ $< -lmscclpp + +clean: + rm -f $(TARGET) diff --git a/examples/tutorials/05-switch-channel/bidir_switch_channel.cu b/examples/tutorials/05-switch-channel/bidir_switch_channel.cu new file mode 100644 index 00000000..658e6f05 --- /dev/null +++ b/examples/tutorials/05-switch-channel/bidir_switch_channel.cu @@ -0,0 +1,177 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#define PORT_NUMBER "50505" + +template +void log(Args &&...args) { + std::stringstream ss; + (ss << ... << args); + ss << std::endl; + std::cout << ss.str(); +} + +int spawn_process(std::function func) { + pid_t pid = fork(); + if (pid < 0) return -1; + if (pid == 0) { + // Child process + func(); + exit(0); + } + return pid; +} + +int wait_process(int pid) { + int status; + if (waitpid(pid, &status, 0) < 0) { + return -1; + } + if (WIFEXITED(status)) { + return WEXITSTATUS(status); + } + return -1; +} + +__constant__ mscclpp::SwitchChannelDeviceHandle gConstSwitchChan; + +__device__ mscclpp::DeviceSyncer devSyncer; + +__global__ void kernelSwitchReduce(int rank, int numElements) { + const int tid = threadIdx.x + blockIdx.x * blockDim.x; + int stride = blockDim.x * gridDim.x; + + // rank 0 performs on first half of data and rank 1 on second half + int min = rank * (numElements / 2); + int max = (rank + 1) * (numElements / 2); + + for (int i = tid + min; i < max; i += stride) { + auto val = gConstSwitchChan.reduce(i); + gConstSwitchChan.broadcast(i, val); + } +} + +void worker(int myRank, int gpuId, const std::string &ipPort) { + MSCCLPP_CUDATHROW(cudaSetDevice(gpuId)); + const int nRanks = 2; + const int iter = 1000; + const size_t bufferBytes = 128 * 1024 * 1024; + + log("Rank ", myRank, " (GPU ", gpuId, "): Preparing for tests ..."); + + // Build a connection and a semaphore + auto bootstrap = std::make_shared(myRank, nRanks); + bootstrap->initialize(ipPort); + std::shared_ptr comm = std::make_shared(bootstrap); + + std::vector ranks; + ranks.reserve(nRanks); + for (int i = 0; i < nRanks; i++) ranks.push_back(i); + + auto buffer = mscclpp::GpuBuffer(bufferBytes); + + auto nvlsConnection = mscclpp::connectNvlsCollective(comm, ranks, bufferBytes); + + auto switchChannel = nvlsConnection->bindAllocatedMemory(CUdeviceptr(buffer.data()), bufferBytes); + + auto deviceHandle = switchChannel.deviceHandle(); + + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gConstSwitchChan, &deviceHandle, sizeof(deviceHandle))); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + // Call the kernel in a loop for perf evaluation + + for (size_t numElements : {1024, 1024 * 1024, 32 * 1024 * 1024}) { + cudaEvent_t start, end; + if (myRank == 0) { + MSCCLPP_CUDATHROW(cudaEventCreate(&start)); + MSCCLPP_CUDATHROW(cudaEventCreate(&end)); + } + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + bootstrap->barrier(); + + if (myRank == 0) { + MSCCLPP_CUDATHROW(cudaEventRecord(start, 0)); + } + + for (int i = 0; i < iter; ++i) { + kernelSwitchReduce<<<256, 1024>>>(myRank, numElements); + } + + MSCCLPP_CUDATHROW(cudaGetLastError()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + comm->bootstrap()->barrier(); + + if (myRank == 0) { + MSCCLPP_CUDATHROW(cudaEventRecord(end, 0)); + MSCCLPP_CUDATHROW(cudaEventSynchronize(end)); + float elapsedTime; + float elapsedTimePerIter; + float gbps; + MSCCLPP_CUDATHROW(cudaEventElapsedTime(&elapsedTime, start, end)); + elapsedTimePerIter = elapsedTime / iter; + float dataSize = numElements * 4; + gbps = dataSize / elapsedTimePerIter * 1e-6f; + log("Rank ", myRank, " (GPU ", gpuId, "): bytes ", dataSize, ", elapsed ", elapsedTimePerIter, " ms/iter, BW ", + gbps, " GB/s"); + } + } +} + +int main(int argc, char **argv) { + if (argc == 1) { + int pid0 = spawn_process([]() { worker(0, 0, "lo:127.0.0.1:" PORT_NUMBER); }); + int pid1 = spawn_process([]() { worker(1, 1, "lo:127.0.0.1:" PORT_NUMBER); }); + if (pid0 < 0 || pid1 < 0) { + log("Failed to spawn processes."); + return -1; + } + int status0 = wait_process(pid0); + int status1 = wait_process(pid1); + if (status0 < 0 || status1 < 0) { + log("Failed to wait for processes."); + return -1; + } + if (status0 != 0 || status1 != 0) { + log("One of the processes failed."); + return -1; + } + log("Succeed!"); + return 0; + } else if (argc == 4) { + std::string ipPort = argv[1]; + int rank, gpuId; + try { + rank = std::stoi(argv[2]); + gpuId = std::stoi(argv[3]); + } catch (const std::exception &) { + log("Error: rank and gpu_id must be valid integers."); + return -1; + } + if (rank < 0 || rank > 2 || gpuId < 0) { + log("Error: rank must be between 0 and 1 and gpu_id must be non-negative."); + return -1; + } + worker(rank, gpuId, ipPort); + log("Rank ", rank, ": Succeed!"); + return 0; + } else { + std::cerr << "Usage:\n" + << " " << argv[0] << " Run in intra-node mode\n" + << " " << argv[0] << " Run in inter-node mode\n"; + return -1; + } +} From b5256032fe407935fd9ddffb6e0847b0996f1d4b Mon Sep 17 00:00:00 2001 From: Caio Rocha <164253795+caiomcbr@users.noreply.github.com> Date: Mon, 23 Feb 2026 11:55:17 -0800 Subject: [PATCH 5/6] Disabling Nanobind Memory Leak Warnings in Release Builds (#745) Co-authored-by: Binyang Li --- CMakeLists.txt | 1 + python/CMakeLists.txt | 6 +++++- python/csrc/core_py.cpp | 5 ++++- python/test/_cpp/proxy_test.cpp | 5 ++++- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8d02abd8..9ff7b075 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,6 +56,7 @@ option(MSCCLPP_USE_ROCM "Use AMD/ROCm." OFF) option(MSCCLPP_USE_IB "Use InfiniBand." ON) option(MSCCLPP_BYPASS_GPU_CHECK "Bypass GPU check." OFF) option(MSCCLPP_NPKIT_FLAGS "Set NPKIT flags" OFF) +option(MSCCLPP_DISABLE_NB_LEAK_WARNINGS "Disable Nanobind leak warnings" ON) set(MSCCLPP_GPU_ARCHS "" CACHE STRING "Specify GPU architectures with delimiters (comma, space, or semicolon).") if(MSCCLPP_BYPASS_GPU_CHECK) diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index b84cea3a..5e784e92 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -4,6 +4,10 @@ add_subdirectory(csrc) add_subdirectory(test) +target_compile_definitions(mscclpp_py PRIVATE + $<$:MSCCLPP_DISABLE_NB_LEAK_WARNINGS> +) + add_custom_target(pytest_lib_copy ALL COMMAND ${CMAKE_COMMAND} -E copy_if_different ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/_mscclpp.*.so @@ -12,4 +16,4 @@ add_custom_target(pytest_lib_copy ALL ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/_ext.*.so ${CMAKE_CURRENT_SOURCE_DIR}/test/_cpp DEPENDS mscclpp_py mscclpp_py_test -) +) \ No newline at end of file diff --git a/python/csrc/core_py.cpp b/python/csrc/core_py.cpp index 1a884cb3..47d76ac4 100644 --- a/python/csrc/core_py.cpp +++ b/python/csrc/core_py.cpp @@ -307,6 +307,9 @@ void register_core(nb::module_& m) { } NB_MODULE(_mscclpp, m) { +#ifdef MSCCLPP_DISABLE_NB_LEAK_WARNINGS + nb::set_leak_warnings(false); +#endif register_env(m); register_error(m); register_port_channel(m); @@ -324,4 +327,4 @@ NB_MODULE(_mscclpp, m) { // ext register_algorithm_collection_builder(m); -} +} \ No newline at end of file diff --git a/python/test/_cpp/proxy_test.cpp b/python/test/_cpp/proxy_test.cpp index 5bc18e23..697a5c38 100644 --- a/python/test/_cpp/proxy_test.cpp +++ b/python/test/_cpp/proxy_test.cpp @@ -63,10 +63,13 @@ class MyProxyService { }; NB_MODULE(_ext, m) { +#ifdef MSCCLPP_DISABLE_NB_LEAK_WARNINGS + nb::set_leak_warnings(false); +#endif nb::class_(m, "MyProxyService") .def(nb::init(), nb::arg("rank"), nb::arg("nranks"), nb::arg("data_size"), nb::arg("reg_mem_list"), nb::arg("sem_list")) .def("fifo_device_handle", &MyProxyService::fifoDeviceHandle) .def("start", &MyProxyService::start) .def("stop", &MyProxyService::stop); -} +} \ No newline at end of file From 7738603d6310dbb4f025a80cbb71da043cd46024 Mon Sep 17 00:00:00 2001 From: Caio Rocha <164253795+caiomcbr@users.noreply.github.com> Date: Mon, 23 Feb 2026 16:33:52 -0800 Subject: [PATCH 6/6] Adjusting Communicator in Python API (#752) --- python/mscclpp/_core/comm.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/mscclpp/_core/comm.py b/python/mscclpp/_core/comm.py index e74a0e6f..d42349dd 100644 --- a/python/mscclpp/_core/comm.py +++ b/python/mscclpp/_core/comm.py @@ -38,6 +38,11 @@ class CommGroup: ): if interfaceIpPortTrio == "" and (mpi_comm is not None or torch_group is not None): uniq_id = None + rank, size = ( + (mpi_comm.Get_rank(), mpi_comm.Get_size()) + if mpi_comm is not None + else (torch_group.rank(), torch_group.size()) + ) self.bootstrap = CppTcpBootstrap.create(rank, size) if rank == 0: uniq_id = self.bootstrap.create_unique_id()