Merge branch 'main' into copilot/remove-gtest-use-custom-framework
@@ -44,7 +44,7 @@ jobs:
       targetType: 'inline'
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'

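The same one-line change recurs in each pipeline template below: the commit flips the CMake default for building tests to OFF (see the CMakeLists.txt hunk further down), so every CI configure step now passes `-DMSCCLPP_BUILD_TESTS=ON` explicitly.
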
@@ -19,7 +19,7 @@ steps:
       targetType: inline
      script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'

@@ -27,7 +27,7 @@ steps:
       targetType: 'inline'
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)/mscclpp'

@@ -16,7 +16,7 @@ steps:
       targetType: 'inline'
       script: |
         mkdir build && cd build
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
         make -j
       workingDirectory: '$(System.DefaultWorkingDirectory)'

@@ -63,7 +63,7 @@ steps:
         set -e; \
         cd /root/mscclpp; \
         mkdir -p build && cd build; \
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} -DMSCCLPP_NPKIT_FLAGS=\"-DENABLE_NPKIT -DENABLE_NPKIT_EVENT_TIME_SYNC_CPU -DENABLE_NPKIT_EVENT_TIME_SYNC_GPU -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY -DENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT\" ..; \
         make -j"'
       kill $CHILD_PID
     workingDirectory: '$(System.DefaultWorkingDirectory)'

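NPKIT is MSCCLPP's event-based profiling instrumentation; the `-DENABLE_NPKIT_*` defines compile in specific collection points (CPU/GPU time sync and executor init/op events). The only change in this hunk is, again, the added `-DMSCCLPP_BUILD_TESTS=ON`.
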
@@ -20,9 +20,9 @@ steps:
     script: |
       mkdir build && cd build
       if [ "${{ parameters.platform }}" == "rocm" ]; then
-        CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+        CXX=/opt/rocm/bin/hipcc cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_ROCM=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
       else
-        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
+        cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON -DMSCCLPP_BUILD_TESTS=ON -DMSCCLPP_GPU_ARCHS=${{ parameters.gpuArch }} ..
       fi
       make -j
      cd ..

@@ -47,7 +47,7 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
 
 # Options
 option(MSCCLPP_ENABLE_TRACE "Enable tracing" OFF)
-option(MSCCLPP_BUILD_TESTS "Build tests" ON)
+option(MSCCLPP_BUILD_TESTS "Build tests" OFF)
 option(MSCCLPP_BUILD_PYTHON_BINDINGS "Build Python bindings" ON)
 option(MSCCLPP_BUILD_EXT_NCCL "Build NCCL interfaces" ON)
 option(MSCCLPP_BUILD_EXT_COLLECTIVES "Build collective algorithms" ON)

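This is the substantive change the CI edits above compensate for: `MSCCLPP_BUILD_TESTS` now defaults to OFF, so a plain `cmake ..` configure no longer builds the test binaries. Local builds that want them must pass `-DMSCCLPP_BUILD_TESTS=ON` explicitly, just as the pipelines now do.
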
@@ -226,9 +226,11 @@ if(MSCCLPP_USE_IB)
 endif()
 find_package(NUMA REQUIRED)
 find_package(Threads REQUIRED)
 
 include(FetchContent)
-FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz)
+FetchContent_Declare(json
+  GIT_REPOSITORY https://github.com/nlohmann/json.git
+  GIT_TAG v3.12.0
+)
 FetchContent_MakeAvailable(json)
 
 if("${INSTALL_PREFIX}" STREQUAL "")

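The same pin appears in the test CMake file at the end of this diff: both fetches of nlohmann/json move from versioned release tarballs (v3.11.3 here, v3.11.2 in the tests) to a git checkout pinned at tag v3.12.0, so the two copies can no longer drift apart.
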
examples/tutorials/05-switch-channel/Makefile (new file, 15 lines)
@@ -0,0 +1,15 @@
+CUDA_HOME ?= /usr/local/cuda
+
+COMPILER := $(CUDA_HOME)/bin/nvcc
+ARCH_FLAG := -arch=native
+
+TARGET = bidir_switch_channel
+SRC = bidir_switch_channel.cu
+
+all: $(TARGET)
+
+$(TARGET): $(SRC)
+	$(COMPILER) $(ARCH_FLAG) -o $@ $< -lmscclpp
+
+clean:
+	rm -f $(TARGET)

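With libmscclpp installed where the linker can find it (`-lmscclpp`) and a CUDA toolkit at `/usr/local/cuda` (or `CUDA_HOME` overridden), `make` builds the example and `./bidir_switch_channel` runs it in intra-node mode. Note that `-arch=native` compiles for whatever GPUs are visible on the build machine, so build on the machine you intend to run on.
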
examples/tutorials/05-switch-channel/bidir_switch_channel.cu (new file, 177 lines)
@@ -0,0 +1,177 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <functional>
+#include <iostream>
+#include <mscclpp/concurrency_device.hpp>
+#include <mscclpp/core.hpp>
+#include <mscclpp/gpu_utils.hpp>
+#include <mscclpp/switch_channel.hpp>
+#include <mscclpp/switch_channel_device.hpp>
+#include <sstream>
+
+#define PORT_NUMBER "50505"
+
+template <typename... Args>
+void log(Args &&...args) {
+  std::stringstream ss;
+  (ss << ... << args);
+  ss << std::endl;
+  std::cout << ss.str();
+}
+
+int spawn_process(std::function<void()> func) {
+  pid_t pid = fork();
+  if (pid < 0) return -1;
+  if (pid == 0) {
+    // Child process
+    func();
+    exit(0);
+  }
+  return pid;
+}
+
+int wait_process(int pid) {
+  int status;
+  if (waitpid(pid, &status, 0) < 0) {
+    return -1;
+  }
+  if (WIFEXITED(status)) {
+    return WEXITSTATUS(status);
+  }
+  return -1;
+}
+
+__constant__ mscclpp::SwitchChannelDeviceHandle gConstSwitchChan;
+
+__device__ mscclpp::DeviceSyncer devSyncer;
+
+__global__ void kernelSwitchReduce(int rank, int numElements) {
+  const int tid = threadIdx.x + blockIdx.x * blockDim.x;
+  int stride = blockDim.x * gridDim.x;
+
+  // rank 0 performs on first half of data and rank 1 on second half
+  int min = rank * (numElements / 2);
+  int max = (rank + 1) * (numElements / 2);
+
+  for (int i = tid + min; i < max; i += stride) {
+    auto val = gConstSwitchChan.reduce<mscclpp::f32x1>(i);
+    gConstSwitchChan.broadcast(i, val);
+  }
+}
+
+void worker(int myRank, int gpuId, const std::string &ipPort) {
+  MSCCLPP_CUDATHROW(cudaSetDevice(gpuId));
+  const int nRanks = 2;
+  const int iter = 1000;
+  const size_t bufferBytes = 128 * 1024 * 1024;
+
+  log("Rank ", myRank, " (GPU ", gpuId, "): Preparing for tests ...");
+
+  // Build the bootstrap, communicator, and NVLS connection
+  auto bootstrap = std::make_shared<mscclpp::TcpBootstrap>(myRank, nRanks);
+  bootstrap->initialize(ipPort);
+  std::shared_ptr<mscclpp::Communicator> comm = std::make_shared<mscclpp::Communicator>(bootstrap);
+
+  std::vector<int> ranks;
+  ranks.reserve(nRanks);
+  for (int i = 0; i < nRanks; i++) ranks.push_back(i);
+
+  auto buffer = mscclpp::GpuBuffer<float>(bufferBytes);
+
+  auto nvlsConnection = mscclpp::connectNvlsCollective(comm, ranks, bufferBytes);
+
+  auto switchChannel = nvlsConnection->bindAllocatedMemory(CUdeviceptr(buffer.data()), bufferBytes);
+
+  auto deviceHandle = switchChannel.deviceHandle();
+
+  MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gConstSwitchChan, &deviceHandle, sizeof(deviceHandle)));
+  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+
+  // Call the kernel in a loop for perf evaluation
+
+  for (size_t numElements : {1024, 1024 * 1024, 32 * 1024 * 1024}) {
+    cudaEvent_t start, end;
+    if (myRank == 0) {
+      MSCCLPP_CUDATHROW(cudaEventCreate(&start));
+      MSCCLPP_CUDATHROW(cudaEventCreate(&end));
+    }
+    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+    bootstrap->barrier();
+
+    if (myRank == 0) {
+      MSCCLPP_CUDATHROW(cudaEventRecord(start, 0));
+    }
+
+    for (int i = 0; i < iter; ++i) {
+      kernelSwitchReduce<<<256, 1024>>>(myRank, numElements);
+    }
+
+    MSCCLPP_CUDATHROW(cudaGetLastError());
+    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+
+    comm->bootstrap()->barrier();
+
+    if (myRank == 0) {
+      MSCCLPP_CUDATHROW(cudaEventRecord(end, 0));
+      MSCCLPP_CUDATHROW(cudaEventSynchronize(end));
+      float elapsedTime;
+      float elapsedTimePerIter;
+      float gbps;
+      MSCCLPP_CUDATHROW(cudaEventElapsedTime(&elapsedTime, start, end));
+      elapsedTimePerIter = elapsedTime / iter;
+      float dataSize = numElements * 4;
+      gbps = dataSize / elapsedTimePerIter * 1e-6f;
+      log("Rank ", myRank, " (GPU ", gpuId, "): bytes ", dataSize, ", elapsed ", elapsedTimePerIter, " ms/iter, BW ",
+          gbps, " GB/s");
+    }
+  }
+}
+
+int main(int argc, char **argv) {
+  if (argc == 1) {
+    int pid0 = spawn_process([]() { worker(0, 0, "lo:127.0.0.1:" PORT_NUMBER); });
+    int pid1 = spawn_process([]() { worker(1, 1, "lo:127.0.0.1:" PORT_NUMBER); });
+    if (pid0 < 0 || pid1 < 0) {
+      log("Failed to spawn processes.");
+      return -1;
+    }
+    int status0 = wait_process(pid0);
+    int status1 = wait_process(pid1);
+    if (status0 < 0 || status1 < 0) {
+      log("Failed to wait for processes.");
+      return -1;
+    }
+    if (status0 != 0 || status1 != 0) {
+      log("One of the processes failed.");
+      return -1;
+    }
+    log("Succeed!");
+    return 0;
+  } else if (argc == 4) {
+    std::string ipPort = argv[1];
+    int rank, gpuId;
+    try {
+      rank = std::stoi(argv[2]);
+      gpuId = std::stoi(argv[3]);
+    } catch (const std::exception &) {
+      log("Error: rank and gpu_id must be valid integers.");
+      return -1;
+    }
+    if (rank < 0 || rank > 1 || gpuId < 0) {
log("Error: rank must be between 0 and 1 and gpu_id must be non-negative.");
|
||||
return -1;
|
||||
}
|
||||
worker(rank, gpuId, ipPort);
|
||||
log("Rank ", rank, ": Succeed!");
|
||||
return 0;
|
||||
} else {
|
||||
std::cerr << "Usage:\n"
|
||||
<< " " << argv[0] << " Run in intra-node mode\n"
|
||||
<< " " << argv[0] << " <ip_port> <rank> <gpu_id> Run in inter-node mode\n";
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
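Each loop iteration in the kernel above reads an element through the switch channel, reduces it across both GPUs (`reduce<mscclpp::f32x1>`), and writes the result back to every replica (`broadcast`); since rank 0 covers the first half of the buffer and rank 1 the second, traffic flows through the switch in both directions at once, hence the example's name. The printed bandwidth comes from the formula in the rank-0 branch; a quick sanity check of that arithmetic, with an assumed (not measured) timing value:

```python
# Reproduces the GB/s formula from the rank-0 branch above.
num_elements = 32 * 1024 * 1024
data_size = num_elements * 4                   # bytes (float32)
elapsed_ms_per_iter = 1.0                      # hypothetical ms per iteration
gbps = data_size / elapsed_ms_per_iter * 1e-6  # bytes/ms * 1e-6 == GB/s
print(f"{gbps:.1f} GB/s")                      # -> 134.2 GB/s for these inputs
```
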
@@ -19,8 +19,8 @@ from mscclpp._mscclpp import (
     CppTransport,
     CppTransportFlags,
 )
-import mpi4py
 import numpy as np
+import pickle
 
 from mscclpp.utils import is_torch_tensor

@@ -29,20 +29,35 @@ __all__ = ["CommGroup"]
 
 class CommGroup:
     def __init__(
-        self, mpi_comm: mpi4py.MPI.Comm = None, interfaceIpPortTrio: str = "", rank: int = None, size: int = None
+        self,
+        mpi_comm: "mpi4py.MPI.Comm" = None,
+        torch_group: "dist.ProcessGroup" = None,
+        interfaceIpPortTrio: str = "",
+        rank: int = None,
+        size: int = None,
     ):
-        if interfaceIpPortTrio == "":
-            self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
+        if interfaceIpPortTrio == "" and (mpi_comm is not None or torch_group is not None):
             uniq_id = None
-            if mpi_comm.rank == 0:
-                # similar to NCCL's unique id
+            self.bootstrap = CppTcpBootstrap.create(rank, size)
+            if rank == 0:
                 uniq_id = self.bootstrap.create_unique_id()
-            uniq_id_global = mpi_comm.bcast(uniq_id, 0)
+            if mpi_comm is not None:
+                import mpi4py
+
+                uniq_id_global = mpi_comm.bcast(uniq_id, 0)
+            else:
+                import torch
+                import torch.distributed as dist
+
+                if rank == 0:
+                    uniq_id_global = uniq_id
+                    pickled_data = pickle.dumps(uniq_id)
+                    data_tensor = torch.frombuffer(bytearray(pickled_data), dtype=torch.uint8).clone()
+                else:
+                    data_tensor = torch.zeros(256, dtype=torch.uint8)
+                dist.broadcast(data_tensor, src=0, group=torch_group)
+                uniq_id_global = pickle.loads(data_tensor.numpy().tobytes())
             self.bootstrap.initialize(uniq_id_global)
         elif mpi_comm:
             # use this instead
             self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
             self.bootstrap.initialize(interfaceIpPortTrio)
         elif not interfaceIpPortTrio == "":
             assert rank >= 0 and size >= 1
             self.bootstrap = CppTcpBootstrap.create(rank, size)

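The new `torch_group` path exchanges the TCP bootstrap's unique id over an existing torch.distributed process group, mimicking NCCL's unique-id handshake: rank 0 pickles the id into a uint8 tensor and broadcasts it (the receiver side allocates a fixed 256-byte buffer for it). A minimal usage sketch; this assumes torch.distributed is already initialized (e.g. via torchrun) and that `mscclpp.comm.CommGroup` is the import path:

```python
# Hedged sketch: bootstrap a CommGroup from a torch.distributed process
# group instead of MPI. Assumes torchrun (or equivalent) launched the
# processes and set the usual MASTER_ADDR/MASTER_PORT environment.
import torch.distributed as dist
from mscclpp.comm import CommGroup

dist.init_process_group(backend="nccl")
group = CommGroup(
    torch_group=dist.group.WORLD,     # unique id travels over this group
    rank=dist.get_rank(),             # rank/size must match the group
    size=dist.get_world_size(),
)
```

Note that `rank` and `size` must be passed explicitly on this path, since there is no `mpi_comm` to read them from.
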
@@ -1,7 +1,10 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
 
-FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.2/json.tar.xz)
+FetchContent_Declare(json
+  GIT_REPOSITORY https://github.com/nlohmann/json.git
+  GIT_TAG v3.12.0
+)
 FetchContent_MakeAvailable(json)
 
 function(add_mscclpp_test_executable name sources)