diff --git a/.azure-pipelines/multi-nodes-test.yml b/.azure-pipelines/multi-nodes-test.yml index 97a95c94..914c2317 100644 --- a/.azure-pipelines/multi-nodes-test.yml +++ b/.azure-pipelines/multi-nodes-test.yml @@ -44,7 +44,7 @@ jobs: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/integration-test.yaml b/.azure-pipelines/templates/integration-test.yaml index b9dac24b..99ed6d04 100644 --- a/.azure-pipelines/templates/integration-test.yaml +++ b/.azure-pipelines/templates/integration-test.yaml @@ -19,7 +19,7 @@ steps: targetType: inline script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/nccl-test.yaml b/.azure-pipelines/templates/nccl-test.yaml index 1ce37d20..56b75d3f 100644 --- a/.azure-pipelines/templates/nccl-test.yaml +++ b/.azure-pipelines/templates/nccl-test.yaml @@ -27,7 +27,7 @@ steps: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON .. 
make -j workingDirectory: '$(System.DefaultWorkingDirectory)/mscclpp' diff --git a/.azure-pipelines/templates/ut-no-ib-env.yaml b/.azure-pipelines/templates/ut-no-ib-env.yaml index aa21c407..e6576f6d 100644 --- a/.azure-pipelines/templates/ut-no-ib-env.yaml +++ b/.azure-pipelines/templates/ut-no-ib-env.yaml @@ -16,7 +16,7 @@ steps: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/ut-npkit.yaml b/.azure-pipelines/templates/ut-npkit.yaml index 74b94fc1..d4456f89 100644 --- a/.azure-pipelines/templates/ut-npkit.yaml +++ b/.azure-pipelines/templates/ut-npkit.yaml @@ -63,7 +63,7 @@ steps: set -e; \ cd /root/mscclpp; \ mkdir -p build && cd build; \ - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \ + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \ make -j"' kill $CHILD_PID workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/templates/ut.yaml b/.azure-pipelines/templates/ut.yaml index 
e6e989e7..b25d11a9 100644 --- a/.azure-pipelines/templates/ut.yaml +++ b/.azure-pipelines/templates/ut.yaml @@ -20,9 +20,9 @@ steps: script: | mkdir build && cd build if [ "${{ parameters.platform }}" == "rocm" ]; then - CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. else - cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} .. fi make -j cd .. diff --git a/CMakeLists.txt b/CMakeLists.txt index fc065d29..a8d01c64 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,7 +47,7 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) # Options option(MSCCLPP_ENABLE_TRACE "Enable tracing" OFF) -option(MSCCLPP_BUILD_TESTS "Build tests" ON) +option(MSCCLPP_BUILD_TESTS "Build tests" OFF) option(MSCCLPP_BUILD_PYTHON_BINDINGS "Build Python bindings" ON) option(MSCCLPP_BUILD_EXT_NCCL "Build NCCL interfaces" ON) option(MSCCLPP_BUILD_EXT_COLLECTIVES "Build collective algorithms" ON) @@ -226,9 +226,11 @@ if(MSCCLPP_USE_IB) endif() find_package(NUMA REQUIRED) find_package(Threads REQUIRED) - include(FetchContent) -FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz) +FetchContent_Declare(json + GIT_REPOSITORY https://github.com/nlohmann/json.git + GIT_TAG v3.12.0 +) FetchContent_MakeAvailable(json) if("${INSTALL_PREFIX}" STREQUAL "") diff --git a/examples/tutorials/05-switch-channel/Makefile b/examples/tutorials/05-switch-channel/Makefile new file mode 100644 index 00000000..1a211f64 --- 
/dev/null +++ b/examples/tutorials/05-switch-channel/Makefile @@ -0,0 +1,15 @@ +CUDA_HOME ?= /usr/local/cuda + +COMPILER := $(CUDA_HOME)/bin/nvcc +ARCH_FLAG := -arch=native + +TARGET = bidir_switch_channel +SRC = bidir_switch_channel.cu + +all: $(TARGET) + +$(TARGET): $(SRC) + $(COMPILER) $(ARCH_FLAG) -o $@ $< -lmscclpp + +clean: + rm -f $(TARGET) diff --git a/examples/tutorials/05-switch-channel/bidir_switch_channel.cu b/examples/tutorials/05-switch-channel/bidir_switch_channel.cu new file mode 100644 index 00000000..658e6f05 --- /dev/null +++ b/examples/tutorials/05-switch-channel/bidir_switch_channel.cu @@ -0,0 +1,177 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include <sys/wait.h> +#include <unistd.h> + +#include <functional> +#include <iostream> +#include <mscclpp/concurrency_device.hpp> +#include <mscclpp/core.hpp> +#include <mscclpp/gpu_utils.hpp> +#include <mscclpp/nvls.hpp> +#include <sstream> +#include <string> + +#define PORT_NUMBER "50505" + +template <typename... Args> +void log(Args &&...args) { + std::stringstream ss; + (ss << ... << args); + ss << std::endl; + std::cout << ss.str(); +} + +int spawn_process(std::function<void()> func) { + pid_t pid = fork(); + if (pid < 0) return -1; + if (pid == 0) { + // Child process + func(); + exit(0); + } + return pid; +} + +int wait_process(int pid) { + int status; + if (waitpid(pid, &status, 0) < 0) { + return -1; + } + if (WIFEXITED(status)) { + return WEXITSTATUS(status); + } + return -1; +} + +__constant__ mscclpp::SwitchChannelDeviceHandle gConstSwitchChan; + +__device__ mscclpp::DeviceSyncer devSyncer; + +__global__ void kernelSwitchReduce(int rank, int numElements) { + const int tid = threadIdx.x + blockIdx.x * blockDim.x; + int stride = blockDim.x * gridDim.x; + + // rank 0 performs on first half of data and rank 1 on second half + int min = rank * (numElements / 2); + int max = (rank + 1) * (numElements / 2); + + for (int i = tid + min; i < max; i += stride) { + auto val = gConstSwitchChan.reduce(i); + gConstSwitchChan.broadcast(i, val); + } +} + +void worker(int myRank, int gpuId, const std::string &ipPort) { + 
MSCCLPP_CUDATHROW(cudaSetDevice(gpuId)); + const int nRanks = 2; + const int iter = 1000; + const size_t bufferBytes = 128 * 1024 * 1024; + + log("Rank ", myRank, " (GPU ", gpuId, "): Preparing for tests ..."); + + // Build a connection and a semaphore + auto bootstrap = std::make_shared(myRank, nRanks); + bootstrap->initialize(ipPort); + std::shared_ptr comm = std::make_shared(bootstrap); + + std::vector ranks; + ranks.reserve(nRanks); + for (int i = 0; i < nRanks; i++) ranks.push_back(i); + + auto buffer = mscclpp::GpuBuffer(bufferBytes); + + auto nvlsConnection = mscclpp::connectNvlsCollective(comm, ranks, bufferBytes); + + auto switchChannel = nvlsConnection->bindAllocatedMemory(CUdeviceptr(buffer.data()), bufferBytes); + + auto deviceHandle = switchChannel.deviceHandle(); + + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gConstSwitchChan, &deviceHandle, sizeof(deviceHandle))); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + // Call the kernel in a loop for perf evaluation + + for (size_t numElements : {1024, 1024 * 1024, 32 * 1024 * 1024}) { + cudaEvent_t start, end; + if (myRank == 0) { + MSCCLPP_CUDATHROW(cudaEventCreate(&start)); + MSCCLPP_CUDATHROW(cudaEventCreate(&end)); + } + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + bootstrap->barrier(); + + if (myRank == 0) { + MSCCLPP_CUDATHROW(cudaEventRecord(start, 0)); + } + + for (int i = 0; i < iter; ++i) { + kernelSwitchReduce<<<256, 1024>>>(myRank, numElements); + } + + MSCCLPP_CUDATHROW(cudaGetLastError()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + comm->bootstrap()->barrier(); + + if (myRank == 0) { + MSCCLPP_CUDATHROW(cudaEventRecord(end, 0)); + MSCCLPP_CUDATHROW(cudaEventSynchronize(end)); + float elapsedTime; + float elapsedTimePerIter; + float gbps; + MSCCLPP_CUDATHROW(cudaEventElapsedTime(&elapsedTime, start, end)); + elapsedTimePerIter = elapsedTime / iter; + float dataSize = numElements * 4; + gbps = dataSize / elapsedTimePerIter * 1e-6f; + log("Rank ", myRank, " (GPU ", gpuId, "): bytes ", 
dataSize, ", elapsed ", elapsedTimePerIter, " ms/iter, BW ", + gbps, " GB/s"); + } + } +} + +int main(int argc, char **argv) { + if (argc == 1) { + int pid0 = spawn_process([]() { worker(0, 0, "lo:127.0.0.1:" PORT_NUMBER); }); + int pid1 = spawn_process([]() { worker(1, 1, "lo:127.0.0.1:" PORT_NUMBER); }); + if (pid0 < 0 || pid1 < 0) { + log("Failed to spawn processes."); + return -1; + } + int status0 = wait_process(pid0); + int status1 = wait_process(pid1); + if (status0 < 0 || status1 < 0) { + log("Failed to wait for processes."); + return -1; + } + if (status0 != 0 || status1 != 0) { + log("One of the processes failed."); + return -1; + } + log("Succeed!"); + return 0; + } else if (argc == 4) { + std::string ipPort = argv[1]; + int rank, gpuId; + try { + rank = std::stoi(argv[2]); + gpuId = std::stoi(argv[3]); + } catch (const std::exception &) { + log("Error: rank and gpu_id must be valid integers."); + return -1; + } + if (rank < 0 || rank > 1 || gpuId < 0) { + log("Error: rank must be between 0 and 1 and gpu_id must be non-negative."); + return -1; + } + worker(rank, gpuId, ipPort); + log("Rank ", rank, ": Succeed!"); + return 0; + } else { + std::cerr << "Usage:\n" + << " " << argv[0] << " Run in intra-node mode\n" + << " " << argv[0] << " <ip_port> <rank> <gpu_id> Run in inter-node mode\n"; + return -1; + } +} diff --git a/python/mscclpp/_core/comm.py b/python/mscclpp/_core/comm.py index f0c5c219..e74a0e6f 100644 --- a/python/mscclpp/_core/comm.py +++ b/python/mscclpp/_core/comm.py @@ -19,8 +19,8 @@ from mscclpp._mscclpp import ( CppTransport, CppTransportFlags, ) -import mpi4py import numpy as np +import pickle from mscclpp.utils import is_torch_tensor @@ -29,20 +29,35 @@ __all__ = ["CommGroup"] class CommGroup: def __init__( - self, mpi_comm: mpi4py.MPI.Comm = None, interfaceIpPortTrio: str = "", rank: int = None, size: int = None + self, + mpi_comm: "mpi4py.MPI.Comm" = None, + torch_group: "dist.ProcessGroup" = None, + interfaceIpPortTrio: str = "", + rank: int = None, + size: 
int = None, ): - if interfaceIpPortTrio == "": - self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size) + if interfaceIpPortTrio == "" and (mpi_comm is not None or torch_group is not None): uniq_id = None - if mpi_comm.rank == 0: - # similar to NCCL's unique id + if rank is None and mpi_comm is not None: + rank, size = mpi_comm.rank, mpi_comm.size + self.bootstrap = CppTcpBootstrap.create(rank, size) + if rank == 0: uniq_id = self.bootstrap.create_unique_id() - uniq_id_global = mpi_comm.bcast(uniq_id, 0) + if mpi_comm is not None: + import mpi4py + + uniq_id_global = mpi_comm.bcast(uniq_id, 0) + else: + import torch + import torch.distributed as dist + + if rank == 0: + uniq_id_global = uniq_id + pickled_data = pickle.dumps(uniq_id) + data_tensor = torch.frombuffer(bytearray(pickled_data.ljust(256, b"\x00")), dtype=torch.uint8).clone() + else: + data_tensor = torch.zeros(256, dtype=torch.uint8) + dist.broadcast(data_tensor, src=0, group=torch_group) + uniq_id_global = pickle.loads(data_tensor.numpy().tobytes()) self.bootstrap.initialize(uniq_id_global) - elif mpi_comm: - # use this instead - self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size) - self.bootstrap.initialize(interfaceIpPortTrio) elif not interfaceIpPortTrio == "": assert rank >= 0 and size >= 1 self.bootstrap = CppTcpBootstrap.create(rank, size) diff --git a/test/mscclpp-test/CMakeLists.txt b/test/mscclpp-test/CMakeLists.txt index eb2b26ca..241b7e02 100644 --- a/test/mscclpp-test/CMakeLists.txt +++ b/test/mscclpp-test/CMakeLists.txt @@ -1,7 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.2/json.tar.xz) +FetchContent_Declare(json + GIT_REPOSITORY https://github.com/nlohmann/json.git + GIT_TAG v3.12.0 +) FetchContent_MakeAvailable(json) function(add_mscclpp_test_executable name sources)