diff --git a/src/ext/collectives/allgather/allgather_fullmesh.cu b/src/ext/collectives/allgather/allgather_fullmesh.cu index 17054869..cbe199bc 100644 --- a/src/ext/collectives/allgather/allgather_fullmesh.cu +++ b/src/ext/collectives/allgather/allgather_fullmesh.cu @@ -11,8 +11,8 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allgatherFullmesh(void* buff, void* scratch, void* resultBuff, DeviceHandle* memoryChannels, - int rank, int nRanksPerNode, [[maybe_unused]] int worldSize, size_t nelems) { - const int nPeer = nRanksPerNode - 1; + int rank, int ipcDomainNranks, [[maybe_unused]] int worldSize, size_t nelems) { + const int nPeer = ipcDomainNranks - 1; const size_t chanOffset = nPeer * blockIdx.x; // assume (nelems * sizeof(T)) is divisible by 16 const size_t nInt4 = nelems * sizeof(int) / sizeof(int4); @@ -129,11 +129,11 @@ CommResult AllgatherFullmesh::allgatherKernelFunc(const std::shared_ptr ct if ((char*)input == (char*)output + rank * inputSize) { allgatherFullmesh<<>>( (void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank, - ctx->nRanksPerNode, ctx->workSize, nElem); + ctx->ipcDomainNranks, ctx->workSize, nElem); } else { allgatherFullmesh<<>>( (void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank, - ctx->nRanksPerNode, ctx->workSize, nElem); + ctx->ipcDomainNranks, ctx->workSize, nElem); } cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { @@ -150,7 +150,7 @@ std::shared_ptr AllgatherFullmesh::initAllgatherContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup semaphores ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); diff --git a/src/ext/collectives/allgather/allgather_fullmesh_2.cu b/src/ext/collectives/allgather/allgather_fullmesh_2.cu index 9d169d68..6e69f81c 100644 --- a/src/ext/collectives/allgather/allgather_fullmesh_2.cu +++ b/src/ext/collectives/allgather/allgather_fullmesh_2.cu @@ -12,7 +12,7 @@ __device__ DeviceSyncer deviceSyncer; template __global__ void __launch_bounds__(1024, 1) allgatherFullmesh2(void* sendbuff, mscclpp::DeviceHandle* memoryChannels, - size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t nRanksPerNode, + size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t ipcDomainNranks, size_t nelemsPerGPU) { const size_t tid = threadIdx.x + blockIdx.x * blockDim.x; const size_t lid = tid % WARP_SIZE; @@ -20,7 +20,7 @@ __global__ void __launch_bounds__(1024, 1) const size_t nThread = blockDim.x * gridDim.x; const size_t nWarp = nThread / WARP_SIZE; - const size_t nPeer = nRanksPerNode - 1; + const size_t nPeer = ipcDomainNranks - 1; const size_t chanOffset = nPeer * blockIdx.x; auto memChans = memoryChannels + chanOffset; @@ -140,11 +140,11 @@ CommResult AllgatherFullmesh2::allgatherKernelFunc(const std::shared_ptr c if ((char*)input == (char*)output + rank * inputSize) { allgatherFullmesh2<<>>( (void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize, - ctx->nRanksPerNode, nElem); + ctx->ipcDomainNranks, nElem); } else { allgatherFullmesh2<<>>( (void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize, - ctx->nRanksPerNode, nElem); + ctx->ipcDomainNranks, nElem); } 
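// Editorial aside (not part of the patch): both full-mesh allgather kernels above derive their
// channel slice from `nPeer = ipcDomainNranks - 1` and `chanOffset = nPeer * blockIdx.x`, i.e. the
// memoryChannels array is assumed to be laid out as gridDim.x contiguous groups of one handle per
// peer (self excluded). A minimal sketch of that indexing under this assumption; `channelIndexFor`
// and `blockId` are illustrative names, not identifiers from the source.
#include <cstddef>
inline size_t channelIndexFor(int blockId, int peerIdx, int ipcDomainNranks) {
  const int nPeer = ipcDomainNranks - 1;               // one channel per peer, self excluded
  const size_t chanOffset = (size_t)nPeer * blockId;   // start of this block's channel slice
  return chanOffset + peerIdx;                         // peerIdx in [0, nPeer)
}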
cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { @@ -159,7 +159,7 @@ std::shared_ptr AllgatherFullmesh2::initAllgatherContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup semaphores ctx->memorySemaphores = this->memorySemaphores_; diff --git a/src/ext/collectives/allreduce/allreduce_allpair_packet.cu b/src/ext/collectives/allreduce/allreduce_allpair_packet.cu index 690d0eb4..5be2f336 100644 --- a/src/ext/collectives/allreduce/allreduce_allpair_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_allpair_packet.cu @@ -14,11 +14,11 @@ namespace collective { template __global__ void allreduceAllPairs(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, - size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerNode, + size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t nelems, uint32_t numScratchBuff, void* flags, uint32_t flagSize) { if (sizeof(T) == 2 || sizeof(T) == 1) nelems = (nelems * sizeof(T) + sizeof(T)) / sizeof(int); - const int nPeers = nRanksPerNode - 1; + const int nPeers = ipcDomainNranks - 1; uint32_t flag = ((uint32_t*)flags)[blockIdx.x]; size_t scratchBaseOffset = (flag % numScratchBuff) ? (scratchBufferSize / numScratchBuff) : 0; @@ -72,7 +72,7 @@ template struct AllpairAdapter { static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*, DeviceHandle*, DeviceHandle*, size_t channelInOffset, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void* flags, uint32_t flagSize, uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) { using ChannelType = DeviceHandle; @@ -84,7 +84,7 @@ struct AllpairAdapter { } allreduceAllPairs<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - nRanksPerNode, worldSize, nelems, numScratchBuff, flags, flagSize); + ipcDomainNranks, worldSize, nelems, numScratchBuff, flags, flagSize); return cudaGetLastError(); } }; @@ -108,7 +108,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptrworkSize); } // nBlocks must be at least nPeers for allpair — each block maps to one peer. 
- const int nPeers = algoCtx->nRanksPerNode - 1; + const int nPeers = algoCtx->ipcDomainNranks - 1; if (nPeers > 0 && blockAndThreadNum.first < nPeers) { return CommResult::CommInvalidArgument; } @@ -124,7 +124,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptrscratchBuffer_, output, algoCtx->memoryChannelDeviceHandles.get(), nullptr, nullptr, - nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->nRanksPerNode, + nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { @@ -140,7 +140,7 @@ std::shared_ptr AllreduceAllpairPacket::initAllreduceContext(std::shared_p const int nChannelsPerConnection = maxBlockNum_; ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->memorySemaphores_; ctx->registeredMemories = this->registeredMemories_; ctx->registeredMemories.pop_back(); // remove the local memory from previous context diff --git a/src/ext/collectives/allreduce/allreduce_fullmesh.cu b/src/ext/collectives/allreduce/allreduce_fullmesh.cu index 9d144c62..b95dcb28 100644 --- a/src/ext/collectives/allreduce/allreduce_fullmesh.cu +++ b/src/ext/collectives/allreduce/allreduce_fullmesh.cu @@ -13,8 +13,8 @@ template __global__ void __launch_bounds__(512, 1) allreduceFullmesh(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* memoryOutChannels, size_t channelOutDataOffset, int rank, - int nRanksPerNode, int worldSize, size_t nelems) { - const int nPeer = nRanksPerNode - 1; + int ipcDomainNranks, int worldSize, size_t nelems) { + const int nPeer = ipcDomainNranks - 1; const size_t chanOffset = nPeer * blockIdx.x; // assume (nelems * sizeof(T)) is divisible by (16 * worldSize) const size_t nInt4 = nelems * sizeof(T) / sizeof(int4); @@ -159,7 +159,7 @@ template struct AllreduceAllconnectAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* memoryOutChannels, DeviceHandle*, DeviceHandle*, size_t, - size_t channelOutDataOffset, size_t, int rank, int nRanksPerNode, int worldSize, + size_t channelOutDataOffset, size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; @@ -168,7 +168,7 @@ struct AllreduceAllconnectAdapter { if (nThreadsPerBlock == 0) nThreadsPerBlock = 512; allreduceFullmesh<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, (ChannelType*)memoryOutChannels, - channelOutDataOffset, rank, nRanksPerNode, worldSize, nelems); + channelOutDataOffset, rank, ipcDomainNranks, worldSize, nelems); return cudaGetLastError(); } }; @@ -225,7 +225,7 @@ CommResult AllreduceFullmesh::allreduceKernelFunc( } cudaError_t error = allreduce(input, this->scratchBuffer_, output, inputChannelHandles.get(), ctx->memoryChannelDeviceHandles.get(), - nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, + nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if 
(error != cudaSuccess) { WARN("AllreduceAllconnect failed with error: %s", cudaGetErrorString(error)); @@ -252,7 +252,7 @@ std::shared_ptr AllreduceFullmesh::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup semaphores ctx->memorySemaphores = this->outputSemaphores_; diff --git a/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu b/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu index 2d71cd63..3ecb361f 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu @@ -20,15 +20,15 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] DeviceHandle* memoryChannels, [[maybe_unused]] DeviceHandle* switchChannels, [[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize, - [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerNode) { + [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 constexpr int alignment = 16; - int nPeers = nRanksPerNode - 1; - int nBlocksForCopy = nRanksPerNode * 2; - int nBlocksForReduce = nRanksPerNode; + int nPeers = ipcDomainNranks - 1; + int nBlocksForCopy = ipcDomainNranks * 2; + int nBlocksForReduce = ipcDomainNranks; int copyReduceRatio = nBlocksForCopy / nBlocksForReduce; - size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode; - size_t sizePerRank = size / nRanksPerNode; + size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks; + size_t sizePerRank = size / ipcDomainNranks; assert(sizePerRank % alignment == 0); uint32_t sizePerBlock = ((sizePerRank + (nBlocksForCopy - 1)) / nBlocksForCopy + alignment - 1) / alignment * alignment; @@ -68,7 +68,7 @@ __global__ void __launch_bounds__(1024, 1) deviceSemaphore[bid + 2 * nBlocksForCopy].acquire(); } __syncthreads(); - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t blockOffset = it * unitSize + bid * sizePerBlock + i * sizePerRank; uint32_t scratchOffset = scratchIt * unitSize + bid * scratchSizePerBlock + i * scratchSizePerRank; char* srcData = (char*)src + blockOffset; @@ -125,7 +125,7 @@ __global__ void __launch_bounds__(1024, 1) channels->wait(); } __syncthreads(); - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t blockOffset = it * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * sizePerBlock + i * sizePerRank; uint32_t scratchOffset = scratchIt * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * scratchSizePerBlock + @@ -150,7 +150,7 @@ template struct NvlsBlockPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*, DeviceHandle* nvlsChannels, DeviceHandle*, size_t, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -166,9 +166,9 @@ struct NvlsBlockPipelineAdapter { #endif { using ChannelType = DeviceHandle; - allreduceNvlsBlockPipeline - <<>>(input, scratch, output, (ChannelType*)memoryChannels, - 
nvlsChannels, inputSize, scratchBufferSize, rank, nRanksPerNode); + allreduceNvlsBlockPipeline<<>>( + input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank, + ipcDomainNranks); return cudaGetLastError(); } } @@ -200,11 +200,11 @@ CommResult AllreduceNvlsBlockPipeline::allreduceKernelFunc(const std::shared_ptr } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = {ctx->nRanksPerNode * 5, 1024}; + blockAndThreadNum = {ctx->ipcDomainNranks * 5, 1024}; } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, - ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, nullptr, 0, 0, + ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN("AllreduceNvlsBlockPipeline failed with error: %s", cudaGetErrorString(error)); @@ -222,7 +222,7 @@ std::shared_ptr AllreduceNvlsBlockPipeline::initAllreduceContext(std::shar auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup channels ctx->switchChannels = diff --git a/src/ext/collectives/allreduce/allreduce_nvls_packet.cu b/src/ext/collectives/allreduce/allreduce_nvls_packet.cu index d331cc67..2ef0516e 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_packet.cu @@ -94,7 +94,7 @@ std::shared_ptr AllreduceNvlsPacket::initAllreduceContext(std::shared_ptr< auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); // setup channels ctx->switchChannels = this->switchChannels_; @@ -123,7 +123,7 @@ CommResult AllreduceNvlsPacket::allreduceKernelFunc(const std::shared_ptr } cudaError_t error = allreduce(input, this->scratchBuffer_, output, nullptr, nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, - 0, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, + 0, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN(ALGO, "AllreduceNvlsPacket failed with error: ", cudaGetErrorString(error)); diff --git a/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu b/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu index 9be621f0..1bdac9ad 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu @@ -18,15 +18,15 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] DeviceHandle* memoryChannels, [[maybe_unused]] DeviceHandle* multicast, [[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize, [[maybe_unused]] int rank, - [[maybe_unused]] int nRanksPerNode) { + [[maybe_unused]] int ipcDomainNranks) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 constexpr int alignment = 16; - int nPeers = 
nRanksPerNode - 1; + int nPeers = ipcDomainNranks - 1; int nBlocks = gridDim.x; int nBlocksPerNvlsConn = nBlocks / NUM_NVLS_CONNECTION; int bid = blockIdx.x; - size_t sizePerRank = size / nRanksPerNode; - size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode; + size_t sizePerRank = size / ipcDomainNranks; + size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks; const size_t maxSizePerBlock = ((sizePerRank + nBlocks - 1) / nBlocks + alignment - 1) / alignment * alignment; size_t start = bid * maxSizePerBlock; size_t end = min(start + maxSizePerBlock, sizePerRank); @@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1) lastIterSize = sizePerBlock % copyPerIter; } - const size_t chanOffset = (nRanksPerNode - 1) * blockIdx.x * 2; + const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x * 2; auto memoryChans = memoryChannels + chanOffset; __shared__ DeviceHandle channels[(MAX_NRANKS_PER_NODE - 1) * 2]; const int lid = threadIdx.x % WARP_SIZE; @@ -68,7 +68,7 @@ __global__ void __launch_bounds__(1024, 1) const size_t iterSize = (it == nIter - 1) ? lastIterSize : copyPerIter; if (warpId < endCopyWid) { int tidInCopy = threadIdx.x; - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter; size_t offsetScratch = i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock; @@ -99,7 +99,7 @@ __global__ void __launch_bounds__(1024, 1) channels[tidInRecvCopy + nPeers].wait(); } asm volatile("bar.sync %0, %1;" ::"r"(3), "r"((NRECV_COPY_WARPS)*WARP_SIZE) : "memory"); - for (int i = 0; i < nRanksPerNode; i++) { + for (int i = 0; i < ipcDomainNranks; i++) { size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter; size_t offsetScratch = i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock; @@ -116,7 +116,7 @@ template struct NvlsWarpPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*, DeviceHandle* nvlsChannels, DeviceHandle*, size_t, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -132,9 +132,9 @@ struct NvlsWarpPipelineAdapter { #endif { using ChannelType = DeviceHandle; - allreduceNvlsWarpPipeline - <<>>(input, scratch, output, (ChannelType*)memoryChannels, - nvlsChannels, inputSize, scratchBufferSize, rank, nRanksPerNode); + allreduceNvlsWarpPipeline<<>>( + input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank, + ipcDomainNranks); return cudaGetLastError(); } } @@ -165,11 +165,11 @@ CommResult AllreduceNvlsWarpPipeline::allreduceKernelFunc( } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = {ctx->nRanksPerNode * 4, 1024}; + blockAndThreadNum = {ctx->ipcDomainNranks * 4, 1024}; } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, - ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, 
nullptr, 0, 0, + ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN("AllreduceNvlsWarpPipeline failed with error: %s", cudaGetErrorString(error)); @@ -187,7 +187,7 @@ std::shared_ptr AllreduceNvlsWarpPipeline::initAllreduceContext(std::share auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); // setup channels ctx->switchChannels = diff --git a/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu b/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu index 36fcf860..a9d46d4f 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu @@ -19,12 +19,12 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] mscclpp::DeviceHandle* multicast, [[maybe_unused]] mscclpp::DeviceHandle* multicastOut, [[maybe_unused]] size_t channelInOffset, [[maybe_unused]] size_t channelOutOffset, - [[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerNode) { + [[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 - int nPeers = nRanksPerNode - 1; + int nPeers = ipcDomainNranks - 1; int nBlocks = gridDim.x; int bid = blockIdx.x; - size_t sizePerRank = size / nRanksPerNode; + size_t sizePerRank = size / ipcDomainNranks; const size_t minAlign = 16; // Align sizePerBlock to 16 bytes to ensure aligned vector access in handleMultiLoadReduceStore size_t sizePerBlock = (sizePerRank + nBlocks - 1) / nBlocks; @@ -40,14 +40,14 @@ __global__ void __launch_bounds__(1024, 1) mscclpp::DeviceHandle* multicastPtr = multicast + bid; mscclpp::DeviceHandle* multicastOutPtr = multicastOut + bid; - const size_t chanOffset = (nRanksPerNode - 1) * blockIdx.x; + const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x; auto memoryChans = memoryChannels + chanOffset; __shared__ mscclpp::DeviceHandle channels[MAX_NRANKS_PER_NODE - 1]; const int lid = threadIdx.x % WARP_SIZE; // Each warp redundantly loads all entries (same value, benign race) so that // every warp has the data its threads will read after __syncwarp(). Required // when nPeers > WARP_SIZE (MNNVL/NVL72 → 71 peers). 
- for (int i = lid; i < nRanksPerNode - 1; i += WARP_SIZE) { + for (int i = lid; i < ipcDomainNranks - 1; i += WARP_SIZE) { channels[i] = memoryChans[i]; } __syncwarp(); @@ -75,7 +75,7 @@ struct NvlsAdapter { static cudaError_t call(const void*, void*, void*, void* memoryChannels, void*, mscclpp::DeviceHandle* nvlsChannels, mscclpp::DeviceHandle* nvlsOutChannels, size_t channelInOffset, - size_t channelOutOffset, size_t, int rank, int nRanksPerNode, int, size_t inputSize, + size_t channelOutOffset, size_t, int rank, int ipcDomainNranks, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -93,7 +93,7 @@ struct NvlsAdapter { using ChannelType = DeviceHandle; allreduceNvls<<>>((ChannelType*)memoryChannels, nvlsChannels, nvlsOutChannels, channelInOffset, channelOutOffset, - inputSize, rank, nRanksPerNode); + inputSize, rank, ipcDomainNranks); return cudaGetLastError(); } } @@ -145,7 +145,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo } std::pair numBlocksAndThreads = {nBlocks, nThreadsPerBlock}; if (numBlocksAndThreads.first == 0 || numBlocksAndThreads.second == 0) { - numBlocksAndThreads = {::min(ctx->nRanksPerNode, MAX_NBLOCKS), 1024}; + numBlocksAndThreads = {::min(ctx->ipcDomainNranks, MAX_NBLOCKS), 1024}; // For GB200 devices with MNNVLS (Multi-Node NVLink Sharp), scale the number of blocks inversely with // the number of GPUs. Empirically, 32 blocks works well for 4 GPUs and 16 for 8 GPUs, which // follows the formula 128 / nGPUs, clamped to [1, MAX_NBLOCKS]. @@ -159,7 +159,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo } cudaError_t error = allreduce(nullptr, nullptr, nullptr, this->memoryChannelsDeviceHandle_.get(), nullptr, nvlsChannels, - nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->nRanksPerNode, ctx->workSize, + nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN("AllreduceNvls failed with error: %s", cudaGetErrorString(error)); @@ -183,7 +183,7 @@ std::shared_ptr AllreduceNvls::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); size_t sendBytes, recvBytes; CUdeviceptr sendBasePtr, recvBasePtr; diff --git a/src/ext/collectives/allreduce/allreduce_packet.cu b/src/ext/collectives/allreduce/allreduce_packet.cu index d631c35a..ebb2f618 100644 --- a/src/ext/collectives/allreduce/allreduce_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_packet.cu @@ -15,7 +15,7 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allreducePacket(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle* memoryChannels, - size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, + size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t nelems, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff #if defined(ENABLE_NPKIT) , @@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1) else nelems = nelems / (sizeof(int) / sizeof(T)); - const int nPeers = nRanksPerNode - 1; 
+ const int nPeers = ipcDomainNranks - 1; const size_t nPkts = nelems / 2; uint32_t flag = ((uint32_t*)flags)[blockIdx.x]; @@ -156,7 +156,7 @@ template struct PacketAdapter { static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*, DeviceHandle*, DeviceHandle*, size_t channelInOffset, size_t, - size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize, + size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) { using ChannelType = DeviceHandle; @@ -167,20 +167,20 @@ struct PacketAdapter { size_t sharedMemSize = sizeof(NpKitEvent) * NPKIT_SHM_NUM_EVENTS; allreducePacket<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - nRanksPerNode, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(), + ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else allreducePacket<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - nRanksPerNode, worldSize, nelems, flags, flagBufferSize, numScratchBuff); + ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff); #endif return cudaGetLastError(); } }; -inline std::pair getDefaultBlockNumAndThreadNum(size_t inputSize, int nRanksPerNode, int worldSize, +inline std::pair getDefaultBlockNumAndThreadNum(size_t inputSize, int ipcDomainNranks, int worldSize, [[maybe_unused]] DataType dtype) { - int nBlocks = (nRanksPerNode - 1) * 4; + int nBlocks = (ipcDomainNranks - 1) * 4; int nThreadsPerBlock = 1024; if (inputSize >= 32768) { nBlocks = (worldSize - 1) * 8; @@ -232,7 +232,7 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr ctx_ auto ctx = std::static_pointer_cast(ctx_void); std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->workSize, ctx->nRanksPerNode, dtype); + blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->workSize, ctx->ipcDomainNranks, dtype); } size_t sendBytes; @@ -248,7 +248,7 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr ctx_ } cudaError_t error = allreduce(input, this->scratchBuffer_, output, ctx->memoryChannelDeviceHandles.get(), nullptr, nullptr, nullptr, - channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, + channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { @@ -264,7 +264,7 @@ std::shared_ptr AllreducePacket::initAllreduceContext(std::shared_ptrrank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->memorySemaphores_; ctx->registeredMemories = this->registeredMemories_; ctx->registeredMemories.pop_back(); // remove the local memory from previous context diff --git 
a/src/ext/collectives/allreduce/allreduce_rsag.cu b/src/ext/collectives/allreduce/allreduce_rsag.cu index 4c46bf9b..93e2d0c4 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag.cu @@ -31,18 +31,18 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allreduceRsAg(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, - DeviceHandle* switchChannels, void* remoteMemories, int rank, int nRanksPerNode, + DeviceHandle* switchChannels, void* remoteMemories, int rank, int ipcDomainNranks, int worldSize, size_t nelems) { int blockId = blockIdx.x; - uint32_t nPeers = nRanksPerNode - 1; + uint32_t nPeers = ipcDomainNranks - 1; assert((uintptr_t)buff % sizeof(int4) == 0); assert((uintptr_t)resultBuff % sizeof(int4) == 0); constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T); - uint32_t alignedNelems = ((nelems + nRanksPerNode - 1) / nRanksPerNode + nelemsPerInt4 - 1) / nelemsPerInt4 * - nelemsPerInt4 * nRanksPerNode; - uint32_t nelemsPerRank = alignedNelems / nRanksPerNode; + uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 * + nelemsPerInt4 * ipcDomainNranks; + uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks; uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4; uint32_t lastInt4Index = nelems / nelemsPerInt4; uint32_t remainder = nelems % nelemsPerInt4; @@ -59,7 +59,7 @@ __global__ void __launch_bounds__(1024, 1) nInt4PerBlock += remainderForBlock; } if (nInt4PerBlock == 0) return; - uint32_t nInt4ForCopy = nInt4PerBlock * nRanksPerNode; + uint32_t nInt4ForCopy = nInt4PerBlock * ipcDomainNranks; for (uint32_t idx = threadIdx.x; idx < nInt4ForCopy; idx += blockDim.x) { int rankIdx = idx / nInt4PerBlock; @@ -84,13 +84,13 @@ __global__ void __launch_bounds__(1024, 1) if (offset > lastInt4Index) continue; int4 tmp = scratch4[offset]; for (uint32_t i = 0; i < nPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; int4 data = mscclpp::read(((void**)remoteMemories)[peerIdx], offset); tmp = calVector(data, tmp); } for (uint32_t i = 0; i < nPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? 
rankIdx : rankIdx - 1; mscclpp::write(((void**)remoteMemories)[peerIdx], offset, tmp); } @@ -127,7 +127,7 @@ template struct AllreduceRsAgAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t, int rank, int nRanksPerNode, int worldSize, size_t inputSize, cudaStream_t stream, + size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -137,7 +137,7 @@ struct AllreduceRsAgAdapter { } allreduceRsAg<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - nRanksPerNode, worldSize, nelems); + ipcDomainNranks, worldSize, nelems); return cudaGetLastError(); } }; @@ -185,7 +185,7 @@ CommResult AllreduceRsAg::allreduceKernelFunc(const std::shared_ptr ctx, c } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(), this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, 0, algoCtx->rank, - algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, + algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); @@ -203,7 +203,7 @@ std::shared_ptr AllreduceRsAg::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->scratchSemaphores_; ctx->registeredMemories = this->remoteScratchMemories_; diff --git a/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu b/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu index eabe3dc5..9f63e590 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu @@ -86,7 +86,7 @@ template __global__ void __launch_bounds__(1024, 1) allreduceRsAgPipeline(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* switchChannels, void* remoteMemories, int rank, - int nRanksPerNode, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut, + int ipcDomainNranks, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut, uint32_t nblocksForReduce, uint32_t nblocksForRecv) { uint32_t bid = blockIdx.x; constexpr uint32_t nStepsPerIter = 4; @@ -94,7 +94,7 @@ __global__ void __launch_bounds__(1024, 1) uint32_t nInt4PerIter = nblocksForReduce * blockDim.x * nStepsPerIter; const uint32_t chunkSize = nInt4PerIter * worldSize; uint32_t nIters = (nInt4 + chunkSize - 1) / chunkSize; - uint32_t nPeers = nRanksPerNode - 1; + uint32_t nPeers = ipcDomainNranks - 1; int4* scratch4 = reinterpret_cast((char*)scratch); const uint32_t scratchIterStride = 2 * chunkSize; // one for AS, one for AG const uint32_t pipelineDepth = scratchSize / sizeof(int4) / scratchIterStride; @@ -111,7 +111,7 @@ __global__ void __launch_bounds__(1024, 1) __syncthreads(); uint32_t threadIdInPut = bid * blockDim.x + threadIdx.x; for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % nRanksPerNode; + int remoteRankId = (rank + peer + 1) % 
ipcDomainNranks; int peerId = remoteRankId < rank ? remoteRankId : remoteRankId - 1; // Read chunk[remoteRankId] from local buff, write to peer's scratch[rank] (sender's slot) uint32_t srcOffset = iter * chunkSize + remoteRankId * nInt4PerIter; @@ -164,7 +164,7 @@ __global__ void __launch_bounds__(1024, 1) int4 tmp = loadVec(buff, myChunkOffset, nelems); // Add data from each peer's slot in scratch (peer sent their chunk[rank] to our scratch[peer]) for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % nRanksPerNode; + int remoteRankId = (rank + peer + 1) % ipcDomainNranks; uint32_t peerSlotOffset = baseOffset + remoteRankId * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut; int4 data = scratch4[peerSlotOffset]; @@ -175,7 +175,7 @@ __global__ void __launch_bounds__(1024, 1) uint32_t dstOffset = baseOffset + chunkSize + rank * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut; for (uint32_t i = 0; i < nPeers; i++) { - int peerIdx = (rank + i + 1) % nRanksPerNode; + int peerIdx = (rank + i + 1) % ipcDomainNranks; int index = peerIdx < rank ? peerIdx : peerIdx - 1; mscclpp::write(((void**)remoteMemories)[index], dstOffset, tmp); } @@ -203,7 +203,7 @@ __global__ void __launch_bounds__(1024, 1) __syncthreads(); // Copy other ranks' reduced chunks from scratch to result for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % nRanksPerNode; + int remoteRankId = (rank + peer + 1) % ipcDomainNranks; for (uint32_t step = 0; step < nStepsPerIter * REDUCE_COPY_RATIO; step++) { uint32_t offset = baseOffset + chunkSize + remoteRankId * nInt4PerIter + threadIdInRecv + step * blockDim.x * nblocksForRecv; @@ -224,7 +224,7 @@ template struct AllreduceRsAgPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t scratchSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize, + size_t scratchSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -248,7 +248,7 @@ struct AllreduceRsAgPipelineAdapter { } allreduceRsAgPipeline<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - nRanksPerNode, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv); + ipcDomainNranks, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv); return cudaGetLastError(); } }; @@ -288,8 +288,8 @@ CommResult AllreduceRsAgPipeline::allreduceKernelFunc( std::pair numBlocksAndThreads = {nBlocks, nThreadsPerBlock}; cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(), this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, this->scratchBufferSize_, - algoCtx->rank, algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, nullptr, 0, - 0, numBlocksAndThreads.first, numBlocksAndThreads.second); + algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, + 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; @@ -306,7 +306,7 @@ std::shared_ptr 
AllreduceRsAgPipeline::initAllreduceContext(std::shared_pt auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode(); + ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode(); ctx->memorySemaphores = this->scratchSemaphores_; ctx->registeredMemories = this->remoteScratchMemories_; diff --git a/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu b/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu index 67eed6d3..ea664325 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu @@ -35,7 +35,7 @@ __device__ mscclpp::DeviceSyncer globalSyncer; // // This approach requires registering both input and output buffers as remote // memories (2 * nPeers handles), but avoids scratch buffer allocation and -// the extra copy steps of the standard RSAG. nRanksPerNode is accepted at +// the extra copy steps of the standard RSAG. ipcDomainNranks is accepted at // runtime, which allows the same kernel to handle any NVLink-domain size // (including Multi-Node NVLink fabrics up to NVL72). @@ -43,18 +43,18 @@ template __global__ void __launch_bounds__(1024, 1) allreduceRsAgZeroCopy(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* switchChannels, void* remoteMemories, int rank, - int nRanksPerNode, int worldSize, size_t nelems) { + int ipcDomainNranks, int worldSize, size_t nelems) { int blockId = blockIdx.x; assert((uintptr_t)buff % sizeof(int4) == 0); assert((uintptr_t)resultBuff % sizeof(int4) == 0); - const int NPeers = nRanksPerNode - 1; + const int NPeers = ipcDomainNranks - 1; constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T); const uint32_t outputRemoteBufferOffset = NPeers; - uint32_t alignedNelems = ((nelems + nRanksPerNode - 1) / nRanksPerNode + nelemsPerInt4 - 1) / nelemsPerInt4 * - nelemsPerInt4 * nRanksPerNode; - uint32_t nelemsPerRank = alignedNelems / nRanksPerNode; + uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 * + nelemsPerInt4 * ipcDomainNranks; + uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks; uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4; uint32_t nInt4Total = (nelems + nelemsPerInt4 - 1) / nelemsPerInt4; @@ -87,14 +87,14 @@ __global__ void __launch_bounds__(1024, 1) int4 data; AccumVec acc = mscclpp::upcastVector(tmp_raw); for (int i = 0; i < NPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; data = mscclpp::read(((void**)remoteMemories)[peerIdx], offset); acc = mscclpp::calVectorAccum(acc, data); } int4 tmp = mscclpp::downcastVector(acc); for (int i = 0; i < NPeers; i++) { - int rankIdx = (rank + i + 1) % nRanksPerNode; + int rankIdx = (rank + i + 1) % ipcDomainNranks; int peerIdx = rankIdx < rank ? 
rankIdx : rankIdx - 1; mscclpp::write(((void**)remoteMemories)[outputRemoteBufferOffset + peerIdx], offset, tmp); } @@ -112,7 +112,7 @@ template struct AllreduceRsAgZeroCopyAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t, int rank, int nRanksPerNode, int worldSize, size_t inputSize, cudaStream_t stream, + size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -125,7 +125,7 @@ struct AllreduceRsAgZeroCopyAdapter { } allreduceRsAgZeroCopy<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - nRanksPerNode, worldSize, nelems); + ipcDomainNranks, worldSize, nelems); return cudaGetLastError(); } }; @@ -159,8 +159,8 @@ CommResult AllreduceRsAgZeroCopy::allreduceKernelFunc(const std::shared_ptrbaseMemoryChannelHandles_.get(), algoCtx->remoteMemoryHandles.get(), - nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, - nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); + nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, + stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; @@ -186,7 +186,7 @@ std::shared_ptr AllreduceRsAgZeroCopy::initAllreduceContext(std::shared_pt auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->nRanksPerNode = getIpcDomainNranks(comm); + ctx->ipcDomainNranks = getIpcDomainNranks(comm); ctx->memorySemaphores = this->semaphores_; diff --git a/src/ext/collectives/include/collective_utils.hpp b/src/ext/collectives/include/collective_utils.hpp index 44a21402..7fa6a81e 100644 --- a/src/ext/collectives/include/collective_utils.hpp +++ b/src/ext/collectives/include/collective_utils.hpp @@ -27,8 +27,8 @@ namespace mscclpp { namespace collective { constexpr int NUM_NVLS_CONNECTION = 8; // Sized to cover MAX_NRANKS_PER_NODE-scale allreduce algos whose device-side -// semaphore indices grow as O(nRanksPerNode) (e.g. nvls_block_pipeline uses -// up to ~5 * nRanksPerNode entries). +// semaphore indices grow as O(ipcDomainNranks) (e.g. nvls_block_pipeline uses +// up to ~5 * ipcDomainNranks entries). constexpr int NUM_SEMAPHORES = 512; // Upper bound on the number of NVLink-reachable ranks that participate in a @@ -54,8 +54,8 @@ std::vector> setupMemorySemaphores /// Number of ranks that participate in the same GPU-IPC-reachable peer group (e.g. a single host or /// a Multi-Node NVLink fabric, or an AMD XGMI domain). Returns the value of `MSCCLPP_IPC_DOMAIN_NRANKS` /// if set to a positive value; otherwise falls back to `bootstrap->getNranksPerNode()`. This is -/// intentionally independent of `nRanksPerNode` so that algorithms can opt in to MNNVL-like behavior -/// without changing the meaning of bootstrap-level APIs. +/// intentionally independent of `Bootstrap::getNranksPerNode()` so that algorithms can opt in to +/// MNNVL-like behavior without changing the meaning of bootstrap-level APIs. 
 int getIpcDomainNranks(std::shared_ptr comm);
 
 std::shared_ptr> setupMemoryChannelDeviceHandles(
@@ -86,7 +86,7 @@ class AlgorithmCtx {
  public:
   int rank;
   int workSize;
-  int nRanksPerNode;
+  int ipcDomainNranks;
   std::vector registeredMemories;
   std::vector memoryChannels;
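// Editorial aside (not part of the patch): the body of getIpcDomainNranks() is not shown in this
// diff; the sketch below only restates the behavior documented above (use MSCCLPP_IPC_DOMAIN_NRANKS
// when it parses to a positive value, otherwise fall back to bootstrap->getNranksPerNode()). The
// shared_ptr payload type and the name getIpcDomainNranksSketch are assumptions for illustration.
#include <cstdlib>
#include <memory>
int getIpcDomainNranksSketch(std::shared_ptr<mscclpp::Communicator> comm) {
  if (const char* env = std::getenv("MSCCLPP_IPC_DOMAIN_NRANKS")) {
    int nranks = std::atoi(env);
    if (nranks > 0) return nranks;  // positive environment override wins
  }
  return comm->bootstrap()->getNranksPerNode();  // default: ranks within this node
}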