diff --git a/.azure-pipelines/multi-nodes-test.yml b/.azure-pipelines/multi-nodes-test.yml
index 97a95c94..914c2317 100644
--- a/.azure-pipelines/multi-nodes-test.yml
+++ b/.azure-pipelines/multi-nodes-test.yml
@@ -44,7 +44,7 @@ jobs:
       targetType: 'inline'
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'
diff --git a/.azure-pipelines/templates/integration-test.yaml b/.azure-pipelines/templates/integration-test.yaml
index b9dac24b..99ed6d04 100644
--- a/.azure-pipelines/templates/integration-test.yaml
+++ b/.azure-pipelines/templates/integration-test.yaml
@@ -19,7 +19,7 @@ steps:
       targetType: inline
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'
diff --git a/.azure-pipelines/templates/nccl-test.yaml b/.azure-pipelines/templates/nccl-test.yaml
index 1ce37d20..56b75d3f 100644
--- a/.azure-pipelines/templates/nccl-test.yaml
+++ b/.azure-pipelines/templates/nccl-test.yaml
@@ -27,7 +27,7 @@ steps:
       targetType: 'inline'
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)/mscclpp'
diff --git a/.azure-pipelines/templates/ut-no-ib-env.yaml b/.azure-pipelines/templates/ut-no-ib-env.yaml
index aa21c407..e6576f6d 100644
--- a/.azure-pipelines/templates/ut-no-ib-env.yaml
+++ b/.azure-pipelines/templates/ut-no-ib-env.yaml
@@ -16,7 +16,7 @@ steps:
       targetType: 'inline'
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'
diff --git a/.azure-pipelines/templates/ut-npkit.yaml b/.azure-pipelines/templates/ut-npkit.yaml
index 0ab733c9..5c35317e 100644
--- a/.azure-pipelines/templates/ut-npkit.yaml
+++ b/.azure-pipelines/templates/ut-npkit.yaml
@@ -63,7 +63,7 @@ steps:
           set -e; \
           cd /root/mscclpp; \
           mkdir -p build && cd build; \
-          cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \
+          cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \
           make -j"'
         kill $CHILD_PID
       workingDirectory: '$(System.DefaultWorkingDirectory)'
diff --git a/.azure-pipelines/templates/ut.yaml b/.azure-pipelines/templates/ut.yaml
index 82ff4aac..2086fd0a 100644
--- a/.azure-pipelines/templates/ut.yaml
+++ b/.azure-pipelines/templates/ut.yaml
@@ -20,9 +20,9 @@ steps:
       script: |
         mkdir build && cd build
         if [ "${{ parameters.platform }}" == "rocm" ]; then
-          CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+          CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
         else
-          cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+          cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
         fi
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d46e45fe..8fa19a3c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,7 +47,7 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
 # Options
 option(MSCCLPP_ENABLE_TRACE "Enable tracing" OFF)
-option(MSCCLPP_BUILD_TESTS "Build tests" ON)
+option(MSCCLPP_BUILD_TESTS "Build tests" OFF)
 option(MSCCLPP_BUILD_PYTHON_BINDINGS "Build Python bindings" ON)
 option(MSCCLPP_BUILD_EXT_NCCL "Build NCCL interfaces" ON)
 option(MSCCLPP_BUILD_EXT_COLLECTIVES "Build collective algorithms" ON)
@@ -56,6 +56,7 @@ option(MSCCLPP_USE_ROCM "Use AMD/ROCm." OFF)
 option(MSCCLPP_USE_IB "Use InfiniBand." ON)
 option(MSCCLPP_BYPASS_GPU_CHECK "Bypass GPU check." OFF)
 option(MSCCLPP_NPKIT_FLAGS "Set NPKIT flags" OFF)
+option(MSCCLPP_DISABLE_NB_LEAK_WARNINGS "Disable Nanobind leak warnings" ON)
 set(MSCCLPP_GPU_ARCHS "" CACHE STRING "Specify GPU architectures with delimiters (comma, space, or semicolon).")
 
 if(MSCCLPP_BYPASS_GPU_CHECK)
@@ -185,7 +186,10 @@ if(MSCCLPP_USE_GDRCOPY)
 endif()
 
 include(FetchContent)
-FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz)
+FetchContent_Declare(json
+  GIT_REPOSITORY https://github.com/nlohmann/json.git
+  GIT_TAG v3.12.0
+)
 FetchContent_MakeAvailable(json)
 
 if("${INSTALL_PREFIX}" STREQUAL "")
diff --git a/examples/tutorials/05-switch-channel/Makefile b/examples/tutorials/05-switch-channel/Makefile
new file mode 100644
index 00000000..1a211f64
--- /dev/null
+++ b/examples/tutorials/05-switch-channel/Makefile
@@ -0,0 +1,15 @@
+CUDA_HOME ?= /usr/local/cuda
+
+COMPILER := $(CUDA_HOME)/bin/nvcc
+ARCH_FLAG := -arch=native
+
+TARGET = bidir_switch_channel
+SRC = bidir_switch_channel.cu
+
+all: $(TARGET)
+
+$(TARGET): $(SRC)
+	$(COMPILER) $(ARCH_FLAG) -o $@ $< -lmscclpp
+
+clean:
+	rm -f $(TARGET)
diff --git a/examples/tutorials/05-switch-channel/bidir_switch_channel.cu b/examples/tutorials/05-switch-channel/bidir_switch_channel.cu
new file mode 100644
index 00000000..658e6f05
--- /dev/null
+++ b/examples/tutorials/05-switch-channel/bidir_switch_channel.cu
@@ -0,0 +1,177 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define PORT_NUMBER "50505"
+
+template <typename... Args>
+void log(Args &&...args) {
+  std::stringstream ss;
+  (ss << ... << args);
+  ss << std::endl;
+  std::cout << ss.str();
+}
+
+int spawn_process(std::function<void()> func) {
+  pid_t pid = fork();
+  if (pid < 0) return -1;
+  if (pid == 0) {
+    // Child process
+    func();
+    exit(0);
+  }
+  return pid;
+}
+
+int wait_process(int pid) {
+  int status;
+  if (waitpid(pid, &status, 0) < 0) {
+    return -1;
+  }
+  if (WIFEXITED(status)) {
+    return WEXITSTATUS(status);
+  }
+  return -1;
+}
+
+__constant__ mscclpp::SwitchChannelDeviceHandle gConstSwitchChan;
+
+__device__ mscclpp::DeviceSyncer devSyncer;
+
+__global__ void kernelSwitchReduce(int rank, int numElements) {
+  const int tid = threadIdx.x + blockIdx.x * blockDim.x;
+  int stride = blockDim.x * gridDim.x;
+
+  // rank 0 performs on first half of data and rank 1 on second half
+  int min = rank * (numElements / 2);
+  int max = (rank + 1) * (numElements / 2);
+
+  for (int i = tid + min; i < max; i += stride) {
+    auto val = gConstSwitchChan.reduce(i);
+    gConstSwitchChan.broadcast(i, val);
+  }
+}
+
+void worker(int myRank, int gpuId, const std::string &ipPort) {
+  MSCCLPP_CUDATHROW(cudaSetDevice(gpuId));
+  const int nRanks = 2;
+  const int iter = 1000;
+  const size_t bufferBytes = 128 * 1024 * 1024;
+
+  log("Rank ", myRank, " (GPU ", gpuId, "): Preparing for tests ...");
+
+  // Build a connection and a semaphore
+  auto bootstrap = std::make_shared<mscclpp::TcpBootstrap>(myRank, nRanks);
+  bootstrap->initialize(ipPort);
+  std::shared_ptr<mscclpp::Communicator> comm = std::make_shared<mscclpp::Communicator>(bootstrap);
+
+  std::vector<int> ranks;
+  ranks.reserve(nRanks);
+  for (int i = 0; i < nRanks; i++) ranks.push_back(i);
+
+  auto buffer = mscclpp::GpuBuffer(bufferBytes);
+
+  auto nvlsConnection = mscclpp::connectNvlsCollective(comm, ranks, bufferBytes);
+
+  auto switchChannel = nvlsConnection->bindAllocatedMemory(CUdeviceptr(buffer.data()), bufferBytes);
+
+  auto deviceHandle = switchChannel.deviceHandle();
+
+  MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gConstSwitchChan, &deviceHandle, sizeof(deviceHandle)));
+  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+
+  // Call the kernel in a loop for perf evaluation
+
+  for (size_t numElements : {1024, 1024 * 1024, 32 * 1024 * 1024}) {
+    cudaEvent_t start, end;
+    if (myRank == 0) {
+      MSCCLPP_CUDATHROW(cudaEventCreate(&start));
+      MSCCLPP_CUDATHROW(cudaEventCreate(&end));
+    }
+    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+    bootstrap->barrier();
+
+    if (myRank == 0) {
+      MSCCLPP_CUDATHROW(cudaEventRecord(start, 0));
+    }
+
+    for (int i = 0; i < iter; ++i) {
+      kernelSwitchReduce<<<256, 1024>>>(myRank, numElements);
+    }
+
+    MSCCLPP_CUDATHROW(cudaGetLastError());
+    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+
+    comm->bootstrap()->barrier();
+
+    if (myRank == 0) {
+      MSCCLPP_CUDATHROW(cudaEventRecord(end, 0));
+      MSCCLPP_CUDATHROW(cudaEventSynchronize(end));
+      float elapsedTime;
+      float elapsedTimePerIter;
+      float gbps;
+      MSCCLPP_CUDATHROW(cudaEventElapsedTime(&elapsedTime, start, end));
+      elapsedTimePerIter = elapsedTime / iter;
+      float dataSize = numElements * 4;
+      gbps = dataSize / elapsedTimePerIter * 1e-6f;
+      log("Rank ", myRank, " (GPU ", gpuId, "): bytes ", dataSize, ", elapsed ", elapsedTimePerIter, " ms/iter, BW ",
+          gbps, " GB/s");
+    }
+  }
+}
+
+int main(int argc, char **argv) {
+  if (argc == 1) {
+    int pid0 = spawn_process([]() { worker(0, 0, "lo:127.0.0.1:" PORT_NUMBER); });
+    int pid1 = spawn_process([]() { worker(1, 1, "lo:127.0.0.1:" PORT_NUMBER); });
+    if (pid0 < 0 || pid1 < 0) {
+      log("Failed to spawn processes.");
+      return -1;
+    }
+    int status0 = wait_process(pid0);
+    int status1 = wait_process(pid1);
+    if (status0 < 0 || status1 < 0) {
+      log("Failed to wait for processes.");
+      return -1;
+    }
+    if (status0 != 0 || status1 != 0) {
+      log("One of the processes failed.");
+      return -1;
+    }
+    log("Succeed!");
+    return 0;
+  } else if (argc == 4) {
+    std::string ipPort = argv[1];
+    int rank, gpuId;
+    try {
+      rank = std::stoi(argv[2]);
+      gpuId = std::stoi(argv[3]);
+    } catch (const std::exception &) {
+      log("Error: rank and gpu_id must be valid integers.");
+      return -1;
+    }
+    if (rank < 0 || rank > 1 || gpuId < 0) {
+      log("Error: rank must be between 0 and 1 and gpu_id must be non-negative.");
+      return -1;
+    }
+    worker(rank, gpuId, ipPort);
+    log("Rank ", rank, ": Succeed!");
+    return 0;
+  } else {
+    std::cerr << "Usage:\n"
+              << "  " << argv[0] << "                            Run in intra-node mode\n"
+              << "  " << argv[0] << " <ip_port> <rank> <gpu_id>  Run in inter-node mode\n";
+    return -1;
+  }
+}
diff --git a/include/mscclpp/algorithm.hpp b/include/mscclpp/algorithm.hpp
index 6cc05ad4..07149cab 100644
--- a/include/mscclpp/algorithm.hpp
+++ b/include/mscclpp/algorithm.hpp
@@ -366,7 +366,7 @@ class AlgorithmCollection {
 
 /// Get a default GPU flag buffer (allocated once and reused).
 /// @return A pair of (shared_ptr to the flag buffer, size in bytes).
-std::pair<std::shared_ptr<uint32_t>, size_t> getDefaultFlagBuffer();
+std::pair<std::shared_ptr<uint32_t>, size_t> getFlagBuffer();
 
 }  // namespace mscclpp
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index b84cea3a..5e784e92 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -4,6 +4,10 @@
 add_subdirectory(csrc)
 add_subdirectory(test)
 
+target_compile_definitions(mscclpp_py PRIVATE
+  $<$<BOOL:${MSCCLPP_DISABLE_NB_LEAK_WARNINGS}>:MSCCLPP_DISABLE_NB_LEAK_WARNINGS>
+)
+
 add_custom_target(pytest_lib_copy ALL
   COMMAND ${CMAKE_COMMAND} -E copy_if_different
     ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/_mscclpp.*.so
@@ -12,4 +16,4 @@ add_custom_target(pytest_lib_copy ALL
     ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/_ext.*.so
     ${CMAKE_CURRENT_SOURCE_DIR}/test/_cpp
   DEPENDS mscclpp_py mscclpp_py_test
-)
+)
\ No newline at end of file
diff --git a/python/csrc/algorithm.cpp b/python/csrc/algorithm.cpp
index c8365566..f0d8980d 100644
--- a/python/csrc/algorithm.cpp
+++ b/python/csrc/algorithm.cpp
@@ -116,10 +116,15 @@ void register_algorithm(nb::module_& m) {
       .def("buffer_mode", &CollectiveRequest::bufferMode);
 
   m.def(
-      "cpp_get_default_flag_buffer",
+      "cpp_get_flag_buffer",
       []() {
-        auto [buffer, size] = getDefaultFlagBuffer();
-        return std::make_pair(reinterpret_cast<uintptr_t>(buffer.get()), size);
+        auto [buffer, size] = getFlagBuffer();
+        uintptr_t ptr = reinterpret_cast<uintptr_t>(buffer.get());
+        // Transfer shared_ptr ownership into a capsule so Python's GC manages the lifetime.
+        auto prevent = std::make_unique<std::shared_ptr<uint32_t>>(std::move(buffer));
+        nb::capsule owner(prevent.get(), [](void* p) noexcept { delete static_cast<std::shared_ptr<uint32_t>*>(p); });
+        prevent.release();  // capsule now owns the pointer
+        return nb::make_tuple(ptr, size, owner);
       },
-      "Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size).");
+      "Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size, owner).");
 }
\ No newline at end of file
diff --git a/python/csrc/core_py.cpp b/python/csrc/core_py.cpp
index 1a884cb3..47d76ac4 100644
--- a/python/csrc/core_py.cpp
+++ b/python/csrc/core_py.cpp
@@ -307,6 +307,9 @@ void register_core(nb::module_& m) {
 }
 
 NB_MODULE(_mscclpp, m) {
+#ifdef MSCCLPP_DISABLE_NB_LEAK_WARNINGS
+  nb::set_leak_warnings(false);
+#endif
   register_env(m);
   register_error(m);
   register_port_channel(m);
@@ -324,4 +327,4 @@ NB_MODULE(_mscclpp, m) {
 
   // ext
   register_algorithm_collection_builder(m);
-}
+}
\ No newline at end of file
diff --git a/python/mscclpp/_core/algorithm.py b/python/mscclpp/_core/algorithm.py
index c712bf88..9b870582 100644
--- a/python/mscclpp/_core/algorithm.py
+++ b/python/mscclpp/_core/algorithm.py
@@ -19,7 +19,7 @@ from mscclpp._mscclpp import (
     CppReduceOp,
     CppAlgorithmBuilder,
     CppAlgorithmCollection,
-    cpp_get_default_flag_buffer,
+    cpp_get_flag_buffer,
 )
 
 __all__ = ["Algorithm", "AlgorithmBuilder", "AlgorithmCollection"]
@@ -241,15 +241,22 @@ class AlgorithmCollection:
         self._algorithms.append(algorithm)
 
 
-def get_default_flag_buffer() -> cp.ndarray:
+_flag_buffer_cache = None
+
+
+def get_flag_buffer() -> cp.ndarray:
     """Get the default flag buffer for algorithm selection.
 
     This buffer is used internally by default algorithms to store selection flags.
     It is allocated as a shared GPU buffer and can be accessed from Python.
+    The result is cached so all callers share the same buffer.
 
     Returns:
        A CuPy array representing the flag buffer on the GPU.
    """
-    buffer_ptr, buffer_size = cpp_get_default_flag_buffer()
-    memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, None), 0)
-    return cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr)
+    global _flag_buffer_cache
+    if _flag_buffer_cache is None:
+        buffer_ptr, buffer_size, owner = cpp_get_flag_buffer()
+        memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, owner), 0)
+        _flag_buffer_cache = cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr)
+    return _flag_buffer_cache
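For context on the Python-facing change above: `get_flag_buffer()` now returns a cached CuPy view whose `UnownedMemory` carries the `owner` capsule from `cpp_get_flag_buffer`, so the C++-side allocation stays alive as long as the array does. A minimal usage sketch, assuming an installed mscclpp build with its Python bindings, CuPy, and a visible GPU:

```python
# Sketch only: requires mscclpp's Python bindings, CuPy, and a GPU.
import cupy as cp
from mscclpp._core.algorithm import get_flag_buffer

flags_a = get_flag_buffer()
flags_b = get_flag_buffer()

assert flags_a is flags_b            # module-level cache: every caller shares one array
assert flags_a.dtype == cp.uint32    # buffer_size // 4 uint32 flags

# The owner capsule handed to cp.cuda.UnownedMemory keeps the C++ shared_ptr alive,
# so this view stays valid even if no other Python object references the buffer.
print(flags_a.shape, int(flags_a[0]))  # flags are initialized to 1 on first allocation
```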
""" - buffer_ptr, buffer_size = cpp_get_default_flag_buffer() - memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, None), 0) - return cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr) + global _flag_buffer_cache + if _flag_buffer_cache is None: + buffer_ptr, buffer_size, owner = cpp_get_flag_buffer() + memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, owner), 0) + _flag_buffer_cache = cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr) + return _flag_buffer_cache diff --git a/python/mscclpp/_core/comm.py b/python/mscclpp/_core/comm.py index f0c5c219..d42349dd 100644 --- a/python/mscclpp/_core/comm.py +++ b/python/mscclpp/_core/comm.py @@ -19,8 +19,8 @@ from mscclpp._mscclpp import ( CppTransport, CppTransportFlags, ) -import mpi4py import numpy as np +import pickle from mscclpp.utils import is_torch_tensor @@ -29,20 +29,40 @@ __all__ = ["CommGroup"] class CommGroup: def __init__( - self, mpi_comm: mpi4py.MPI.Comm = None, interfaceIpPortTrio: str = "", rank: int = None, size: int = None + self, + mpi_comm: "mpi4py.MPI.Comm" = None, + torch_group: "dist.ProcessGroup" = None, + interfaceIpPortTrio: str = "", + rank: int = None, + size: int = None, ): - if interfaceIpPortTrio == "": - self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size) + if interfaceIpPortTrio == "" and (mpi_comm is not None or torch_group is not None): uniq_id = None - if mpi_comm.rank == 0: - # similar to NCCL's unique id + rank, size = ( + (mpi_comm.Get_rank(), mpi_comm.Get_size()) + if mpi_comm is not None + else (torch_group.rank(), torch_group.size()) + ) + self.bootstrap = CppTcpBootstrap.create(rank, size) + if rank == 0: uniq_id = self.bootstrap.create_unique_id() - uniq_id_global = mpi_comm.bcast(uniq_id, 0) + if mpi_comm is not None: + import mpi4py + + uniq_id_global = mpi_comm.bcast(uniq_id, 0) + else: + import torch + import torch.distributed as dist + + if rank == 0: + uniq_id_global = uniq_id + pickled_data = pickle.dumps(uniq_id) + data_tensor = torch.frombuffer(bytearray(pickled_data), dtype=torch.uint8).clone() + else: + data_tensor = torch.zeros(256, dtype=torch.uint8) + dist.broadcast(data_tensor, src=0, group=torch_group) + uniq_id_global = pickle.loads(data_tensor.numpy().tobytes()) self.bootstrap.initialize(uniq_id_global) - elif mpi_comm: - # use this instead - self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size) - self.bootstrap.initialize(interfaceIpPortTrio) elif not interfaceIpPortTrio == "": assert rank >= 0 and size >= 1 self.bootstrap = CppTcpBootstrap.create(rank, size) diff --git a/python/mscclpp/ext/algorithm_collection_builder.py b/python/mscclpp/ext/algorithm_collection_builder.py index 80c68909..ddfb929f 100644 --- a/python/mscclpp/ext/algorithm_collection_builder.py +++ b/python/mscclpp/ext/algorithm_collection_builder.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import Union -from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection, get_default_flag_buffer +from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection, get_flag_buffer import atexit from mscclpp._mscclpp import CppAlgorithmCollectionBuilder @@ -58,7 +58,7 @@ class AlgorithmCollectionBuilder: rank: int, ) -> AlgorithmCollection: if self._flag_buffer is None: - self._flag_buffer = get_default_flag_buffer() + self._flag_buffer = get_flag_buffer() native_collection = self._builder.build_default_algorithms( int(scratch_buffer), 
diff --git a/python/test/_cpp/proxy_test.cpp b/python/test/_cpp/proxy_test.cpp
index 5bc18e23..697a5c38 100644
--- a/python/test/_cpp/proxy_test.cpp
+++ b/python/test/_cpp/proxy_test.cpp
@@ -63,10 +63,13 @@ class MyProxyService {
 };
 
 NB_MODULE(_ext, m) {
+#ifdef MSCCLPP_DISABLE_NB_LEAK_WARNINGS
+  nb::set_leak_warnings(false);
+#endif
   nb::class_<MyProxyService>(m, "MyProxyService")
       .def(nb::init(), nb::arg("rank"), nb::arg("nranks"), nb::arg("data_size"), nb::arg("reg_mem_list"),
            nb::arg("sem_list"))
       .def("fifo_device_handle", &MyProxyService::fifoDeviceHandle)
       .def("start", &MyProxyService::start)
       .def("stop", &MyProxyService::stop);
-}
+}
\ No newline at end of file
diff --git a/src/core/algorithm.cc b/src/core/algorithm.cc
index 98ac5520..683d4ddd 100644
--- a/src/core/algorithm.cc
+++ b/src/core/algorithm.cc
@@ -199,18 +199,23 @@ std::shared_ptr DslAlgorithm::build() { return shared_from_this(); }
 
 // TODO: implement this
 void DslAlgorithm::reset() {}
 
-static std::weak_ptr<uint32_t> gDefaultFlagBuffer;
+static uint32_t* gDefaultFlagBuffer = nullptr;
+static std::weak_ptr<uint32_t> gDefaultFlagBufferWeak;
 static size_t gDefaultFlagCount = 128;
 
-std::pair<std::shared_ptr<uint32_t>, size_t> getDefaultFlagBuffer() {
-  std::shared_ptr<uint32_t> flagBuffer = gDefaultFlagBuffer.lock();
-  if (!flagBuffer) {
-    flagBuffer = mscclpp::detail::gpuCallocShared<uint32_t>(gDefaultFlagCount);
-    std::vector<uint32_t> initFlags(gDefaultFlagCount, 1);
-    mscclpp::gpuMemcpy(flagBuffer.get(), initFlags.data(), gDefaultFlagCount, cudaMemcpyHostToDevice);
-    gDefaultFlagBuffer = flagBuffer;
+std::pair<std::shared_ptr<uint32_t>, size_t> getFlagBuffer() {
+  auto ptr = gDefaultFlagBufferWeak.lock();
+  if (!ptr) {
+    if (!gDefaultFlagBuffer) {
+      // Intentionally never freed — CUDA driver reclaims GPU memory at process exit.
+      gDefaultFlagBuffer = static_cast<uint32_t*>(mscclpp::detail::gpuCalloc(gDefaultFlagCount * sizeof(uint32_t)));
+      std::vector<uint32_t> initFlags(gDefaultFlagCount, 1);
+      mscclpp::gpuMemcpy(gDefaultFlagBuffer, initFlags.data(), gDefaultFlagCount, cudaMemcpyHostToDevice);
+    }
+    ptr = std::shared_ptr<uint32_t>(gDefaultFlagBuffer, [](void*) {});
+    gDefaultFlagBufferWeak = ptr;
   }
-  return {flagBuffer, gDefaultFlagCount * sizeof(uint32_t)};
+  return {ptr, gDefaultFlagCount * sizeof(uint32_t)};
 }
 
 }  // namespace mscclpp
\ No newline at end of file
diff --git a/src/ext/nccl/nccl.cc b/src/ext/nccl/nccl.cc
index bfde4786..afeb5bdb 100644
--- a/src/ext/nccl/nccl.cc
+++ b/src/ext/nccl/nccl.cc
@@ -294,7 +294,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI
   commPtr->scratchBuffer_ = mscclpp::GpuBuffer(commPtr->scratchBufferSize_).memory();
   commPtr->executor = std::make_shared<mscclpp::Executor>(mscclppComm, commPtr->scratchBuffer_);
 
-  auto [buffer, size] = mscclpp::getDefaultFlagBuffer();
+  auto [buffer, size] = mscclpp::getFlagBuffer();
   commPtr->flagBuffer_ = buffer;
   commPtr->flagBufferSize_ = size;
diff --git a/test/mscclpp-test/CMakeLists.txt b/test/mscclpp-test/CMakeLists.txt
index eb2b26ca..241b7e02 100644
--- a/test/mscclpp-test/CMakeLists.txt
+++ b/test/mscclpp-test/CMakeLists.txt
@@ -1,7 +1,10 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
 
-FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.2/json.tar.xz)
+FetchContent_Declare(json
+  GIT_REPOSITORY https://github.com/nlohmann/json.git
+  GIT_TAG v3.12.0
+)
 FetchContent_MakeAvailable(json)
 
 function(add_mscclpp_test_executable name sources)
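Closing note on the ownership pattern that the flag-buffer changes rely on, both the capsule returned by `cpp_get_flag_buffer` and the `owner` argument now passed to `cp.cuda.UnownedMemory`: `UnownedMemory` does not manage the allocation itself, so whatever object is passed as its third argument is what keeps the memory alive. A self-contained CuPy illustration, where the `backing` array is only a stand-in for the C++-owned buffer:

```python
import cupy as cp

backing = cp.zeros(128, dtype=cp.uint32)  # stand-in for the C++-owned flag buffer
mem = cp.cuda.UnownedMemory(backing.data.ptr, backing.nbytes, backing)  # (ptr, size, owner)
view = cp.ndarray((128,), dtype=cp.uint32, memptr=cp.cuda.MemoryPointer(mem, 0))

del backing              # the owner reference held by `mem` keeps the allocation valid
view[:] = 1
print(int(view.sum()))   # 128
```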