From 2a2fca8a587a658888fe5a21f5b42cd07bf6cec2 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 1 May 2026 19:06:07 +0000 Subject: [PATCH] Rename collective ctx/kernel param nRanksPerNode to ipcDomainNranks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The AlgorithmCtx field and the kernel/host parameters that hold the collective's IPC peer-group size were named nRanksPerNode, which is misleading on Multi-Node NVLink (where the value spans multiple hosts) and on AMD (where the relevant fabric is XGMI, not NVLink). Rename to ipcDomainNranks throughout the collective algorithms to match the neutral naming introduced for the env helper. Scope intentionally limited to src/ext/collectives/. The following are left untouched on purpose: - Bootstrap::getNranksPerNode() — physical-host detection, semantics unchanged. - Algorithm::Constraint::nRanksPerNode (public API in include/mscclpp/algorithm.hpp) and the DSL plan config in algorithm_collection_builder.cc — these describe a plan's required physical topology. - NCCL adapter (src/ext/nccl/) — preserves NCCL ABI compatibility. - MAX_NRANKS_PER_NODE — sizing constant for shared-memory arrays. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../allgather/allgather_fullmesh.cu | 10 +++---- .../allgather/allgather_fullmesh_2.cu | 10 +++---- .../allreduce/allreduce_allpair_packet.cu | 14 ++++----- .../allreduce/allreduce_fullmesh.cu | 12 ++++---- .../allreduce_nvls_block_pipeline.cu | 30 +++++++++---------- .../allreduce/allreduce_nvls_packet.cu | 4 +-- .../allreduce/allreduce_nvls_warp_pipeline.cu | 28 ++++++++--------- .../allreduce/allreduce_nvls_zero_copy.cu | 20 ++++++------- .../collectives/allreduce/allreduce_packet.cu | 20 ++++++------- .../collectives/allreduce/allreduce_rsag.cu | 24 +++++++-------- .../allreduce/allreduce_rsag_pipeline.cu | 22 +++++++------- .../allreduce/allreduce_rsag_zero_copy.cu | 26 ++++++++-------- .../collectives/include/collective_utils.hpp | 10 +++---- 13 files changed, 115 insertions(+), 115 deletions(-) diff --git a/src/ext/collectives/allgather/allgather_fullmesh.cu b/src/ext/collectives/allgather/allgather_fullmesh.cu index 17054869..cbe199bc 100644 --- a/src/ext/collectives/allgather/allgather_fullmesh.cu +++ b/src/ext/collectives/allgather/allgather_fullmesh.cu @@ -11,8 +11,8 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allgatherFullmesh(void* buff, void* scratch, void* resultBuff, DeviceHandle* memoryChannels, - int rank, int nRanksPerNode, [[maybe_unused]] int worldSize, size_t nelems) { - const int nPeer = nRanksPerNode - 1; + int rank, int ipcDomainNranks, [[maybe_unused]] int worldSize, size_t nelems) { + const int nPeer = ipcDomainNranks - 1; const size_t chanOffset = nPeer * blockIdx.x; // assume (nelems * sizeof(T)) is divisible by 16 const size_t nInt4 = nelems * sizeof(int) / sizeof(int4); @@ -129,11 +129,11 @@ CommResult AllgatherFullmesh::allgatherKernelFunc(const std::shared_ptr ct if ((char*)input == (char*)output + rank * inputSize) { allgatherFullmesh<<>>( (void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank, - ctx->nRanksPerNode, ctx->workSize, nElem); + ctx->ipcDomainNranks, ctx->workSize, nElem); } else { allgatherFullmesh<<>>( (void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank, - ctx->nRanksPerNode, ctx->workSize, nElem); + ctx->ipcDomainNranks, ctx->workSize, nElem); } cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { @@ -150,7 +150,7 @@ std::shared_ptr AllgatherFullmesh::initAllgatherContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup semaphores ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); diff --git a/src/ext/collectives/allgather/allgather_fullmesh_2.cu b/src/ext/collectives/allgather/allgather_fullmesh_2.cu index 9d169d68..6e69f81c 100644 --- a/src/ext/collectives/allgather/allgather_fullmesh_2.cu +++ b/src/ext/collectives/allgather/allgather_fullmesh_2.cu @@ -12,7 +12,7 @@ __device__ DeviceSyncer deviceSyncer; template __global__ void __launch_bounds__(1024, 1) allgatherFullmesh2(void* sendbuff, mscclpp::DeviceHandle* memoryChannels, - size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t nRanksPerNode, + size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t ipcDomainNranks, size_t nelemsPerGPU) { const size_t tid = threadIdx.x + blockIdx.x * blockDim.x; const size_t lid = tid % WARP_SIZE; @@ -20,7 +20,7 @@ __global__ void __launch_bounds__(1024, 1) const size_t nThread = blockDim.x * gridDim.x; const size_t nWarp = nThread / WARP_SIZE; - const size_t nPeer = nRanksPerNode - 1; + const size_t nPeer = ipcDomainNranks - 1; const size_t chanOffset = nPeer * blockIdx.x; auto memChans = memoryChannels + chanOffset; @@ -140,11 +140,11 @@ CommResult AllgatherFullmesh2::allgatherKernelFunc(const std::shared_ptr c if ((char*)input == (char*)output + rank * inputSize) { allgatherFullmesh2<<>>( (void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize, - ctx->nRanksPerNode, nElem); + ctx->ipcDomainNranks, nElem); } else { allgatherFullmesh2<<>>( (void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize, - ctx->nRanksPerNode, nElem); + ctx->ipcDomainNranks, nElem); } cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { @@ -159,7 +159,7 @@ std::shared_ptr AllgatherFullmesh2::initAllgatherContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup semaphores ctx->memorySemaphores = this->memorySemaphores_; diff --git a/src/ext/collectives/allreduce/allreduce_allpair_packet.cu b/src/ext/collectives/allreduce/allreduce_allpair_packet.cu index 690d0eb4..5be2f336 100644 --- a/src/ext/collectives/allreduce/allreduce_allpair_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_allpair_packet.cu @@ -14,11 +14,11 @@ namespace collective { template __global__ void allreduceAllPairs(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, - size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerNode, + size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t nelems, uint32_t numScratchBuff, void* flags, uint32_t flagSize) { if (sizeof(T) == 2 || sizeof(T) == 1) nelems = (nelems * sizeof(T) + sizeof(T)) / sizeof(int); - const int nPeers = nRanksPerNode - 1; + const int nPeers = ipcDomainNranks - 1; uint32_t flag = ((uint32_t*)flags)[blockIdx.x]; size_t scratchBaseOffset = (flag % numScratchBuff) ? (scratchBufferSize / numScratchBuff) : 0; @@ -72,7 +72,7 @@ template struct AllpairAdapter { static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*, DeviceHandle*, DeviceHandle*, size_t channelInOffset, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void* flags, uint32_t flagSize, uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) { using ChannelType = DeviceHandle; @@ -84,7 +84,7 @@ struct AllpairAdapter { } allreduceAllPairs<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - nRanksPerNode, worldSize, nelems, numScratchBuff, flags, flagSize); + ipcDomainNranks, worldSize, nelems, numScratchBuff, flags, flagSize); return cudaGetLastError(); } }; @@ -108,7 +108,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptrworkSize); } // nBlocks must be at least nPeers for allpair — each block maps to one peer. - const int nPeers = algoCtx->nRanksPerNode - 1; + const int nPeers = algoCtx->ipcDomainNranks - 1; if (nPeers > 0 && blockAndThreadNum.first < nPeers) { return CommResult::CommInvalidArgument; } @@ -124,7 +124,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptrscratchBuffer_, output, algoCtx->memoryChannelDeviceHandles.get(), nullptr, nullptr, - nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->nRanksPerNode, + nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { @@ -140,7 +140,7 @@ std::shared_ptr AllreduceAllpairPacket::initAllreduceContext(std::shared_p const int nChannelsPerConnection = maxBlockNum_; ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->memorySemaphores_; ctx->registeredMemories = this->registeredMemories_; ctx->registeredMemories.pop_back(); // remove the local memory from previous context diff --git a/src/ext/collectives/allreduce/allreduce_fullmesh.cu b/src/ext/collectives/allreduce/allreduce_fullmesh.cu index 9d144c62..b95dcb28 100644 --- a/src/ext/collectives/allreduce/allreduce_fullmesh.cu +++ b/src/ext/collectives/allreduce/allreduce_fullmesh.cu @@ -13,8 +13,8 @@ template __global__ void __launch_bounds__(512, 1) allreduceFullmesh(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* memoryOutChannels, size_t channelOutDataOffset, int rank, - int nRanksPerNode, int worldSize, size_t nelems) { - const int nPeer = nRanksPerNode - 1; + int ipcDomainNranks, int worldSize, size_t nelems) { + const int nPeer = ipcDomainNranks - 1; const size_t chanOffset = nPeer * blockIdx.x; // assume (nelems * sizeof(T)) is divisible by (16 * worldSize) const size_t nInt4 = nelems * sizeof(T) / sizeof(int4); @@ -159,7 +159,7 @@ template struct AllreduceAllconnectAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* memoryOutChannels, DeviceHandle*, DeviceHandle*, size_t, - size_t channelOutDataOffset, size_t, int rank, int nRanksPerNode, int worldSize, + size_t channelOutDataOffset, size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; @@ -168,7 +168,7 @@ struct AllreduceAllconnectAdapter { if (nThreadsPerBlock == 0) nThreadsPerBlock = 512; allreduceFullmesh<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, (ChannelType*)memoryOutChannels, - channelOutDataOffset, rank, nRanksPerNode, worldSize, nelems); + channelOutDataOffset, rank, ipcDomainNranks, worldSize, nelems); return cudaGetLastError(); } }; @@ -225,7 +225,7 @@ CommResult AllreduceFullmesh::allreduceKernelFunc( } cudaError_t error = allreduce(input, this->scratchBuffer_, output, inputChannelHandles.get(), ctx->memoryChannelDeviceHandles.get(), - nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, + nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN("AllreduceAllconnect failed with error: %s", cudaGetErrorString(error)); @@ -252,7 +252,7 @@ std::shared_ptr AllreduceFullmesh::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup semaphores ctx->memorySemaphores = this->outputSemaphores_; diff --git a/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu b/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu index 2d71cd63..3ecb361f 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu @@ -20,15 +20,15 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] DeviceHandle* memoryChannels, [[maybe_unused]] DeviceHandle* switchChannels, [[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize, - [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerNode) { + [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 constexpr int alignment = 16; - int nPeers = nRanksPerNode - 1; - int nBlocksForCopy = nRanksPerNode * 2; - int nBlocksForReduce = nRanksPerNode; + int nPeers = ipcDomainNranks - 1; + int nBlocksForCopy = ipcDomainNranks * 2; + int nBlocksForReduce = ipcDomainNranks; int copyReduceRatio = nBlocksForCopy / nBlocksForReduce; - size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode; - size_t sizePerRank = size / nRanksPerNode; + size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks; + size_t sizePerRank = size / ipcDomainNranks; assert(sizePerRank % alignment == 0); uint32_t sizePerBlock = ((sizePerRank + (nBlocksForCopy - 1)) / nBlocksForCopy + alignment - 1) / alignment * alignment; @@ -68,7 +68,7 @@ __global__ void __launch_bounds__(1024, 1) deviceSemaphore[bid + 2 * nBlocksForCopy].acquire(); } __syncthreads(); - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t blockOffset = it * unitSize + bid * sizePerBlock + i * sizePerRank; uint32_t scratchOffset = scratchIt * unitSize + bid * scratchSizePerBlock + i * scratchSizePerRank; char* srcData = (char*)src + blockOffset; @@ -125,7 +125,7 @@ __global__ void __launch_bounds__(1024, 1) channels->wait(); } __syncthreads(); - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t blockOffset = it * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * sizePerBlock + i * sizePerRank; uint32_t scratchOffset = scratchIt * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * scratchSizePerBlock + @@ -150,7 +150,7 @@ template struct NvlsBlockPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*, DeviceHandle* nvlsChannels, DeviceHandle*, size_t, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -166,9 +166,9 @@ struct NvlsBlockPipelineAdapter { #endif { using ChannelType = DeviceHandle; - allreduceNvlsBlockPipeline - <<>>(input, scratch, output, (ChannelType*)memoryChannels, - nvlsChannels, inputSize, scratchBufferSize, rank, nRanksPerNode); + allreduceNvlsBlockPipeline<<>>( + input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank, + ipcDomainNranks); return cudaGetLastError(); } } @@ -200,11 +200,11 @@ CommResult AllreduceNvlsBlockPipeline::allreduceKernelFunc(const std::shared_ptr } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = {ctx->nRanksPerNode * 5, 1024}; + blockAndThreadNum = {ctx->ipcDomainNranks * 5, 1024}; } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, - ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, nullptr, 0, 0, + ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN("AllreduceNvlsBlockPipeline failed with error: %s", cudaGetErrorString(error)); @@ -222,7 +222,7 @@ std::shared_ptr AllreduceNvlsBlockPipeline::initAllreduceContext(std::shar auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup channels ctx->switchChannels = diff --git a/src/ext/collectives/allreduce/allreduce_nvls_packet.cu b/src/ext/collectives/allreduce/allreduce_nvls_packet.cu index d331cc67..2ef0516e 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_packet.cu @@ -94,7 +94,7 @@ std::shared_ptr AllreduceNvlsPacket::initAllreduceContext(std::shared_ptr< auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); // setup channels ctx->switchChannels = this->switchChannels_; @@ -123,7 +123,7 @@ CommResult AllreduceNvlsPacket::allreduceKernelFunc(const std::shared_ptr } cudaError_t error = allreduce(input, this->scratchBuffer_, output, nullptr, nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, - 0, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, + 0, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN(ALGO, "AllreduceNvlsPacket failed with error: ", cudaGetErrorString(error)); diff --git a/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu b/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu index 9be621f0..1bdac9ad 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu @@ -18,15 +18,15 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] DeviceHandle* memoryChannels, [[maybe_unused]] DeviceHandle* multicast, [[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize, [[maybe_unused]] int rank, - [[maybe_unused]] int nRanksPerNode) { + [[maybe_unused]] int ipcDomainNranks) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 constexpr int alignment = 16; - int nPeers = nRanksPerNode - 1; + int nPeers = ipcDomainNranks - 1; int nBlocks = gridDim.x; int nBlocksPerNvlsConn = nBlocks / NUM_NVLS_CONNECTION; int bid = blockIdx.x; - size_t sizePerRank = size / nRanksPerNode; - size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode; + size_t sizePerRank = size / ipcDomainNranks; + size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks; const size_t maxSizePerBlock = ((sizePerRank + nBlocks - 1) / nBlocks + alignment - 1) / alignment * alignment; size_t start = bid * maxSizePerBlock; size_t end = min(start + maxSizePerBlock, sizePerRank); @@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1) lastIterSize = sizePerBlock % copyPerIter; } - const size_t chanOffset = (nRanksPerNode - 1) * blockIdx.x * 2; + const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x * 2; auto memoryChans = memoryChannels + chanOffset; __shared__ DeviceHandle channels[(MAX_NRANKS_PER_NODE - 1) * 2]; const int lid = threadIdx.x % WARP_SIZE; @@ -68,7 +68,7 @@ __global__ void __launch_bounds__(1024, 1) const size_t iterSize = (it == nIter - 1) ? lastIterSize : copyPerIter; if (warpId < endCopyWid) { int tidInCopy = threadIdx.x; - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter; size_t offsetScratch = i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock; @@ -99,7 +99,7 @@ __global__ void __launch_bounds__(1024, 1) channels[tidInRecvCopy + nPeers].wait(); } asm volatile("bar.sync %0, %1;" ::"r"(3), "r"((NRECV_COPY_WARPS)*WARP_SIZE) : "memory"); - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter; size_t offsetScratch = i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock; @@ -116,7 +116,7 @@ template struct NvlsWarpPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*, DeviceHandle* nvlsChannels, DeviceHandle*, size_t, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -132,9 +132,9 @@ struct NvlsWarpPipelineAdapter { #endif { using ChannelType = DeviceHandle; - allreduceNvlsWarpPipeline - <<>>(input, scratch, output, (ChannelType*)memoryChannels, - nvlsChannels, inputSize, scratchBufferSize, rank, nRanksPerNode); + allreduceNvlsWarpPipeline<<>>( + input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank, + ipcDomainNranks); return cudaGetLastError(); } } @@ -165,11 +165,11 @@ CommResult AllreduceNvlsWarpPipeline::allreduceKernelFunc( } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = {ctx->nRanksPerNode * 4, 1024}; + blockAndThreadNum = {ctx->ipcDomainNranks * 4, 1024}; } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, - ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, nullptr, 0, 0, + ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN("AllreduceNvlsWarpPipeline failed with error: %s", cudaGetErrorString(error)); @@ -187,7 +187,7 @@ std::shared_ptr AllreduceNvlsWarpPipeline::initAllreduceContext(std::share auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup channels ctx->switchChannels = diff --git a/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu b/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu index 36fcf860..a9d46d4f 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu @@ -19,12 +19,12 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] mscclpp::DeviceHandle* multicast, [[maybe_unused]] mscclpp::DeviceHandle* multicastOut, [[maybe_unused]] size_t channelInOffset, [[maybe_unused]] size_t channelOutOffset, - [[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerNode) { + [[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 - int nPeers = nRanksPerNode - 1; + int nPeers = ipcDomainNranks - 1; int nBlocks = gridDim.x; int bid = blockIdx.x; - size_t sizePerRank = size / nRanksPerNode; + size_t sizePerRank = size / ipcDomainNranks; const size_t minAlign = 16; // Align sizePerBlock to 16 bytes to ensure aligned vector access in handleMultiLoadReduceStore size_t sizePerBlock = (sizePerRank + nBlocks - 1) / nBlocks; @@ -40,14 +40,14 @@ __global__ void __launch_bounds__(1024, 1) mscclpp::DeviceHandle* multicastPtr = multicast + bid; mscclpp::DeviceHandle* multicastOutPtr = multicastOut + bid; - const size_t chanOffset = (nRanksPerNode - 1) * blockIdx.x; + const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x; auto memoryChans = memoryChannels + chanOffset; __shared__ mscclpp::DeviceHandle channels[MAX_NRANKS_PER_NODE - 1]; const int lid = threadIdx.x % WARP_SIZE; // Each warp redundantly loads all entries (same value, benign race) so that // every warp has the data its threads will read after __syncwarp(). Required // when nPeers > WARP_SIZE (MNNVL/NVL72 → 71 peers). - for (int i = lid; i < nRanksPerNode - 1; i += WARP_SIZE) { + for (int i = lid; i < ipcDomainNranks - 1; i += WARP_SIZE) { channels[i] = memoryChans[i]; } __syncwarp(); @@ -75,7 +75,7 @@ struct NvlsAdapter { static cudaError_t call(const void*, void*, void*, void* memoryChannels, void*, mscclpp::DeviceHandle* nvlsChannels, mscclpp::DeviceHandle* nvlsOutChannels, size_t channelInOffset, - size_t channelOutOffset, size_t, int rank, int nRanksPerNode, int, size_t inputSize, + size_t channelOutOffset, size_t, int rank, int ipcDomainNranks, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -93,7 +93,7 @@ struct NvlsAdapter { using ChannelType = DeviceHandle; allreduceNvls<<>>((ChannelType*)memoryChannels, nvlsChannels, nvlsOutChannels, channelInOffset, channelOutOffset, - inputSize, rank, nRanksPerNode); + inputSize, rank, ipcDomainNranks); return cudaGetLastError(); } } @@ -145,7 +145,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo } std::pair numBlocksAndThreads = {nBlocks, nThreadsPerBlock}; if (numBlocksAndThreads.first == 0 || numBlocksAndThreads.second == 0) { - numBlocksAndThreads = {::min(ctx->nRanksPerNode, MAX_NBLOCKS), 1024}; + numBlocksAndThreads = {::min(ctx->ipcDomainNranks, MAX_NBLOCKS), 1024}; // For GB200 devices with MNNVLS (Multi-Node NVLink Sharp), scale the number of blocks inversely with // the number of GPUs. Empirically, 32 blocks works well for 4 GPUs and 16 for 8 GPUs, which // follows the formula 128 / nGPUs, clamped to [1, MAX_NBLOCKS]. @@ -159,7 +159,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo } cudaError_t error = allreduce(nullptr, nullptr, nullptr, this->memoryChannelsDeviceHandle_.get(), nullptr, nvlsChannels, - nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->nRanksPerNode, ctx->workSize, + nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN("AllreduceNvls failed with error: %s", cudaGetErrorString(error)); @@ -183,7 +183,7 @@ std::shared_ptr AllreduceNvls::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); size_t sendBytes, recvBytes; CUdeviceptr sendBasePtr, recvBasePtr; diff --git a/src/ext/collectives/allreduce/allreduce_packet.cu b/src/ext/collectives/allreduce/allreduce_packet.cu index d631c35a..ebb2f618 100644 --- a/src/ext/collectives/allreduce/allreduce_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_packet.cu @@ -15,7 +15,7 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allreducePacket(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle* memoryChannels, - size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, + size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t nelems, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff #if defined(ENABLE_NPKIT) , @@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1) else nelems = nelems / (sizeof(int) / sizeof(T)); - const int nPeers = nRanksPerNode - 1; + const int nPeers = ipcDomainNranks - 1; const size_t nPkts = nelems / 2; uint32_t flag = ((uint32_t*)flags)[blockIdx.x]; @@ -156,7 +156,7 @@ template struct PacketAdapter { static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*, DeviceHandle*, DeviceHandle*, size_t channelInOffset, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) { using ChannelType = DeviceHandle; @@ -167,20 +167,20 @@ struct PacketAdapter { size_t sharedMemSize = sizeof(NpKitEvent) * NPKIT_SHM_NUM_EVENTS; allreducePacket<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - nRanksPerNode, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(), + ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else allreducePacket<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - nRanksPerNode, worldSize, nelems, flags, flagBufferSize, numScratchBuff); + ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff); #endif return cudaGetLastError(); } }; -inline std::pair getDefaultBlockNumAndThreadNum(size_t inputSize, int nRanksPerNode, int worldSize, +inline std::pair getDefaultBlockNumAndThreadNum(size_t inputSize, int ipcDomainNranks, int worldSize, [[maybe_unused]] DataType dtype) { - int nBlocks = (nRanksPerNode - 1) * 4; + int nBlocks = (ipcDomainNranks - 1) * 4; int nThreadsPerBlock = 1024; if (inputSize >= 32768) { nBlocks = (worldSize - 1) * 8; @@ -232,7 +232,7 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr ctx_ auto ctx = std::static_pointer_cast(ctx_void); std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->workSize, ctx->nRanksPerNode, dtype); + blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->workSize, ctx->ipcDomainNranks, dtype); } size_t sendBytes; @@ -248,7 +248,7 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr ctx_ } cudaError_t error = allreduce(input, this->scratchBuffer_, output, ctx->memoryChannelDeviceHandles.get(), nullptr, nullptr, nullptr, - channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, + channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { @@ -264,7 +264,7 @@ std::shared_ptr AllreducePacket::initAllreduceContext(std::shared_ptrrank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->memorySemaphores_; ctx->registeredMemories = this->registeredMemories_; ctx->registeredMemories.pop_back(); // remove the local memory from previous context diff --git a/src/ext/collectives/allreduce/allreduce_rsag.cu b/src/ext/collectives/allreduce/allreduce_rsag.cu index 4c46bf9b..93e2d0c4 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag.cu @@ -31,18 +31,18 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allreduceRsAg(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, - DeviceHandle* switchChannels, void* remoteMemories, int rank, int nRanksPerNode, + DeviceHandle* switchChannels, void* remoteMemories, int rank, int ipcDomainNranks, int worldSize, size_t nelems) { int blockId = blockIdx.x; - uint32_t nPeers = nRanksPerNode - 1; + uint32_t nPeers = ipcDomainNranks - 1; assert((uintptr_t)buff % sizeof(int4) == 0); assert((uintptr_t)resultBuff % sizeof(int4) == 0); constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T); - uint32_t alignedNelems = ((nelems + nRanksPerNode - 1) / nRanksPerNode + nelemsPerInt4 - 1) / nelemsPerInt4 * - nelemsPerInt4 * nRanksPerNode; - uint32_t nelemsPerRank = alignedNelems / nRanksPerNode; + uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 * + nelemsPerInt4 * ipcDomainNranks; + uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks; uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4; uint32_t lastInt4Index = nelems / nelemsPerInt4; uint32_t remainder = nelems % nelemsPerInt4; @@ -59,7 +59,7 @@ __global__ void __launch_bounds__(1024, 1) nInt4PerBlock += remainderForBlock; } if (nInt4PerBlock == 0) return; - uint32_t nInt4ForCopy = nInt4PerBlock * nRanksPerNode; + uint32_t nInt4ForCopy = nInt4PerBlock * ipcDomainNranks; for (uint32_t idx = threadIdx.x; idx < nInt4ForCopy; idx += blockDim.x) { int rankIdx = idx / nInt4PerBlock; @@ -84,13 +84,13 @@ __global__ void __launch_bounds__(1024, 1) if (offset > lastInt4Index) continue; int4 tmp = scratch4[offset]; for (uint32_t i = 0; i < nPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; int4 data = mscclpp::read(((void**)remoteMemories)[peerIdx], offset); tmp = calVector(data, tmp); } for (uint32_t i = 0; i < nPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; mscclpp::write(((void**)remoteMemories)[peerIdx], offset, tmp); } @@ -127,7 +127,7 @@ template struct AllreduceRsAgAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t, int rank, int nRanksPerNode, int worldSize, size_t inputSize, cudaStream_t stream, + size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -137,7 +137,7 @@ struct AllreduceRsAgAdapter { } allreduceRsAg<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - nRanksPerNode, worldSize, nelems); + ipcDomainNranks, worldSize, nelems); return cudaGetLastError(); } }; @@ -185,7 +185,7 @@ CommResult AllreduceRsAg::allreduceKernelFunc(const std::shared_ptr ctx, c } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(), this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, 0, algoCtx->rank, - algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, + algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); @@ -203,7 +203,7 @@ std::shared_ptr AllreduceRsAg::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->scratchSemaphores_; ctx->registeredMemories = this->remoteScratchMemories_; diff --git a/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu b/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu index eabe3dc5..9f63e590 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu @@ -86,7 +86,7 @@ template __global__ void __launch_bounds__(1024, 1) allreduceRsAgPipeline(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* switchChannels, void* remoteMemories, int rank, - int nRanksPerNode, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut, + int ipcDomainNranks, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut, uint32_t nblocksForReduce, uint32_t nblocksForRecv) { uint32_t bid = blockIdx.x; constexpr uint32_t nStepsPerIter = 4; @@ -94,7 +94,7 @@ __global__ void __launch_bounds__(1024, 1) uint32_t nInt4PerIter = nblocksForReduce * blockDim.x * nStepsPerIter; const uint32_t chunkSize = nInt4PerIter * worldSize; uint32_t nIters = (nInt4 + chunkSize - 1) / chunkSize; - uint32_t nPeers = nRanksPerNode - 1; + uint32_t nPeers = ipcDomainNranks - 1; int4* scratch4 = reinterpret_cast((char*)scratch); const uint32_t scratchIterStride = 2 * chunkSize; // one for AS, one for AG const uint32_t pipelineDepth = scratchSize / sizeof(int4) / scratchIterStride; @@ -111,7 +111,7 @@ __global__ void __launch_bounds__(1024, 1) __syncthreads(); uint32_t threadIdInPut = bid * blockDim.x + threadIdx.x; for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % nRanksPerNode; + int remoteRankId = (rank + peer + 1) % ipcDomainNranks; int peerId = remoteRankId < rank ? remoteRankId : remoteRankId - 1; // Read chunk[remoteRankId] from local buff, write to peer's scratch[rank] (sender's slot) uint32_t srcOffset = iter * chunkSize + remoteRankId * nInt4PerIter; @@ -164,7 +164,7 @@ __global__ void __launch_bounds__(1024, 1) int4 tmp = loadVec(buff, myChunkOffset, nelems); // Add data from each peer's slot in scratch (peer sent their chunk[rank] to our scratch[peer]) for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % nRanksPerNode; + int remoteRankId = (rank + peer + 1) % ipcDomainNranks; uint32_t peerSlotOffset = baseOffset + remoteRankId * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut; int4 data = scratch4[peerSlotOffset]; @@ -175,7 +175,7 @@ __global__ void __launch_bounds__(1024, 1) uint32_t dstOffset = baseOffset + chunkSize + rank * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut; for (uint32_t i = 0; i < nPeers; i++) { - int peerIdx = (rank + i + 1) % nRanksPerNode; + int peerIdx = (rank + i + 1) % ipcDomainNranks; int index = peerIdx < rank ? peerIdx : peerIdx - 1; mscclpp::write(((void**)remoteMemories)[index], dstOffset, tmp); } @@ -203,7 +203,7 @@ __global__ void __launch_bounds__(1024, 1) __syncthreads(); // Copy other ranks' reduced chunks from scratch to result for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % nRanksPerNode; + int remoteRankId = (rank + peer + 1) % ipcDomainNranks; for (uint32_t step = 0; step < nStepsPerIter * REDUCE_COPY_RATIO; step++) { uint32_t offset = baseOffset + chunkSize + remoteRankId * nInt4PerIter + threadIdInRecv + step * blockDim.x * nblocksForRecv; @@ -224,7 +224,7 @@ template struct AllreduceRsAgPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t scratchSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize, + size_t scratchSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -248,7 +248,7 @@ struct AllreduceRsAgPipelineAdapter { } allreduceRsAgPipeline<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - nRanksPerNode, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv); + ipcDomainNranks, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv); return cudaGetLastError(); } }; @@ -288,8 +288,8 @@ CommResult AllreduceRsAgPipeline::allreduceKernelFunc( std::pair numBlocksAndThreads = {nBlocks, nThreadsPerBlock}; cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(), this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, this->scratchBufferSize_, - algoCtx->rank, algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, nullptr, 0, - 0, numBlocksAndThreads.first, numBlocksAndThreads.second); + algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, + 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; @@ -306,7 +306,7 @@ std::shared_ptr AllreduceRsAgPipeline::initAllreduceContext(std::shared_pt auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); ctx->memorySemaphores = this->scratchSemaphores_; ctx->registeredMemories = this->remoteScratchMemories_; diff --git a/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu b/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu index 67eed6d3..ea664325 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu @@ -35,7 +35,7 @@ __device__ mscclpp::DeviceSyncer globalSyncer; // // This approach requires registering both input and output buffers as remote // memories (2 * nPeers handles), but avoids scratch buffer allocation and -// the extra copy steps of the standard RSAG. nRanksPerNode is accepted at +// the extra copy steps of the standard RSAG. ipcDomainNranks is accepted at // runtime, which allows the same kernel to handle any NVLink-domain size // (including Multi-Node NVLink fabrics up to NVL72). @@ -43,18 +43,18 @@ template __global__ void __launch_bounds__(1024, 1) allreduceRsAgZeroCopy(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* switchChannels, void* remoteMemories, int rank, - int nRanksPerNode, int worldSize, size_t nelems) { + int ipcDomainNranks, int worldSize, size_t nelems) { int blockId = blockIdx.x; assert((uintptr_t)buff % sizeof(int4) == 0); assert((uintptr_t)resultBuff % sizeof(int4) == 0); - const int NPeers = nRanksPerNode - 1; + const int NPeers = ipcDomainNranks - 1; constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T); const uint32_t outputRemoteBufferOffset = NPeers; - uint32_t alignedNelems = ((nelems + nRanksPerNode - 1) / nRanksPerNode + nelemsPerInt4 - 1) / nelemsPerInt4 * - nelemsPerInt4 * nRanksPerNode; - uint32_t nelemsPerRank = alignedNelems / nRanksPerNode; + uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 * + nelemsPerInt4 * ipcDomainNranks; + uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks; uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4; uint32_t nInt4Total = (nelems + nelemsPerInt4 - 1) / nelemsPerInt4; @@ -87,14 +87,14 @@ __global__ void __launch_bounds__(1024, 1) int4 data; AccumVec acc = mscclpp::upcastVector(tmp_raw); for (int i = 0; i < NPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; data = mscclpp::read(((void**)remoteMemories)[peerIdx], offset); acc = mscclpp::calVectorAccum(acc, data); } int4 tmp = mscclpp::downcastVector(acc); for (int i = 0; i < NPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; mscclpp::write(((void**)remoteMemories)[outputRemoteBufferOffset + peerIdx], offset, tmp); } @@ -112,7 +112,7 @@ template struct AllreduceRsAgZeroCopyAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t, int rank, int nRanksPerNode, int worldSize, size_t inputSize, cudaStream_t stream, + size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -125,7 +125,7 @@ struct AllreduceRsAgZeroCopyAdapter { } allreduceRsAgZeroCopy<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - nRanksPerNode, worldSize, nelems); + ipcDomainNranks, worldSize, nelems); return cudaGetLastError(); } }; @@ -159,8 +159,8 @@ CommResult AllreduceRsAgZeroCopy::allreduceKernelFunc(const std::shared_ptrbaseMemoryChannelHandles_.get(), algoCtx->remoteMemoryHandles.get(), - nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, - nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); + nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, + stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; @@ -186,7 +186,7 @@ std::shared_ptr AllreduceRsAgZeroCopy::initAllreduceContext(std::shared_pt auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->semaphores_; diff --git a/src/ext/collectives/include/collective_utils.hpp b/src/ext/collectives/include/collective_utils.hpp index 44a21402..7fa6a81e 100644 --- a/src/ext/collectives/include/collective_utils.hpp +++ b/src/ext/collectives/include/collective_utils.hpp @@ -27,8 +27,8 @@ namespace mscclpp { namespace collective { constexpr int NUM_NVLS_CONNECTION = 8; // Sized to cover MAX_NRANKS_PER_NODE-scale allreduce algos whose device-side -// semaphore indices grow as O(nRanksPerNode) (e.g. nvls_block_pipeline uses -// up to ~5 * nRanksPerNode entries). +// semaphore indices grow as O(ipcDomainNranks) (e.g. nvls_block_pipeline uses +// up to ~5 * ipcDomainNranks entries). constexpr int NUM_SEMAPHORES = 512; // Upper bound on the number of NVLink-reachable ranks that participate in a @@ -54,8 +54,8 @@ std::vector> setupMemorySemaphores /// Number of ranks that participate in the same GPU-IPC-reachable peer group (e.g. a single host or /// a Multi-Node NVLink fabric, or an AMD XGMI domain). Returns the value of `MSCCLPP_IPC_DOMAIN_NRANKS` /// if set to a positive value; otherwise falls back to `bootstrap->getNranksPerNode()`. This is -/// intentionally independent of `nRanksPerNode` so that algorithms can opt in to MNNVL-like behavior -/// without changing the meaning of bootstrap-level APIs. +/// intentionally independent of `Bootstrap::getNranksPerNode()` so that algorithms can opt in to +/// MNNVL-like behavior without changing the meaning of bootstrap-level APIs. int getIpcDomainNranks(std::shared_ptr comm); std::shared_ptr> setupMemoryChannelDeviceHandles( @@ -86,7 +86,7 @@ class AlgorithmCtx { public: int rank; int workSize; - int nRanksPerNode; + int ipcDomainNranks; std::vector registeredMemories; std::vector memoryChannels;