diff --git a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu index 7aa97390..e89f6de4 100644 --- a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu +++ b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu @@ -24,32 +24,38 @@ namespace collective { #define ALLTOALLV_WARP_SIZE 32 #endif +using MultiNodeMode = AlltoallvFullmesh::MultiNodeMode; + // Context to hold all necessary state for alltoallv execution struct AllToAllVContext { int rank; int worldSize; int nRanksPerNode; - // Intra-node (CudaIpc) channels — MemoryChannel for direct NVLink copy + // MemoryChannel (CudaIpc) — used for intra-node (always) and cross-node (NVSwitch mode) std::vector registeredMemories; std::vector memoryChannels; std::vector> memorySemaphores; std::shared_ptr> memoryChannelDeviceHandles; - // Inter-node (IB) channels — PortChannel via ProxyService + // PortChannel (IB) — used for cross-node peers in IB mode only std::shared_ptr proxyService; std::vector portChannels; std::shared_ptr portChannelDeviceHandles; - // Peer locality map: peerIsLocal[peerIdx] = 1 if intra-node, 0 if inter-node - // peerIdx is the index into the channel arrays (0..nPeers-1), NOT the rank + // Peer locality map (IB mode only) std::shared_ptr d_peerIsLocal; // GPU array [nPeers] - // For inter-node peers, maps peerIdx → portChannel index (dense indexing) std::shared_ptr d_peerToPortChannelIdx; // GPU array [nPeers] - bool hasRemotePeers; // true if any inter-node connections exist + // Staging buffers (NVSwitch mode only): allocated via GpuBuffer (cuMemCreate → Fabric handles) + bool useStaging; + std::shared_ptr> inputStaging; + std::shared_ptr> outputStaging; - std::shared_ptr deviceSyncer; // GPU-allocated, for multi-block grid sync + // Which kernel dispatch path to use + AlltoallvFullmesh::MultiNodeMode mode; + + std::shared_ptr deviceSyncer; }; AlltoallvFullmesh::~AlltoallvFullmesh() = default; @@ -89,28 +95,27 @@ void AlltoallvFullmesh::initialize(std::shared_ptr comm) { int rank = comm->bootstrap()->getRank(); int nRanksPerNode = comm->bootstrap()->getNranksPerNode(); int localGpuIdx = rank % nRanksPerNode; - - // Use hybrid connections: CudaIpc for intra-node, IB for inter-node. - // On systems where CudaIpc works across nodes (e.g. GB200 NVSwitch), - // set MSCCLPP_FORCE_CUDAIPC=1 to skip IB and use CudaIpc for all peers. - const char* forceCudaIpc = std::getenv("MSCCLPP_FORCE_CUDAIPC"); - bool useIB = (getIBDeviceCount() > 0) && !(forceCudaIpc && std::string(forceCudaIpc) == "1"); bool isMultiNode = (worldSize_ > nRanksPerNode); - if (useIB && isMultiNode) { - this->conns_ = setupHybridConnections(comm, localGpuIdx); - // Check if any connections are actually inter-node - hasRemotePeers_ = false; - for (const auto& conn : this->conns_) { - if (!isIntraNodeConnection(conn)) { - hasRemotePeers_ = true; - break; - } - } - } else { - // Single-node or no IB: use CudaIpc for all + if (!isMultiNode) { + // ── Single-node: CudaIpc for all peers ───────────────────────────── + multiNodeMode_ = MultiNodeMode::SingleNode; this->conns_ = setupConnections(comm); - hasRemotePeers_ = false; + } else if (isNvlsSupported()) { + // ── GB200 NVSwitch: CudaIpc for ALL peers + staging GpuBuffers ───── + // GpuBuffer uses cuMemCreate → Fabric handles → cross-node CudaIpc works. + multiNodeMode_ = MultiNodeMode::NVSwitch; + this->conns_ = setupConnections(comm); + } else { + // ── IB: CudaIpc intra-node + IB inter-node ──────────────────────── + // For non-NVSwitch systems (H100 etc.) where CudaIpc doesn't work cross-node. + if (getIBDeviceCount() <= 0) { + throw Error("Multi-node alltoallv requires IB transport but no IB devices found. " + "Ensure IB drivers are loaded and devices are available.", + ErrorCode::InvalidUsage); + } + multiNodeMode_ = MultiNodeMode::IB; + this->conns_ = setupHybridConnections(comm, localGpuIdx); } } @@ -149,41 +154,47 @@ CommResult AlltoallvFullmesh::alltoallvKernelFunc( int nPeers = worldSize - 1; if (nPeers < 1) nPeers = 1; - if (algoCtx->hasRemotePeers) { - // Multi-node: use hybrid kernel with MemoryChannel (intra) + PortChannel (inter) - // PortChannel put() is single-threaded (FIFO push), so we use 1 block per peer. - // For large intra-node messages, multiple blocks per local peer would help, - // but keeping it simple for now: 1 block per peer for both local and remote. + // Determine send/recv buffer pointers. + // NVSwitch mode: copy PyTorch data to/from GpuBuffer staging buffers. + const void* sendBuff = input; + void* recvBuff = output; + + if (algoCtx->useStaging) { + sendBuff = algoCtx->inputStaging->data(); + recvBuff = algoCtx->outputStaging->data(); + MSCCLPP_CUDATHROW(cudaMemcpyAsync( + const_cast(sendBuff), input, + inputSize, cudaMemcpyDeviceToDevice, stream)); + } + + if (algoCtx->mode == MultiNodeMode::IB) { + // ── IB mode: PortChannel kernel for ALL peers ────────────────────── + // PortChannel handles both CudaIpc (intra) and IB (inter) connections + // via the ProxyService proxy thread. int numBlocks = nPeers; - alltoallvHybridKernel<<>>( - algoCtx->memoryChannelDeviceHandles.get(), + alltoallvPortChannelKernel<<>>( algoCtx->portChannelDeviceHandles.get(), - algoCtx->d_peerIsLocal.get(), - algoCtx->d_peerToPortChannelIdx.get(), - algoCtx->deviceSyncer.get(), rank, worldSize, - input, output, + sendBuff, recvBuff, d_sendCounts, d_sendDispls, d_recvCounts, d_recvDispls, d_remoteRecvDispls); } else { - // Single-node: use the optimized peer-parallel kernel (MemoryChannel only) + // ── SingleNode / NVSwitch mode: MemoryChannel kernel ─────────────── constexpr size_t SIZE_THRESHOLD = 1 << 20; // 1MB size_t avgMsgSize = inputSize / worldSize; if (avgMsgSize < SIZE_THRESHOLD) { - // Small messages: 1 block per peer, parallel signal/wait, no barrier int numBlocks = nPeers; alltoallvPeerParallelKernel<<>>( algoCtx->memoryChannelDeviceHandles.get(), algoCtx->deviceSyncer.get(), rank, worldSize, - input, output, + sendBuff, recvBuff, d_sendCounts, d_sendDispls, d_recvCounts, d_recvDispls, d_remoteRecvDispls); } else { - // Large messages: multiple blocks per peer for maximum put bandwidth. int blocksPerPeer = (nBlocks > 0 && nBlocks <= 128) ? ((nBlocks + nPeers - 1) / nPeers) : ALLTOALLV_DEFAULT_BLOCKS_PER_PEER; @@ -194,13 +205,19 @@ CommResult AlltoallvFullmesh::alltoallvKernelFunc( algoCtx->memoryChannelDeviceHandles.get(), algoCtx->deviceSyncer.get(), rank, worldSize, - input, output, + sendBuff, recvBuff, d_sendCounts, d_sendDispls, d_recvCounts, d_recvDispls, d_remoteRecvDispls); } } + if (algoCtx->useStaging) { + MSCCLPP_CUDATHROW(cudaMemcpyAsync( + output, recvBuff, + outputSize, cudaMemcpyDeviceToDevice, stream)); + } + if (cudaGetLastError() == cudaSuccess) { return CommResult::CommSuccess; } @@ -215,73 +232,75 @@ std::shared_ptr AlltoallvFullmesh::initAlltoallvContext( ctx->rank = comm->bootstrap()->getRank(); ctx->worldSize = comm->bootstrap()->getNranks(); ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); - ctx->hasRemotePeers = this->hasRemotePeers_; + ctx->mode = this->multiNodeMode_; + ctx->useStaging = (ctx->mode == MultiNodeMode::NVSwitch); int rank = ctx->rank; - int nRanksPerNode = ctx->nRanksPerNode; - int localGpuIdx = rank % nRanksPerNode; + int localGpuIdx = rank % ctx->nRanksPerNode; - // Determine transport flags for memory registration. - // If we have remote peers, register with both CudaIpc and IB transports. - TransportFlags allTransports = Transport::CudaIpc; - if (ctx->hasRemotePeers) { - allTransports |= getIBTransportForGpu(localGpuIdx); - } + if (ctx->mode == MultiNodeMode::NVSwitch) { + // ── NVSwitch (GB200): staging GpuBuffers + CudaIpc MemoryChannel for all peers + ctx->inputStaging = std::make_shared>(inputSize); + ctx->outputStaging = std::make_shared>(outputSize); - // Register memories for input and output buffers - RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports); - RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports); + TransportFlags allTransports = Transport::CudaIpc; + RegisteredMemory inputBufRegMem = comm->registerMemory( + ctx->inputStaging->data(), ctx->inputStaging->bytes(), allTransports); + RegisteredMemory outputBufRegMem = comm->registerMemory( + ctx->outputStaging->data(), ctx->outputStaging->bytes(), allTransports); - // Exchange output buffer registration with all peers (we write to peer's output buffer) - std::vector remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem); + std::vector remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem); - // Build peer locality map and channel index mappings - int nPeers = ctx->worldSize - 1; - std::vector peerIsLocal(nPeers, 1); - std::vector peerToPortChannelIdx(nPeers, -1); - int portChannelCount = 0; + constexpr int nChannelsPerConnection = 1; + ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); + ctx->memoryChannels = setupMemoryChannels( + this->conns_, ctx->memorySemaphores, remoteOutputMemories, inputBufRegMem, nChannelsPerConnection); + ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels); - for (size_t cid = 0; cid < this->conns_.size(); ++cid) { - if (!isIntraNodeConnection(this->conns_[cid])) { - peerIsLocal[cid] = 0; - peerToPortChannelIdx[cid] = portChannelCount++; - } - } + ctx->registeredMemories = std::move(remoteOutputMemories); + ctx->registeredMemories.push_back(inputBufRegMem); + ctx->registeredMemories.push_back(outputBufRegMem); - // Setup intra-node MemoryChannels (CudaIpc connections only) - constexpr int nChannelsPerConnection = 1; - ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); - ctx->memoryChannels = setupMemoryChannels( - this->conns_, - ctx->memorySemaphores, - remoteOutputMemories, // remote output buffers (where we write) - inputBufRegMem, // local input buffer (where we read from) - nChannelsPerConnection); - ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels); + } else if (ctx->mode == MultiNodeMode::IB) { + // ── IB: PortChannel for ALL peers (CudaIpc intra + IB inter connections) + TransportFlags allTransports = Transport::CudaIpc | getIBTransportForGpu(localGpuIdx); + RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports); + RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports); + + std::vector remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem); - // Setup inter-node PortChannels (IB connections only) - if (ctx->hasRemotePeers) { ctx->proxyService = std::make_shared(); - ctx->portChannels = setupPortChannels( + ctx->portChannels = setupAllPortChannels( ctx->proxyService, *comm, this->conns_, remoteOutputMemories, inputBufRegMem); ctx->portChannelDeviceHandles = setupPortChannelDeviceHandles(ctx->portChannels); ctx->proxyService->startProxy(true); + + ctx->registeredMemories = std::move(remoteOutputMemories); + ctx->registeredMemories.push_back(inputBufRegMem); + ctx->registeredMemories.push_back(outputBufRegMem); + + } else { + // ── SingleNode: CudaIpc MemoryChannel (direct PyTorch buffers) + TransportFlags allTransports = Transport::CudaIpc; + RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports); + RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports); + + std::vector remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem); + + constexpr int nChannelsPerConnection = 1; + ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); + ctx->memoryChannels = setupMemoryChannels( + this->conns_, ctx->memorySemaphores, remoteOutputMemories, inputBufRegMem, nChannelsPerConnection); + ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels); + + ctx->registeredMemories = std::move(remoteOutputMemories); + ctx->registeredMemories.push_back(inputBufRegMem); + ctx->registeredMemories.push_back(outputBufRegMem); } - // Copy peer locality info to GPU - ctx->d_peerIsLocal = mscclpp::detail::gpuCallocShared(nPeers); - mscclpp::gpuMemcpy(ctx->d_peerIsLocal.get(), peerIsLocal.data(), nPeers, cudaMemcpyHostToDevice); - ctx->d_peerToPortChannelIdx = mscclpp::detail::gpuCallocShared(nPeers); - mscclpp::gpuMemcpy(ctx->d_peerToPortChannelIdx.get(), peerToPortChannelIdx.data(), nPeers, cudaMemcpyHostToDevice); - - // Allocate GPU DeviceSyncer for multi-block grid-wide barrier (zero-initialized) + // Allocate GPU DeviceSyncer for multi-block grid-wide barrier ctx->deviceSyncer = mscclpp::detail::gpuCallocShared(); - // Keep registered memory references to prevent deallocation - ctx->registeredMemories = std::move(remoteOutputMemories); - ctx->registeredMemories.push_back(inputBufRegMem); - ctx->registeredMemories.push_back(outputBufRegMem); - return ctx; } diff --git a/src/ext/collectives/collective_utils.cc b/src/ext/collectives/collective_utils.cc index b2b22469..270223b3 100644 --- a/src/ext/collectives/collective_utils.cc +++ b/src/ext/collectives/collective_utils.cc @@ -124,6 +124,26 @@ std::vector setupPortChannels( return channels; } +std::vector setupAllPortChannels( + std::shared_ptr proxyService, + mscclpp::Communicator& comm, + const std::vector& connections, + const std::vector& remoteMemories, + mscclpp::RegisteredMemory localMemory) { + std::vector channels; + mscclpp::MemoryId srcMemId = proxyService->addMemory(localMemory); + for (size_t cid = 0; cid < connections.size(); ++cid) { + // Create PortChannel for EVERY connection (CudaIpc and IB alike). + // The ProxyService proxy thread handles both connection types: + // - CudaIpc: cudaMemcpyD2D via IPC-mapped pointer + // - IB: RDMA write via ibv_post_send + mscclpp::SemaphoreId semId = proxyService->buildAndAddSemaphore(comm, connections[cid]); + mscclpp::MemoryId dstMemId = proxyService->addMemory(remoteMemories[cid]); + channels.emplace_back(proxyService->portChannel(semId, dstMemId, srcMemId)); + } + return channels; +} + std::shared_ptr setupPortChannelDeviceHandles( const std::vector& portChannels) { if (portChannels.empty()) return nullptr; diff --git a/src/ext/collectives/include/alltoallv/alltoallv_fullmesh.hpp b/src/ext/collectives/include/alltoallv/alltoallv_fullmesh.hpp index 2467d9c6..22c1cf72 100644 --- a/src/ext/collectives/include/alltoallv/alltoallv_fullmesh.hpp +++ b/src/ext/collectives/include/alltoallv/alltoallv_fullmesh.hpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -34,6 +35,9 @@ class AlltoallvFullmesh : public AlgorithmBuilder { std::shared_ptr build() override; + // Multi-node transport mode, decided at initialize() time + enum class MultiNodeMode { SingleNode, NVSwitch, IB }; + private: void initialize(std::shared_ptr comm); @@ -51,7 +55,7 @@ class AlltoallvFullmesh : public AlgorithmBuilder { std::vector conns_; int worldSize_; - bool hasRemotePeers_; // true if any inter-node connections + MultiNodeMode multiNodeMode_ = MultiNodeMode::SingleNode; }; } // namespace collective diff --git a/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp b/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp index 2cc17159..e00773f0 100644 --- a/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp +++ b/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp @@ -512,6 +512,79 @@ __global__ void __launch_bounds__(1024) } } +/** + * PortChannel-only AllToAllV kernel for multi-node. + * + * Uses PortChannel (proxy-based) for ALL peers — both intra-node and inter-node. + * This follows the proven pattern from allgather_test_cpp.cu which works reliably + * on GB200 multi-node NVSwitch systems. + * + * For intra-node CudaIpc connections, the proxy performs cudaMemcpyD2D. + * For inter-node IB connections, the proxy performs RDMA writes. + * + * Each block handles one peer. Thread 0 pushes a put descriptor to the FIFO + * (single-threaded), which triggers the proxy to perform the data transfer. + * + * Launch config: <<>> + */ +__global__ void __launch_bounds__(1024) + alltoallvPortChannelKernel(PortChannelDeviceHandle* portChannels, + int rank, + int worldSize, + const void* sendBuff, + void* recvBuff, + const size_t* sendCounts, + const size_t* sendDispls, + const size_t* recvCounts, + const size_t* recvDispls, + const size_t* remoteRecvDispls) { + const int nPeers = worldSize - 1; + + // Handle trivial case (single rank) + if (nPeers == 0) { + const int gtid = threadIdx.x + blockIdx.x * blockDim.x; + const int nThreads = blockDim.x * gridDim.x; + if (sendCounts[rank] > 0) { + mscclpp::copy((char*)recvBuff + recvDispls[rank], + (void*)((const char*)sendBuff + sendDispls[rank]), + sendCounts[rank], gtid, nThreads); + } + return; + } + + // Phase 1: Local copy — all blocks cooperate using global thread IDs + const int gtid = threadIdx.x + blockIdx.x * blockDim.x; + const int nThreads = blockDim.x * gridDim.x; + if (sendCounts[rank] > 0) { + mscclpp::copy((char*)recvBuff + recvDispls[rank], + (void*)((const char*)sendBuff + sendDispls[rank]), + sendCounts[rank], gtid, nThreads); + } + + // Phase 2: Per-peer data transfer via PortChannel (proxy-based). + // Each block handles one peer: blockIdx.x == peerIdx. + const int peerIdx = blockIdx.x; + if (peerIdx >= nPeers) return; + + const int peer = peerIdx < rank ? peerIdx : peerIdx + 1; + + // Thread 0 pushes a put+signal+flush descriptor to the proxy FIFO. + // The proxy thread performs the actual data transfer (cudaMemcpy or RDMA). + if (threadIdx.x == 0 && sendCounts[peer] > 0) { + portChannels[peerIdx].putWithSignalAndFlush( + remoteRecvDispls[peer], // dst offset in peer's output buffer + sendDispls[peer], // src offset in our input buffer + sendCounts[peer] // bytes to transfer + ); + } + __syncthreads(); + + // Wait for incoming data from this peer + if (threadIdx.x == 0 && recvCounts[peer] > 0) { + portChannels[peerIdx].wait(); + } +} + #undef ALLTOALLV_WARP_SIZE } // namespace collective } // namespace mscclpp \ No newline at end of file diff --git a/src/ext/collectives/include/collective_utils.hpp b/src/ext/collectives/include/collective_utils.hpp index 59b32878..02c85096 100644 --- a/src/ext/collectives/include/collective_utils.hpp +++ b/src/ext/collectives/include/collective_utils.hpp @@ -78,6 +78,24 @@ std::vector setupPortChannels( const std::vector& remoteMemories, RegisteredMemory localMemory); +/// Setup PortChannels for ALL connections (both CudaIpc and IB) via ProxyService. +/// This follows the proven pattern from allgather_test_cpp.cu: +/// - CudaIpc connections: proxy does cudaMemcpyD2D +/// - IB connections: proxy does RDMA write +/// Creates one PortChannel per peer (dense indexing by peerIdx). +/// @param proxyService The ProxyService managing transfers +/// @param comm The communicator +/// @param connections All connections (mixed CudaIpc + IB) +/// @param remoteMemories Remote registered memories (one per peer) +/// @param localMemory Local registered memory +/// @return Vector of PortChannels (one per peer, in connection order) +std::vector setupAllPortChannels( + std::shared_ptr proxyService, + Communicator& comm, + const std::vector& connections, + const std::vector& remoteMemories, + RegisteredMemory localMemory); + /// Setup PortChannel device handles (GPU-allocated array). std::shared_ptr setupPortChannelDeviceHandles( const std::vector& portChannels);