// Files
// mscclpp/examples/tutorials/05-switch-channel/bidir_memory_channel.cu
// 2026-02-20 18:14:33 +00:00
//
// 262 lines
// 8.8 KiB
// Plaintext

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <sys/wait.h>
#include <unistd.h>

#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mscclpp/concurrency_device.hpp>
#include <mscclpp/core.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/switch_channel.hpp>
#include <mscclpp/switch_channel_device.hpp>
#include <sstream>
#include <string>
#include <vector>
#define PORT_NUMBER "50505"
// Streams every argument, in order, into one in-memory line, appends a newline, and then
// writes the finished line to std::cout with a single insertion (presumably so that lines
// from the two worker processes are less likely to interleave mid-line).
template <typename... Args>
void log(Args &&...args) {
  std::ostringstream line;
  // Comma fold: insert each argument left to right.
  ((line << args), ...);
  line << '\n';
  std::cout << line.str();
}
// Forks the current process and runs `func` in the child.
//
// Parameters:
//   func - callable executed in the child process only.
//
// Returns:
//   In the parent: the child's pid, or -1 if fork() failed.
//   The child never returns from this function: it exits with status 0 after `func`
//   returns, or with status 1 if `func` throws.
int spawn_process(std::function<void()> func) {
  // Flush pending output first so the child does not inherit (and later re-emit) buffered text.
  std::cout.flush();
  pid_t pid = fork();
  if (pid < 0) return -1;
  if (pid == 0) {
    // Child process. An exception must not unwind past this point: it would otherwise
    // continue into the caller's frames, which belong to the parent's control flow.
    try {
      func();
    } catch (const std::exception &e) {
      std::cerr << "Child process terminated by exception: " << e.what() << std::endl;
      exit(1);
    } catch (...) {
      std::cerr << "Child process terminated by unknown exception." << std::endl;
      exit(1);
    }
    exit(0);
  }
  return pid;
}
// Blocks until child process `pid` terminates and reports how it ended.
//
// Returns the child's exit code when it exited normally. Returns -1 when waitpid()
// fails or when the child did not exit normally (for example, it was killed by a signal).
int wait_process(int pid) {
  int childStatus;
  const bool reaped = waitpid(pid, &childStatus, 0) >= 0;
  // childStatus is only inspected when waitpid() succeeded (short-circuit evaluation).
  return (reaped && WIFEXITED(childStatus)) ? WEXITSTATUS(childStatus) : -1;
}
// Device handle of the switch channel, stored in constant memory. worker() fills it from the
// host with cudaMemcpyToSymbol() before kernelSwitchReduce is launched.
__constant__ mscclpp::SwitchChannelDeviceHandle gConstSwitchChan;
// NOTE(review): devSyncer is not referenced anywhere else in the visible source — confirm
// whether it is still needed.
__device__ mscclpp::DeviceSyncer devSyncer;
// Calls reduce<f32x1> on gConstSwitchChan at index 0 and passes the returned value to
// broadcast() at the same index.
//
// Launch layout: worker() launches this with a single thread (<<<1, 1>>>); the body does no
// thread indexing. gConstSwitchChan must have been populated from the host beforehand.
// NOTE(review): presumably the reduce combines element 0 across all ranks bound to the
// channel and the broadcast writes the result back to every rank — confirm against the
// SwitchChannelDeviceHandle documentation.
__global__ void kernelSwitchReduce() {
  constexpr int kIndex = 0;
  auto reduced = gConstSwitchChan.reduce<mscclpp::f32x1>(kIndex);
  gConstSwitchChan.broadcast(kIndex, reduced);
}
// Runs one rank of the two-rank switch-channel example on GPU `gpuId`.
//
// Steps: select the device, bootstrap over TCP using `ipPort`, build a communicator, allocate
// a GPU buffer whose first element is set to (myRank + 1), establish an NVLS collective
// connection over all ranks, bind the buffer to it, copy the resulting device handle into
// gConstSwitchChan, and let rank 0 run kernelSwitchReduce between two bootstrap barriers.
//
// Parameters:
//   myRank - rank of this process (nRanks is fixed to 2 here).
//   gpuId  - CUDA device ordinal passed to cudaSetDevice().
//   ipPort - bootstrap address string handed to TcpBootstrap::initialize().
//
// Every CUDA runtime call is checked with MSCCLPP_CUDATHROW (as cudaSetDevice already was), so
// a failed copy, launch, or kernel execution is no longer silently ignored.
void worker(int myRank, int gpuId, const std::string &ipPort) {
  MSCCLPP_CUDATHROW(cudaSetDevice(gpuId));

  const int nRanks = 2;
  // Number of float elements in the local buffer.
  const size_t numElems = 1024;
  // Size handed to connectNvlsCollective() and bindAllocatedMemory().
  // NOTE(review): presumably a byte count, while the buffer itself holds numElems floats —
  // confirm that 1024 is the intended value for both.
  const size_t nvlsSize = 1024;

  log("Rank ", myRank, " (GPU ", gpuId, "): Preparing for tests ...");

  // Bootstrap the ranks over TCP.
  auto bootstrap = std::make_shared<mscclpp::TcpBootstrap>(myRank, nRanks);
  bootstrap->initialize(ipPort);

  log("Rank ", myRank, " (GPU ", gpuId, "): build communicator ...");
  std::shared_ptr<mscclpp::Communicator> comm = std::make_shared<mscclpp::Communicator>(bootstrap);

  // All ranks [0, nRanks) take part in the NVLS collective.
  std::vector<int> ranks;
  ranks.reserve(nRanks);
  for (int i = 0; i < nRanks; i++) ranks.push_back(i);

  log("Rank ", myRank, " (GPU ", gpuId, "): Allocate Buffers ...");
  auto buffer = mscclpp::GpuBuffer<float>(numElems);
  // Seed element 0 with a rank-specific value (1.0f on rank 0, 2.0f on rank 1).
  float data = static_cast<float>(myRank) + 1.0f;
  MSCCLPP_CUDATHROW(cudaMemcpy(buffer.data(), &data, sizeof(data), cudaMemcpyHostToDevice));

  log("Rank ", myRank, " (GPU ", gpuId, "): establish nvls connection ...");
  auto nvlsConnection = mscclpp::connectNvlsCollective(comm, ranks, nvlsSize);

  log("Rank ", myRank, " (GPU ", gpuId, "): bind memory ...");
  auto switchChannel = nvlsConnection->bindAllocatedMemory(CUdeviceptr(buffer.data()), nvlsSize);

  log("Rank ", myRank, " (GPU ", gpuId, "): synchronize ...");
  // Publish the channel's device handle to the __constant__ symbol read by the kernel.
  auto deviceHandle = switchChannel.deviceHandle();
  MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gConstSwitchChan, &deviceHandle, sizeof(deviceHandle)));
  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());

  log("Rank ", myRank, " (GPU ", gpuId, "): launch kernel ...");
  // Both ranks must have finished their setup before rank 0 runs the kernel.
  comm->bootstrap()->barrier();
  if (myRank == 0) {
    kernelSwitchReduce<<<1, 1>>>();
    // Launch-configuration errors surface through cudaGetLastError(); faults raised while the
    // kernel runs surface at the following synchronization.
    MSCCLPP_CUDATHROW(cudaGetLastError());
    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
  }
  // Keep rank 1 (and its buffer) alive until rank 0's kernel has completed.
  comm->bootstrap()->barrier();
  log("Rank ", myRank, " (GPU ", gpuId, "): kernel launched ...");
}
// Entry point.
//
// With no arguments (intra-node mode): forks two worker processes, rank 0 on GPU 0 and
// rank 1 on GPU 1, bootstrapping over lo:127.0.0.1:PORT_NUMBER, then waits for both.
// With three arguments <ip_port> <rank> <gpu_id>: runs a single worker in this process.
// Any other argument count prints the usage text.
//
// Returns 0 on success and -1 on any failure or on invalid arguments.
int main(int argc, char **argv) {
  if (argc == 1) {
    int pid0 = spawn_process([]() { worker(0, 0, "lo:127.0.0.1:" PORT_NUMBER); });
    int pid1 = spawn_process([]() { worker(1, 1, "lo:127.0.0.1:" PORT_NUMBER); });
    if (pid0 < 0 || pid1 < 0) {
      log("Failed to spawn processes.");
      return -1;
    }
    int status0 = wait_process(pid0);
    int status1 = wait_process(pid1);
    if (status0 < 0 || status1 < 0) {
      log("Failed to wait for processes.");
      return -1;
    }
    if (status0 != 0 || status1 != 0) {
      log("One of the processes failed.");
      return -1;
    }
    log("Succeed!");
    return 0;
  } else if (argc == 4) {
    std::string ipPort = argv[1];
    int rank, gpuId;
    try {
      rank = std::stoi(argv[2]);
      gpuId = std::stoi(argv[3]);
    } catch (const std::exception &) {
      log("Error: rank and gpu_id must be valid integers.");
      return -1;
    }
    // worker() runs with exactly two ranks, so only rank 0 and rank 1 are valid.
    if (rank < 0 || rank > 1 || gpuId < 0) {
      log("Error: rank must be between 0 and 1 and gpu_id must be non-negative.");
      return -1;
    }
    worker(rank, gpuId, ipPort);
    log("Rank ", rank, ": Succeed!");
    return 0;
  } else {
    std::cerr << "Usage:\n"
              << " " << argv[0] << " Run in intra-node mode\n"
              << " " << argv[0] << " <ip_port> <rank> <gpu_id> Run in inter-node mode\n";
    return -1;
  }
}