From 594dc79657bc15cbfa9762d986bb4756726794c2 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Sat, 16 May 2026 23:19:25 +0000 Subject: [PATCH] Address NVLS review feedback Handle unsupported FP8 NVLS paths safely, tighten IPC-domain guards, align IPC-domain naming, and add IPC-domain fabric hash logging. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- include/mscclpp/switch_channel_device.hpp | 6 +- src/core/bootstrap/bootstrap.cc | 2 + src/core/include/execution_kernel.hpp | 10 ++- .../allgather/allgather_fullmesh.cu | 10 +-- .../allgather/allgather_fullmesh_2.cu | 12 ++-- .../allreduce/allreduce_allpair_packet.cu | 29 +++++---- .../allreduce/allreduce_fullmesh.cu | 12 ++-- .../allreduce_nvls_block_pipeline.cu | 32 ++++----- .../allreduce/allreduce_nvls_packet.cu | 4 +- .../allreduce/allreduce_nvls_warp_pipeline.cu | 30 ++++----- .../allreduce/allreduce_nvls_zero_copy.cu | 33 ++++++---- .../collectives/allreduce/allreduce_packet.cu | 38 ++++++----- .../collectives/allreduce/allreduce_rsag.cu | 26 ++++---- .../allreduce/allreduce_rsag_pipeline.cu | 24 +++---- .../allreduce/allreduce_rsag_zero_copy.cu | 18 +++-- .../allreduce_nvls_block_pipeline.hpp | 2 +- .../allreduce_nvls_warp_pipeline.hpp | 2 +- .../collectives/include/allreduce/common.hpp | 65 +++++++++++-------- .../collectives/include/collective_utils.hpp | 8 +-- 19 files changed, 203 insertions(+), 160 deletions(-) diff --git a/include/mscclpp/switch_channel_device.hpp b/include/mscclpp/switch_channel_device.hpp index e95dfcf5..df22bd3a 100644 --- a/include/mscclpp/switch_channel_device.hpp +++ b/include/mscclpp/switch_channel_device.hpp @@ -155,7 +155,7 @@ struct SwitchChannelDeviceHandle { } #endif else { - assert(false && "Unsupported vector type for multimemLoadReduce"); + static_assert(dependentFalse, "Unsupported vector type for multimemLoadReduce"); } return val; }; @@ -223,7 +223,7 @@ struct SwitchChannelDeviceHandle { } #endif else { - assert(false && "Unsupported vector type for multimemStore"); + static_assert(dependentFalse, "Unsupported vector type for multimemStore"); } }; @@ -248,7 +248,7 @@ struct SwitchChannelDeviceHandle { } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.red.relaxed.sys.global.add.f16x2 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); } else { - assert(false && "Unsupported vector type for multimemStoreReduce"); + static_assert(dependentFalse, "Unsupported vector type for multimemStoreReduce"); } }; #endif // defined(MSCCLPP_DEVICE_CUDA) diff --git a/src/core/bootstrap/bootstrap.cc b/src/core/bootstrap/bootstrap.cc index a5835751..ffdd9c1c 100644 --- a/src/core/bootstrap/bootstrap.cc +++ b/src/core/bootstrap/bootstrap.cc @@ -468,6 +468,8 @@ int TcpBootstrap::Impl::getNranksPerIpcDomain() { ++nRanksPerIpcDomain; } } + INFO(MSCCLPP_INIT, "rank %d IPC domain fabric hash 0x%016llx nRanksPerIpcDomain %d", rank_, + static_cast(ipcDomainHashes[rank_]), nRanksPerIpcDomain); nRanksPerIpcDomain_ = nRanksPerIpcDomain; return nRanksPerIpcDomain_; } diff --git a/src/core/include/execution_kernel.hpp b/src/core/include/execution_kernel.hpp index cb808bc8..e9095ada 100644 --- a/src/core/include/execution_kernel.hpp +++ b/src/core/include/execution_kernel.hpp @@ -525,7 +525,15 @@ MSCCLPP_DEVICE_INLINE void handleMultiLoadReduceStore(const Operation& op, uint3 if constexpr (std::is_same_v) { assert(false && "MULTI_LOAD_REDUCE_STORE is not supported for uint8_t data type"); return; - } else { + } +#if defined(__FP8_TYPES_EXIST__) && \ + (!(defined(__CUDA_ARCH_SPECIFIC__) || defined(__CUDA_ARCH_FAMILY_SPECIFIC__)) || (__CUDA_ARCH__ < 1000)) + else if constexpr (std::is_same_v || std::is_same_v) { + assert(false && "FP8 MULTI_LOAD_REDUCE_STORE requires sm_100a or newer"); + return; + } +#endif + else { static_assert(sizeof(T) <= 8, "Only support type with size <= 8 bytes"); const uint32_t size = min(op.inputBufferSizes[0] - offset, unitSize); if (size <= 0) { diff --git a/src/ext/collectives/allgather/allgather_fullmesh.cu b/src/ext/collectives/allgather/allgather_fullmesh.cu index 84dd4d47..570a2d61 100644 --- a/src/ext/collectives/allgather/allgather_fullmesh.cu +++ b/src/ext/collectives/allgather/allgather_fullmesh.cu @@ -11,8 +11,8 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allgatherFullmesh(void* buff, void* scratch, void* resultBuff, DeviceHandle* memoryChannels, - int rank, int ipcDomainNranks, [[maybe_unused]] int worldSize, size_t nelems) { - const int nPeer = ipcDomainNranks - 1; + int rank, int nRanksPerIpcDomain, [[maybe_unused]] int worldSize, size_t nelems) { + const int nPeer = nRanksPerIpcDomain - 1; const size_t chanOffset = nPeer * blockIdx.x; // assume (nelems * sizeof(T)) is divisible by 16 const size_t nInt4 = nelems * sizeof(int) / sizeof(int4); @@ -127,11 +127,11 @@ CommResult AllgatherFullmesh::allgatherKernelFunc(const std::shared_ptr ct if ((char*)input == (char*)output + rank * inputSize) { allgatherFullmesh<<>>( (void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank, - ctx->ipcDomainNranks, ctx->workSize, nElem); + ctx->nRanksPerIpcDomain, ctx->workSize, nElem); } else { allgatherFullmesh<<>>( (void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank, - ctx->ipcDomainNranks, ctx->workSize, nElem); + ctx->nRanksPerIpcDomain, ctx->workSize, nElem); } cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { @@ -148,7 +148,7 @@ std::shared_ptr AllgatherFullmesh::initAllgatherContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); // setup semaphores ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection); diff --git a/src/ext/collectives/allgather/allgather_fullmesh_2.cu b/src/ext/collectives/allgather/allgather_fullmesh_2.cu index 5a353922..f344824f 100644 --- a/src/ext/collectives/allgather/allgather_fullmesh_2.cu +++ b/src/ext/collectives/allgather/allgather_fullmesh_2.cu @@ -12,15 +12,15 @@ __device__ DeviceSyncer deviceSyncer; template __global__ void __launch_bounds__(1024, 1) allgatherFullmesh2(void* sendbuff, mscclpp::DeviceHandle* memoryChannels, - size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t ipcDomainNranks, - size_t nelemsPerGPU) { + size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, + size_t nRanksPerIpcDomain, size_t nelemsPerGPU) { const size_t tid = threadIdx.x + blockIdx.x * blockDim.x; const size_t lid = tid % WARP_SIZE; const size_t wid = tid / WARP_SIZE; const size_t nThread = blockDim.x * gridDim.x; const size_t nWarp = nThread / WARP_SIZE; - const size_t nPeer = ipcDomainNranks - 1; + const size_t nPeer = nRanksPerIpcDomain - 1; const size_t chanOffset = nPeer * blockIdx.x; auto memChans = memoryChannels + chanOffset; @@ -140,11 +140,11 @@ CommResult AllgatherFullmesh2::allgatherKernelFunc(const std::shared_ptr c if ((char*)input == (char*)output + rank * inputSize) { allgatherFullmesh2<<>>( (void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize, - ctx->ipcDomainNranks, nElem); + ctx->nRanksPerIpcDomain, nElem); } else { allgatherFullmesh2<<>>( (void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize, - ctx->ipcDomainNranks, nElem); + ctx->nRanksPerIpcDomain, nElem); } cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { @@ -159,7 +159,7 @@ std::shared_ptr AllgatherFullmesh2::initAllgatherContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); // setup semaphores ctx->memorySemaphores = this->memorySemaphores_; diff --git a/src/ext/collectives/allreduce/allreduce_allpair_packet.cu b/src/ext/collectives/allreduce/allreduce_allpair_packet.cu index 29ef2055..47c4f61d 100644 --- a/src/ext/collectives/allreduce/allreduce_allpair_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_allpair_packet.cu @@ -14,11 +14,11 @@ namespace collective { template __global__ void allreduceAllPairs(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, - size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, + size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerIpcDomain, int worldSize, size_t nelems, uint32_t numScratchBuff, void* flags, uint32_t flagSize) { if (sizeof(T) == 2 || sizeof(T) == 1) nelems = (nelems * sizeof(T) + sizeof(T)) / sizeof(int); - const int nPeers = ipcDomainNranks - 1; + const int nPeers = nRanksPerIpcDomain - 1; uint32_t flag = ((uint32_t*)flags)[blockIdx.x]; size_t scratchBaseOffset = (flag % numScratchBuff) ? (scratchBufferSize / numScratchBuff) : 0; @@ -72,19 +72,17 @@ template struct AllpairAdapter { static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*, DeviceHandle*, DeviceHandle*, size_t channelInOffset, size_t, - size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, + size_t scratchBufferSize, int rank, int nRanksPerIpcDomain, int worldSize, size_t inputSize, cudaStream_t stream, void* flags, uint32_t flagSize, uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) { using ChannelType = DeviceHandle; const size_t nelems = inputSize / sizeof(T); // Round nBlocks to multiple of nPeers so every block maps to a valid peer. - const int nPeers = worldSize - 1; - if (nPeers > 0) { - nBlocks = (nBlocks / nPeers) * nPeers; - } + const int nPeers = nRanksPerIpcDomain - 1; + nBlocks = (nBlocks / nPeers) * nPeers; allreduceAllPairs<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - ipcDomainNranks, worldSize, nelems, numScratchBuff, flags, flagSize); + nRanksPerIpcDomain, worldSize, nelems, numScratchBuff, flags, flagSize); return cudaGetLastError(); } }; @@ -103,13 +101,18 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptr&, DataType accumDtype) { auto algoCtx = std::static_pointer_cast(ctx); + if (algoCtx->workSize != algoCtx->nRanksPerIpcDomain) { + WARN("AllreduceAllpairPacket requires workSize to match nRanksPerIpcDomain, got workSize=%d, nRanksPerIpcDomain=%d", + algoCtx->workSize, algoCtx->nRanksPerIpcDomain); + return CommResult::CommInvalidArgument; + } std::pair blockAndThreadNum{nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, algoCtx->workSize); + blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, algoCtx->nRanksPerIpcDomain); } // nBlocks must be at least nPeers for allpair — each block maps to one peer. - const int nPeers = algoCtx->ipcDomainNranks - 1; - if (nPeers > 0 && blockAndThreadNum.first < nPeers) { + const int nPeers = algoCtx->nRanksPerIpcDomain - 1; + if (blockAndThreadNum.first < nPeers) { return CommResult::CommInvalidArgument; } size_t sendBytes; @@ -124,7 +127,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptrscratchBuffer_, output, algoCtx->memoryChannelDeviceHandles.get(), nullptr, nullptr, - nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->ipcDomainNranks, + nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->nRanksPerIpcDomain, algoCtx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { @@ -140,7 +143,7 @@ std::shared_ptr AllreduceAllpairPacket::initAllreduceContext(std::shared_p const int nChannelsPerConnection = maxBlockNum_; ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); ctx->memorySemaphores = this->memorySemaphores_; ctx->registeredMemories = this->registeredMemories_; ctx->registeredMemories.pop_back(); // remove the local memory from previous context diff --git a/src/ext/collectives/allreduce/allreduce_fullmesh.cu b/src/ext/collectives/allreduce/allreduce_fullmesh.cu index b158f817..2790295e 100644 --- a/src/ext/collectives/allreduce/allreduce_fullmesh.cu +++ b/src/ext/collectives/allreduce/allreduce_fullmesh.cu @@ -13,8 +13,8 @@ template __global__ void __launch_bounds__(512, 1) allreduceFullmesh(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* memoryOutChannels, size_t channelOutDataOffset, int rank, - int ipcDomainNranks, int worldSize, size_t nelems) { - const int nPeer = ipcDomainNranks - 1; + int nRanksPerIpcDomain, int worldSize, size_t nelems) { + const int nPeer = nRanksPerIpcDomain - 1; const size_t chanOffset = nPeer * blockIdx.x; // assume (nelems * sizeof(T)) is divisible by (16 * worldSize) const size_t nInt4 = nelems * sizeof(T) / sizeof(int4); @@ -157,7 +157,7 @@ template struct AllreduceAllconnectAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* memoryOutChannels, DeviceHandle*, DeviceHandle*, size_t, - size_t channelOutDataOffset, size_t, int rank, int ipcDomainNranks, int worldSize, + size_t channelOutDataOffset, size_t, int rank, int nRanksPerIpcDomain, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; @@ -166,7 +166,7 @@ struct AllreduceAllconnectAdapter { if (nThreadsPerBlock == 0) nThreadsPerBlock = 512; allreduceFullmesh<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, (ChannelType*)memoryOutChannels, - channelOutDataOffset, rank, ipcDomainNranks, worldSize, nelems); + channelOutDataOffset, rank, nRanksPerIpcDomain, worldSize, nelems); return cudaGetLastError(); } }; @@ -223,7 +223,7 @@ CommResult AllreduceFullmesh::allreduceKernelFunc( } cudaError_t error = allreduce(input, this->scratchBuffer_, output, inputChannelHandles.get(), ctx->memoryChannelDeviceHandles.get(), - nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, + nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->nRanksPerIpcDomain, ctx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN("AllreduceAllconnect failed with error: %s", cudaGetErrorString(error)); @@ -250,7 +250,7 @@ std::shared_ptr AllreduceFullmesh::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); // setup semaphores ctx->memorySemaphores = this->outputSemaphores_; diff --git a/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu b/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu index 890e50f5..347ce8b4 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_block_pipeline.cu @@ -21,15 +21,15 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] DeviceHandle* memoryChannels, [[maybe_unused]] DeviceHandle* switchChannels, [[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize, - [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) { + [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerIpcDomain) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 constexpr int alignment = 16; - int nPeers = ipcDomainNranks - 1; - int nBlocksForCopy = ipcDomainNranks * 2; - int nBlocksForReduce = ipcDomainNranks; + int nPeers = nRanksPerIpcDomain - 1; + int nBlocksForCopy = nRanksPerIpcDomain * 2; + int nBlocksForReduce = nRanksPerIpcDomain; int copyReduceRatio = nBlocksForCopy / nBlocksForReduce; - size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks; - size_t sizePerRank = size / ipcDomainNranks; + size_t scratchSizePerRank = scratchBufferSize / nRanksPerIpcDomain; + size_t sizePerRank = size / nRanksPerIpcDomain; assert(sizePerRank % alignment == 0); uint32_t sizePerBlock = ((sizePerRank + (nBlocksForCopy - 1)) / nBlocksForCopy + alignment - 1) / alignment * alignment; @@ -69,7 +69,7 @@ __global__ void __launch_bounds__(1024, 1) deviceSemaphore[bid + 2 * nBlocksForCopy].acquire(); } __syncthreads(); - for (int i = 0; i < ipcDomainNranks; i++) { + for (int i = 0; i < nRanksPerIpcDomain; i++) { size_t blockOffset = it * unitSize + bid * sizePerBlock + i * sizePerRank; uint32_t scratchOffset = scratchIt * unitSize + bid * scratchSizePerBlock + i * scratchSizePerRank; char* srcData = (char*)src + blockOffset; @@ -126,7 +126,7 @@ __global__ void __launch_bounds__(1024, 1) channels->wait(); } __syncthreads(); - for (int i = 0; i < ipcDomainNranks; i++) { + for (int i = 0; i < nRanksPerIpcDomain; i++) { size_t blockOffset = it * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * sizePerBlock + i * sizePerRank; uint32_t scratchOffset = scratchIt * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * scratchSizePerBlock + @@ -151,7 +151,7 @@ template struct NvlsBlockPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*, DeviceHandle* nvlsChannels, DeviceHandle*, size_t, size_t, - size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize, + size_t scratchBufferSize, int rank, int nRanksPerIpcDomain, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -169,7 +169,7 @@ struct NvlsBlockPipelineAdapter { using ChannelType = DeviceHandle; allreduceNvlsBlockPipeline<<>>( input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank, - ipcDomainNranks); + nRanksPerIpcDomain); return cudaGetLastError(); } } @@ -177,9 +177,9 @@ struct NvlsBlockPipelineAdapter { void AllreduceNvlsBlockPipeline::initialize(std::shared_ptr comm) { nSwitchChannels_ = 8; - ipcDomainNranks_ = comm->bootstrap()->getNranksPerIpcDomain(); - // Per-peer channel allocation must hold up to 4 * ipcDomainNranks entries (see kernel). - nBaseChannels_ = std::max(64, 4 * ipcDomainNranks_); + nRanksPerIpcDomain_ = comm->bootstrap()->getNranksPerIpcDomain(); + // Per-peer channel allocation must hold up to 4 * nRanksPerIpcDomain entries (see kernel). + nBaseChannels_ = std::max(64, 4 * nRanksPerIpcDomain_); this->conns_ = setupConnections(comm); // setup semaphores std::vector> memorySemaphores = @@ -202,11 +202,11 @@ CommResult AllreduceNvlsBlockPipeline::allreduceKernelFunc( } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = {ctx->ipcDomainNranks * 5, 1024}; + blockAndThreadNum = {ctx->nRanksPerIpcDomain * 5, 1024}; } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, - ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, + ctx->rank, ctx->nRanksPerIpcDomain, ctx->workSize, inputSize, stream, nullptr, 0, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN("AllreduceNvlsBlockPipeline failed with error: %s", cudaGetErrorString(error)); @@ -224,7 +224,7 @@ std::shared_ptr AllreduceNvlsBlockPipeline::initAllreduceContext(std::shar auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); // setup channels ctx->switchChannels = diff --git a/src/ext/collectives/allreduce/allreduce_nvls_packet.cu b/src/ext/collectives/allreduce/allreduce_nvls_packet.cu index e8ecfb73..f16e8b05 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_packet.cu @@ -95,7 +95,7 @@ std::shared_ptr AllreduceNvlsPacket::initAllreduceContext(std::shared_ptr< auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); // setup channels ctx->switchChannels = this->switchChannels_; @@ -124,7 +124,7 @@ CommResult AllreduceNvlsPacket::allreduceKernelFunc(const std::shared_ptr } cudaError_t error = allreduce(input, this->scratchBuffer_, output, nullptr, nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, - 0, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, + 0, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerIpcDomain, ctx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN(ALGO, "AllreduceNvlsPacket failed with error: ", cudaGetErrorString(error)); diff --git a/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu b/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu index 68efc2ab..ba447d32 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_warp_pipeline.cu @@ -19,15 +19,15 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] DeviceHandle* memoryChannels, [[maybe_unused]] DeviceHandle* multicast, [[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize, [[maybe_unused]] int rank, - [[maybe_unused]] int ipcDomainNranks) { + [[maybe_unused]] int nRanksPerIpcDomain) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 constexpr int alignment = 16; - int nPeers = ipcDomainNranks - 1; + int nPeers = nRanksPerIpcDomain - 1; int nBlocks = gridDim.x; int nBlocksPerNvlsConn = nBlocks / NUM_NVLS_CONNECTION; int bid = blockIdx.x; - size_t sizePerRank = size / ipcDomainNranks; - size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks; + size_t sizePerRank = size / nRanksPerIpcDomain; + size_t scratchSizePerRank = scratchBufferSize / nRanksPerIpcDomain; const size_t maxSizePerBlock = ((sizePerRank + nBlocks - 1) / nBlocks + alignment - 1) / alignment * alignment; size_t start = bid * maxSizePerBlock; size_t end = min(start + maxSizePerBlock, sizePerRank); @@ -54,7 +54,7 @@ __global__ void __launch_bounds__(1024, 1) lastIterSize = sizePerBlock % copyPerIter; } - const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x * 2; + const size_t chanOffset = (nRanksPerIpcDomain - 1) * blockIdx.x * 2; auto memoryChans = memoryChannels + chanOffset; __shared__ DeviceHandle channels[(MAX_IPC_DOMAIN_NRANKS - 1) * 2]; const int lid = threadIdx.x % WARP_SIZE; @@ -67,7 +67,7 @@ __global__ void __launch_bounds__(1024, 1) const size_t iterSize = (it == nIter - 1) ? lastIterSize : copyPerIter; if (warpId < endCopyWid) { int tidInCopy = threadIdx.x; - for (int i = 0; i < ipcDomainNranks; i++) { + for (int i = 0; i < nRanksPerIpcDomain; i++) { size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter; size_t offsetScratch = i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock; @@ -98,7 +98,7 @@ __global__ void __launch_bounds__(1024, 1) channels[tidInRecvCopy + nPeers].wait(); } asm volatile("bar.sync %0, %1;" ::"r"(3), "r"((NRECV_COPY_WARPS)*WARP_SIZE) : "memory"); - for (int i = 0; i < ipcDomainNranks; i++) { + for (int i = 0; i < nRanksPerIpcDomain; i++) { size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter; size_t offsetScratch = i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock; @@ -115,7 +115,7 @@ template struct NvlsWarpPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*, DeviceHandle* nvlsChannels, DeviceHandle*, size_t, size_t, - size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize, + size_t scratchBufferSize, int rank, int nRanksPerIpcDomain, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -133,7 +133,7 @@ struct NvlsWarpPipelineAdapter { using ChannelType = DeviceHandle; allreduceNvlsWarpPipeline<<>>( input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank, - ipcDomainNranks); + nRanksPerIpcDomain); return cudaGetLastError(); } } @@ -141,9 +141,9 @@ struct NvlsWarpPipelineAdapter { void AllreduceNvlsWarpPipeline::initialize(std::shared_ptr comm) { nSwitchChannels_ = NUM_NVLS_CONNECTION; - ipcDomainNranks_ = comm->bootstrap()->getNranksPerIpcDomain(); - // Per-peer channel allocation must hold 2 * nBlocks entries; default nBlocks = 4 * ipcDomainNranks. - nBaseChannels_ = std::max(64, 8 * ipcDomainNranks_); + nRanksPerIpcDomain_ = comm->bootstrap()->getNranksPerIpcDomain(); + // Per-peer channel allocation must hold 2 * nBlocks entries; default nBlocks = 4 * nRanksPerIpcDomain. + nBaseChannels_ = std::max(64, 8 * nRanksPerIpcDomain_); this->conns_ = setupConnections(comm); // setup semaphores std::vector> memorySemaphores = @@ -166,11 +166,11 @@ CommResult AllreduceNvlsWarpPipeline::allreduceKernelFunc( } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = {ctx->ipcDomainNranks * 4, 1024}; + blockAndThreadNum = {ctx->nRanksPerIpcDomain * 4, 1024}; } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr, ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_, - ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0, + ctx->rank, ctx->nRanksPerIpcDomain, ctx->workSize, inputSize, stream, nullptr, 0, 0, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN("AllreduceNvlsWarpPipeline failed with error: %s", cudaGetErrorString(error)); @@ -188,7 +188,7 @@ std::shared_ptr AllreduceNvlsWarpPipeline::initAllreduceContext(std::share auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); // setup channels ctx->switchChannels = diff --git a/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu b/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu index a6f699b2..32fc6142 100644 --- a/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu +++ b/src/ext/collectives/allreduce/allreduce_nvls_zero_copy.cu @@ -20,12 +20,12 @@ __global__ void __launch_bounds__(1024, 1) [[maybe_unused]] mscclpp::DeviceHandle* multicast, [[maybe_unused]] mscclpp::DeviceHandle* multicastOut, [[maybe_unused]] size_t channelInOffset, [[maybe_unused]] size_t channelOutOffset, - [[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) { + [[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerIpcDomain) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 - int nPeers = ipcDomainNranks - 1; + int nPeers = nRanksPerIpcDomain - 1; int nBlocks = gridDim.x; int bid = blockIdx.x; - size_t sizePerRank = size / ipcDomainNranks; + size_t sizePerRank = size / nRanksPerIpcDomain; const size_t minAlign = 16; // Align sizePerBlock to 16 bytes to ensure aligned vector access in handleMultiLoadReduceStore size_t sizePerBlock = (sizePerRank + nBlocks - 1) / nBlocks; @@ -41,12 +41,12 @@ __global__ void __launch_bounds__(1024, 1) mscclpp::DeviceHandle* multicastPtr = multicast + bid; mscclpp::DeviceHandle* multicastOutPtr = multicastOut + bid; - const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x; + const size_t chanOffset = (nRanksPerIpcDomain - 1) * blockIdx.x; auto memoryChans = memoryChannels + chanOffset; __shared__ mscclpp::DeviceHandle channels[MAX_IPC_DOMAIN_NRANKS - 1]; const int lid = threadIdx.x % WARP_SIZE; // Peer count may exceed WARP_SIZE on MNNVL. - for (int i = lid; i < ipcDomainNranks - 1; i += WARP_SIZE) { + for (int i = lid; i < nRanksPerIpcDomain - 1; i += WARP_SIZE) { channels[i] = memoryChans[i]; } __syncwarp(); @@ -74,7 +74,7 @@ struct NvlsAdapter { static cudaError_t call(const void*, void*, void*, void* memoryChannels, void*, mscclpp::DeviceHandle* nvlsChannels, mscclpp::DeviceHandle* nvlsOutChannels, size_t channelInOffset, - size_t channelOutOffset, size_t, int rank, int ipcDomainNranks, int, size_t inputSize, + size_t channelOutOffset, size_t, int rank, int nRanksPerIpcDomain, int, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { // uint8_t is not supported for NVLS (no hardware support for byte-level reduction) if constexpr (std::is_same_v) { @@ -86,7 +86,7 @@ struct NvlsAdapter { using ChannelType = DeviceHandle; allreduceNvls<<>>( (ChannelType*)memoryChannels, nvlsChannels, nvlsOutChannels, channelInOffset, channelOutOffset, inputSize, - rank, ipcDomainNranks); + rank, nRanksPerIpcDomain); return cudaGetLastError(); } } @@ -120,6 +120,13 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo return CommResult::CommInvalidArgument; } auto ctx = std::static_pointer_cast(ctx_void); +#if defined(__FP8_TYPES_EXIST__) + bool isFp8Dtype = dtype == mscclpp::DataType::FLOAT8_E4M3FN || dtype == mscclpp::DataType::FLOAT8_E5M2; + if (isFp8Dtype && computeCapabilityMajor_ < 10) { + WARN("FP8 NVLS allreduce requires compute capability 10.x or newer."); + return CommResult::CommInvalidArgument; + } +#endif AllreduceFunc allreduce = dispatch(op, dtype, accumDtype); if (!allreduce) { WARN("Unsupported operation or data type for allreduce, dtype=%d", static_cast(dtype)); @@ -138,7 +145,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo } std::pair numBlocksAndThreads = {nBlocks, nThreadsPerBlock}; if (numBlocksAndThreads.first == 0 || numBlocksAndThreads.second == 0) { - numBlocksAndThreads = {::min(ctx->ipcDomainNranks, MAX_NBLOCKS), 1024}; + numBlocksAndThreads = {::min(ctx->nRanksPerIpcDomain, MAX_NBLOCKS), 1024}; // For GB200 devices with MNNVLS (Multi-Node NVLink Sharp), scale the number of blocks inversely with // the number of GPUs. Empirically, 32 blocks works well for 4 GPUs and 16 for 8 GPUs, which // follows the formula 128 / nGPUs, clamped to [1, MAX_NBLOCKS]. @@ -152,9 +159,13 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr ctx_vo } cudaError_t error = allreduce(nullptr, nullptr, nullptr, this->memoryChannelsDeviceHandle_.get(), nullptr, nvlsChannels, - nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, - inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); + nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->nRanksPerIpcDomain, + ctx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { + if (error == cudaErrorNotSupported) { + WARN("AllreduceNvls does not support the requested data type."); + return CommResult::CommInvalidArgument; + } WARN("AllreduceNvls failed with error: %s", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; } @@ -176,7 +187,7 @@ std::shared_ptr AllreduceNvls::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); size_t sendBytes, recvBytes; CUdeviceptr sendBasePtr, recvBasePtr; diff --git a/src/ext/collectives/allreduce/allreduce_packet.cu b/src/ext/collectives/allreduce/allreduce_packet.cu index 801bed62..d20625ee 100644 --- a/src/ext/collectives/allreduce/allreduce_packet.cu +++ b/src/ext/collectives/allreduce/allreduce_packet.cu @@ -15,7 +15,7 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allreducePacket(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle* memoryChannels, - size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, + size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerIpcDomain, int worldSize, size_t nelems, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff #if defined(ENABLE_NPKIT) , @@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1) else nelems = nelems / (sizeof(int) / sizeof(T)); - const int nPeers = ipcDomainNranks - 1; + const int nPeers = nRanksPerIpcDomain - 1; const size_t nPkts = nelems / 2; uint32_t flag = ((uint32_t*)flags)[blockIdx.x]; @@ -154,31 +154,32 @@ template struct PacketAdapter { static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*, DeviceHandle*, DeviceHandle*, size_t channelInOffset, size_t, - size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, + size_t scratchBufferSize, int rank, int nRanksPerIpcDomain, int worldSize, size_t inputSize, cudaStream_t stream, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) { using ChannelType = DeviceHandle; const size_t nelems = inputSize / sizeof(T); - // Optimize the number of blocks to be multiple of (worldSize - 1) - nBlocks = nBlocks / (worldSize - 1) * (worldSize - 1); + // Optimize the number of blocks to be multiple of the IPC-domain peer count. + const int nPeers = nRanksPerIpcDomain - 1; + nBlocks = nBlocks / nPeers * nPeers; #if defined(ENABLE_NPKIT) size_t sharedMemSize = sizeof(NpKitEvent) * NPKIT_SHM_NUM_EVENTS; allreducePacket<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(), - NpKit::GetCpuTimestamp()); + nRanksPerIpcDomain, worldSize, nelems, flags, flagBufferSize, numScratchBuff, + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else allreducePacket<<>>( (T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank, - ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff); + nRanksPerIpcDomain, worldSize, nelems, flags, flagBufferSize, numScratchBuff); #endif return cudaGetLastError(); } }; -inline std::pair getDefaultBlockNumAndThreadNum(size_t inputSize, int ipcDomainNranks, int worldSize, +inline std::pair getDefaultBlockNumAndThreadNum(size_t inputSize, int nRanksPerIpcDomain, int worldSize, [[maybe_unused]] DataType dtype) { - int nBlocks = (ipcDomainNranks - 1) * 4; + int nBlocks = (nRanksPerIpcDomain - 1) * 4; int nThreadsPerBlock = 1024; if (inputSize >= 32768) { nBlocks = (worldSize - 1) * 8; @@ -229,12 +230,17 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr ctx_ const std::unordered_map&, DataType accumDtype) { auto ctx = std::static_pointer_cast(ctx_void); + if (ctx->workSize != ctx->nRanksPerIpcDomain) { + WARN(ALGO, "AllreducePacket requires workSize to match nRanksPerIpcDomain, got workSize=", ctx->workSize, + ", nRanksPerIpcDomain=", ctx->nRanksPerIpcDomain); + return CommResult::CommInvalidArgument; + } std::pair blockAndThreadNum = {nBlocks, nThreadsPerBlock}; if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) { - blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->ipcDomainNranks, ctx->workSize, dtype); + blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->nRanksPerIpcDomain, ctx->workSize, dtype); } else { - const int nPeers = ctx->workSize - 1; - if (nPeers > 0 && blockAndThreadNum.first < nPeers) { + const int nPeers = ctx->nRanksPerIpcDomain - 1; + if (blockAndThreadNum.first < nPeers) { return CommResult::CommInvalidArgument; } } @@ -252,8 +258,8 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr ctx_ } cudaError_t error = allreduce(input, this->scratchBuffer_, output, ctx->memoryChannelDeviceHandles.get(), nullptr, nullptr, nullptr, - channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, - stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, + channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerIpcDomain, ctx->workSize, + inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second); if (error != cudaSuccess) { WARN(ALGO, "AllreducePacket failed with error: ", cudaGetErrorString(error)); @@ -268,7 +274,7 @@ std::shared_ptr AllreducePacket::initAllreduceContext(std::shared_ptrrank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); ctx->memorySemaphores = this->memorySemaphores_; ctx->registeredMemories = this->registeredMemories_; ctx->registeredMemories.pop_back(); // remove the local memory from previous context diff --git a/src/ext/collectives/allreduce/allreduce_rsag.cu b/src/ext/collectives/allreduce/allreduce_rsag.cu index 22e3a4ee..f07e0e2c 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag.cu @@ -31,18 +31,18 @@ namespace collective { template __global__ void __launch_bounds__(1024, 1) allreduceRsAg(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, - DeviceHandle* switchChannels, void* remoteMemories, int rank, int ipcDomainNranks, + DeviceHandle* switchChannels, void* remoteMemories, int rank, int nRanksPerIpcDomain, int worldSize, size_t nelems) { int blockId = blockIdx.x; - uint32_t nPeers = ipcDomainNranks - 1; + uint32_t nPeers = nRanksPerIpcDomain - 1; assert((uintptr_t)buff % sizeof(int4) == 0); assert((uintptr_t)resultBuff % sizeof(int4) == 0); constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T); - uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 * - nelemsPerInt4 * ipcDomainNranks; - uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks; + uint32_t alignedNelems = ((nelems + nRanksPerIpcDomain - 1) / nRanksPerIpcDomain + nelemsPerInt4 - 1) / + nelemsPerInt4 * nelemsPerInt4 * nRanksPerIpcDomain; + uint32_t nelemsPerRank = alignedNelems / nRanksPerIpcDomain; uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4; uint32_t lastInt4Index = nelems / nelemsPerInt4; uint32_t remainder = nelems % nelemsPerInt4; @@ -59,7 +59,7 @@ __global__ void __launch_bounds__(1024, 1) nInt4PerBlock += remainderForBlock; } if (nInt4PerBlock == 0) return; - uint32_t nInt4ForCopy = nInt4PerBlock * ipcDomainNranks; + uint32_t nInt4ForCopy = nInt4PerBlock * nRanksPerIpcDomain; for (uint32_t idx = threadIdx.x; idx < nInt4ForCopy; idx += blockDim.x) { int rankIdx = idx / nInt4PerBlock; @@ -84,13 +84,13 @@ __global__ void __launch_bounds__(1024, 1) if (offset > lastInt4Index) continue; int4 tmp = scratch4[offset]; for (uint32_t i = 0; i < nPeers; i++) { - int rankIdx = (rank + i + 1) % ipcDomainNranks; + int rankIdx = (rank + i + 1) % nRanksPerIpcDomain; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; int4 data = mscclpp::read(((void**)remoteMemories)[peerIdx], offset); tmp = calVector(data, tmp); } for (uint32_t i = 0; i < nPeers; i++) { - int rankIdx = (rank + i + 1) % ipcDomainNranks; + int rankIdx = (rank + i + 1) % nRanksPerIpcDomain; int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1; mscclpp::write(((void**)remoteMemories)[peerIdx], offset, tmp); } @@ -127,8 +127,8 @@ template struct AllreduceRsAgAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, - void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { + size_t, int rank, int nRanksPerIpcDomain, int worldSize, size_t inputSize, + cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); if (nBlocks == 0 || nThreadsPerBlock == 0) { @@ -137,7 +137,7 @@ struct AllreduceRsAgAdapter { } allreduceRsAg<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - ipcDomainNranks, worldSize, nelems); + nRanksPerIpcDomain, worldSize, nelems); return cudaGetLastError(); } }; @@ -185,7 +185,7 @@ CommResult AllreduceRsAg::allreduceKernelFunc(const std::shared_ptr ctx, c } cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(), this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, 0, algoCtx->rank, - algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, + algoCtx->nRanksPerIpcDomain, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); @@ -203,7 +203,7 @@ std::shared_ptr AllreduceRsAg::initAllreduceContext(std::shared_ptr(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); ctx->memorySemaphores = this->scratchSemaphores_; ctx->registeredMemories = this->remoteScratchMemories_; diff --git a/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu b/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu index bedf15c5..e9d543ea 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag_pipeline.cu @@ -86,15 +86,15 @@ template __global__ void __launch_bounds__(1024, 1) allreduceRsAgPipeline(T* buff, T* scratch, T* resultBuff, DeviceHandle* memoryChannels, DeviceHandle* switchChannels, void* remoteMemories, int rank, - int ipcDomainNranks, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut, - uint32_t nblocksForReduce, uint32_t nblocksForRecv) { + int nRanksPerIpcDomain, int worldSize, size_t nelems, size_t scratchSize, + uint32_t nblocksForPut, uint32_t nblocksForReduce, uint32_t nblocksForRecv) { uint32_t bid = blockIdx.x; constexpr uint32_t nStepsPerIter = 4; uint32_t nInt4 = (nelems * sizeof(T) + sizeof(int4) - 1) / sizeof(int4); uint32_t nInt4PerIter = nblocksForReduce * blockDim.x * nStepsPerIter; const uint32_t chunkSize = nInt4PerIter * worldSize; uint32_t nIters = (nInt4 + chunkSize - 1) / chunkSize; - uint32_t nPeers = ipcDomainNranks - 1; + uint32_t nPeers = nRanksPerIpcDomain - 1; int4* scratch4 = reinterpret_cast((char*)scratch); const uint32_t scratchIterStride = 2 * chunkSize; // one for AS, one for AG const uint32_t pipelineDepth = scratchSize / sizeof(int4) / scratchIterStride; @@ -111,7 +111,7 @@ __global__ void __launch_bounds__(1024, 1) __syncthreads(); uint32_t threadIdInPut = bid * blockDim.x + threadIdx.x; for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % ipcDomainNranks; + int remoteRankId = (rank + peer + 1) % nRanksPerIpcDomain; int peerId = remoteRankId < rank ? remoteRankId : remoteRankId - 1; // Read chunk[remoteRankId] from local buff, write to peer's scratch[rank] (sender's slot) uint32_t srcOffset = iter * chunkSize + remoteRankId * nInt4PerIter; @@ -164,7 +164,7 @@ __global__ void __launch_bounds__(1024, 1) int4 tmp = loadVec(buff, myChunkOffset, nelems); // Add data from each peer's slot in scratch (peer sent their chunk[rank] to our scratch[peer]) for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % ipcDomainNranks; + int remoteRankId = (rank + peer + 1) % nRanksPerIpcDomain; uint32_t peerSlotOffset = baseOffset + remoteRankId * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut; int4 data = scratch4[peerSlotOffset]; @@ -175,7 +175,7 @@ __global__ void __launch_bounds__(1024, 1) uint32_t dstOffset = baseOffset + chunkSize + rank * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut; for (uint32_t i = 0; i < nPeers; i++) { - int peerIdx = (rank + i + 1) % ipcDomainNranks; + int peerIdx = (rank + i + 1) % nRanksPerIpcDomain; int index = peerIdx < rank ? peerIdx : peerIdx - 1; mscclpp::write(((void**)remoteMemories)[index], dstOffset, tmp); } @@ -203,7 +203,7 @@ __global__ void __launch_bounds__(1024, 1) __syncthreads(); // Copy other ranks' reduced chunks from scratch to result for (uint32_t peer = 0; peer < nPeers; peer++) { - int remoteRankId = (rank + peer + 1) % ipcDomainNranks; + int remoteRankId = (rank + peer + 1) % nRanksPerIpcDomain; for (uint32_t step = 0; step < nStepsPerIter * REDUCE_COPY_RATIO; step++) { uint32_t offset = baseOffset + chunkSize + remoteRankId * nInt4PerIter + threadIdInRecv + step * blockDim.x * nblocksForRecv; @@ -224,7 +224,7 @@ template struct AllreduceRsAgPipelineAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t scratchSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, + size_t scratchSize, int rank, int nRanksPerIpcDomain, int worldSize, size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); @@ -248,7 +248,7 @@ struct AllreduceRsAgPipelineAdapter { } allreduceRsAgPipeline<<>>( (T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, - ipcDomainNranks, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv); + nRanksPerIpcDomain, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv); return cudaGetLastError(); } }; @@ -288,8 +288,8 @@ CommResult AllreduceRsAgPipeline::allreduceKernelFunc( std::pair numBlocksAndThreads = {nBlocks, nThreadsPerBlock}; cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(), this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, this->scratchBufferSize_, - algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, - 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); + algoCtx->rank, algoCtx->nRanksPerIpcDomain, algoCtx->workSize, inputSize, stream, + nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; @@ -306,7 +306,7 @@ std::shared_ptr AllreduceRsAgPipeline::initAllreduceContext(std::shared_pt auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); ctx->memorySemaphores = this->scratchSemaphores_; ctx->registeredMemories = this->remoteScratchMemories_; diff --git a/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu b/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu index 10d3a35c..753ad799 100644 --- a/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu +++ b/src/ext/collectives/allreduce/allreduce_rsag_zero_copy.cu @@ -116,8 +116,8 @@ template struct AllreduceRsAgZeroCopyAdapter { static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories, DeviceHandle* switchChannel, DeviceHandle*, size_t, size_t, - size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream, - void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { + size_t, int rank, int nRanksPerIpcDomain, int worldSize, size_t inputSize, + cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) { using ChannelType = DeviceHandle; size_t nelems = inputSize / sizeof(T); if (nBlocks == 0 || nThreadsPerBlock == 0) { @@ -127,16 +127,16 @@ struct AllreduceRsAgZeroCopyAdapter { nBlocks = 128; } } - if (ipcDomainNranks == 4) { + if (nRanksPerIpcDomain == 4) { allreduceRsAgZeroCopy<4, OpType, T, AccumT> <<>>((T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, worldSize, nelems); - } else if (ipcDomainNranks == 8) { + } else if (nRanksPerIpcDomain == 8) { allreduceRsAgZeroCopy<8, OpType, T, AccumT> <<>>((T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank, worldSize, nelems); } else { - WARN(ALGO, "AllreduceRsAgZeroCopy only supports ipcDomainNranks of 4 or 8, got: ", ipcDomainNranks); + WARN(ALGO, "AllreduceRsAgZeroCopy only supports nRanksPerIpcDomain of 4 or 8, got: ", nRanksPerIpcDomain); return cudaErrorInvalidValue; } return cudaGetLastError(); @@ -172,9 +172,13 @@ CommResult AllreduceRsAgZeroCopy::allreduceKernelFunc(const std::shared_ptrbaseMemoryChannelHandles_.get(), algoCtx->remoteMemoryHandles.get(), - nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, + nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->nRanksPerIpcDomain, algoCtx->workSize, inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second); if (error != cudaSuccess) { + if (error == cudaErrorInvalidValue) { + WARN(ALGO, "AllreduceRsAgZeroCopy received invalid launch arguments: ", cudaGetErrorString(error)); + return CommResult::CommInvalidArgument; + } WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error)); return CommResult::CommUnhandledCudaError; } @@ -200,7 +204,7 @@ std::shared_ptr AllreduceRsAgZeroCopy::initAllreduceContext(std::shared_pt auto ctx = std::make_shared(); ctx->rank = comm->bootstrap()->getRank(); ctx->workSize = comm->bootstrap()->getNranks(); - ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerIpcDomain(); + ctx->nRanksPerIpcDomain = comm->bootstrap()->getNranksPerIpcDomain(); ctx->memorySemaphores = this->semaphores_; diff --git a/src/ext/collectives/include/allreduce/allreduce_nvls_block_pipeline.hpp b/src/ext/collectives/include/allreduce/allreduce_nvls_block_pipeline.hpp index 9a1742db..5662d116 100644 --- a/src/ext/collectives/include/allreduce/allreduce_nvls_block_pipeline.hpp +++ b/src/ext/collectives/include/allreduce/allreduce_nvls_block_pipeline.hpp @@ -29,7 +29,7 @@ class AllreduceNvlsBlockPipeline : public AlgorithmBuilder { void* scratchBuffer_; size_t scratchBufferSize_; uint32_t nSwitchChannels_; - int ipcDomainNranks_ = 0; + int nRanksPerIpcDomain_ = 0; int nBaseChannels_ = 0; std::shared_ptr> memoryChannelsDeviceHandle_; std::vector baseChannels_; diff --git a/src/ext/collectives/include/allreduce/allreduce_nvls_warp_pipeline.hpp b/src/ext/collectives/include/allreduce/allreduce_nvls_warp_pipeline.hpp index e2aa8c87..f347c871 100644 --- a/src/ext/collectives/include/allreduce/allreduce_nvls_warp_pipeline.hpp +++ b/src/ext/collectives/include/allreduce/allreduce_nvls_warp_pipeline.hpp @@ -29,7 +29,7 @@ class AllreduceNvlsWarpPipeline : public AlgorithmBuilder { void* scratchBuffer_; size_t scratchBufferSize_; uint32_t nSwitchChannels_; - int ipcDomainNranks_ = 0; + int nRanksPerIpcDomain_ = 0; int nBaseChannels_ = 0; std::shared_ptr> memoryChannelsDeviceHandle_; std::vector baseChannels_; diff --git a/src/ext/collectives/include/allreduce/common.hpp b/src/ext/collectives/include/allreduce/common.hpp index 22513ace..5d593449 100644 --- a/src/ext/collectives/include/allreduce/common.hpp +++ b/src/ext/collectives/include/allreduce/common.hpp @@ -39,34 +39,43 @@ MSCCLPP_DEVICE_INLINE constexpr std::size_t calcVectorSize() { template MSCCLPP_DEVICE_INLINE void handleMultiLoadReduceStore(T* src, T* dst, size_t srcOffset, size_t dstOffset, size_t size, int tid, int nThreads) { - // nvls can only handle 4 bytes alignment - MSCCLPP_ASSERT_DEVICE(size % 4 == 0, "size must be 4 bytes aligned"); - constexpr size_t nElem = calcVectorSize(); - // For integer types, use 1-element vectors since multimem doesn't support vectorized integer operations - constexpr size_t vecSize = (std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v) - ? 1 - : nElem; - using vectorType = mscclpp::VectorType; - const size_t nVec = size / sizeof(vectorType); - const size_t srcOffset4 = srcOffset / sizeof(vectorType); - const size_t dstOffset4 = dstOffset / sizeof(vectorType); - vectorType* src4 = (vectorType*)src; - vectorType* dst4 = (vectorType*)dst; - for (size_t idx = tid; idx < nVec; idx += nThreads) { - auto val = mscclpp::SwitchChannelDeviceHandle::multimemLoadReduce(src4 + srcOffset4 + idx); - mscclpp::SwitchChannelDeviceHandle::multimemStore(val, dst4 + dstOffset4 + idx); - } - // handle rest of data - size_t processed = nVec * sizeof(vectorType); - constexpr size_t nRestElem = 4 / sizeof(T); - using restVectorType = mscclpp::VectorType; - const size_t startIdx = (srcOffset + processed) / sizeof(restVectorType); - const size_t endIdx = (srcOffset + size) / sizeof(restVectorType); - for (size_t idx = tid + startIdx; idx < endIdx; idx += nThreads) { - auto val = - mscclpp::SwitchChannelDeviceHandle::multimemLoadReduce((restVectorType*)src + idx); - mscclpp::SwitchChannelDeviceHandle::multimemStore(val, (restVectorType*)dst + idx); +#if defined(__FP8_TYPES_EXIST__) && \ + (!(defined(__CUDA_ARCH_SPECIFIC__) || defined(__CUDA_ARCH_FAMILY_SPECIFIC__)) || (__CUDA_ARCH__ < 1000)) + if constexpr (std::is_same_v || std::is_same_v) { + assert(false && "FP8 NVLS multimem requires sm_100a or newer"); + return; + } else +#endif + { + // nvls can only handle 4 bytes alignment + MSCCLPP_ASSERT_DEVICE(size % 4 == 0, "size must be 4 bytes aligned"); + constexpr size_t nElem = calcVectorSize(); + // For integer types, use 1-element vectors since multimem doesn't support vectorized integer operations + constexpr size_t vecSize = (std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v) + ? 1 + : nElem; + using vectorType = mscclpp::VectorType; + const size_t nVec = size / sizeof(vectorType); + const size_t srcOffset4 = srcOffset / sizeof(vectorType); + const size_t dstOffset4 = dstOffset / sizeof(vectorType); + vectorType* src4 = (vectorType*)src; + vectorType* dst4 = (vectorType*)dst; + for (size_t idx = tid; idx < nVec; idx += nThreads) { + auto val = mscclpp::SwitchChannelDeviceHandle::multimemLoadReduce(src4 + srcOffset4 + idx); + mscclpp::SwitchChannelDeviceHandle::multimemStore(val, dst4 + dstOffset4 + idx); + } + // handle rest of data + size_t processed = nVec * sizeof(vectorType); + constexpr size_t nRestElem = 4 / sizeof(T); + using restVectorType = mscclpp::VectorType; + const size_t startIdx = (srcOffset + processed) / sizeof(restVectorType); + const size_t endIdx = (srcOffset + size) / sizeof(restVectorType); + for (size_t idx = tid + startIdx; idx < endIdx; idx += nThreads) { + auto val = + mscclpp::SwitchChannelDeviceHandle::multimemLoadReduce((restVectorType*)src + idx); + mscclpp::SwitchChannelDeviceHandle::multimemStore(val, (restVectorType*)dst + idx); + } } } #endif // defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 diff --git a/src/ext/collectives/include/collective_utils.hpp b/src/ext/collectives/include/collective_utils.hpp index c1cad412..2e61b937 100644 --- a/src/ext/collectives/include/collective_utils.hpp +++ b/src/ext/collectives/include/collective_utils.hpp @@ -27,8 +27,8 @@ namespace mscclpp { namespace collective { constexpr int NUM_NVLS_CONNECTION = 8; // Sized to cover MAX_IPC_DOMAIN_NRANKS-scale allreduce algos whose device-side -// semaphore indices grow as O(ipcDomainNranks) (e.g. nvls_block_pipeline uses -// up to ~5 * ipcDomainNranks entries). +// semaphore indices grow as O(nRanksPerIpcDomain) (e.g. nvls_block_pipeline uses +// up to ~5 * nRanksPerIpcDomain entries). constexpr int NUM_SEMAPHORES = 512; // Upper bound on the number of NVLink-reachable ranks that participate in a @@ -37,7 +37,7 @@ constexpr int NUM_SEMAPHORES = 512; // of shared-memory channel arrays in the allreduce/allgather kernels. constexpr int MAX_IPC_DOMAIN_NRANKS = 72; -constexpr int SCRATCH_SIZE = 2 * 1024 * 1024 * 70; // double buffer * 35 thread-blocks * 8 ranks * 256KB = 70MB +constexpr int SCRATCH_SIZE = 2 * 1024 * 1024 * 70; // Two 70 MiB buffers for double-buffered packet scratch space. std::vector setupRemoteMemories(std::shared_ptr comm, int rank, RegisteredMemory localMemory); @@ -79,7 +79,7 @@ class AlgorithmCtx { public: int rank; int workSize; - int ipcDomainNranks; + int nRanksPerIpcDomain; std::vector registeredMemories; std::vector memoryChannels;