Mirror of https://github.com/microsoft/mscclpp.git, synced 2026-05-11 17:00:22 +00:00
Rename collective ctx/kernel param nRanksPerNode to ipcDomainNranks
The AlgorithmCtx field and the kernel/host parameters that hold the
collective's IPC peer-group size were named nRanksPerNode, which is
misleading on Multi-Node NVLink (where the value spans multiple hosts)
and on AMD (where the relevant fabric is XGMI, not NVLink). Rename to
ipcDomainNranks throughout the collective algorithms to match the
neutral naming introduced for the env helper.
Scope intentionally limited to src/ext/collectives/. The following are
left untouched on purpose:
- Bootstrap::getNranksPerNode() — physical-host detection, semantics
unchanged.
- Algorithm::Constraint::nRanksPerNode (public API in
include/mscclpp/algorithm.hpp) and the DSL plan config in
algorithm_collection_builder.cc — these describe a plan's required
physical topology.
- NCCL adapter (src/ext/nccl/) — preserves NCCL ABI compatibility.
- MAX_NRANKS_PER_NODE — sizing constant for shared-memory arrays.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@@ -11,8 +11,8 @@ namespace collective {
template <bool IsOutOfPlace>
__global__ void __launch_bounds__(1024, 1)
allgatherFullmesh(void* buff, void* scratch, void* resultBuff, DeviceHandle<MemoryChannel>* memoryChannels,
-int rank, int nRanksPerNode, [[maybe_unused]] int worldSize, size_t nelems) {
-const int nPeer = nRanksPerNode - 1;
+int rank, int ipcDomainNranks, [[maybe_unused]] int worldSize, size_t nelems) {
+const int nPeer = ipcDomainNranks - 1;
const size_t chanOffset = nPeer * blockIdx.x;
// assume (nelems * sizeof(T)) is divisible by 16
const size_t nInt4 = nelems * sizeof(int) / sizeof(int4);

@@ -129,11 +129,11 @@ CommResult AllgatherFullmesh::allgatherKernelFunc(const std::shared_ptr<void> ct
if ((char*)input == (char*)output + rank * inputSize) {
allgatherFullmesh<false><<<numBlocksAndThreads.first, numBlocksAndThreads.second, 0, stream>>>(
(void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank,
-ctx->nRanksPerNode, ctx->workSize, nElem);
+ctx->ipcDomainNranks, ctx->workSize, nElem);
} else {
allgatherFullmesh<true><<<numBlocksAndThreads.first, numBlocksAndThreads.second, 0, stream>>>(
(void*)input, this->scratchBuffer_, (void*)output, ctx->memoryChannelDeviceHandles.get(), rank,
-ctx->nRanksPerNode, ctx->workSize, nElem);
+ctx->ipcDomainNranks, ctx->workSize, nElem);
}
cudaError_t err = cudaGetLastError();
if (err != cudaSuccess) {

@@ -150,7 +150,7 @@ std::shared_ptr<void> AllgatherFullmesh::initAllgatherContext(std::shared_ptr<Co
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
+ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode();

// setup semaphores
ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection);
@@ -12,7 +12,7 @@ __device__ DeviceSyncer deviceSyncer;
template <bool IsOutOfPlace>
__global__ void __launch_bounds__(1024, 1)
allgatherFullmesh2(void* sendbuff, mscclpp::DeviceHandle<mscclpp::MemoryChannel>* memoryChannels,
-size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t nRanksPerNode,
+size_t channelOutOffset, size_t rank, [[maybe_unused]] size_t worldSize, size_t ipcDomainNranks,
size_t nelemsPerGPU) {
const size_t tid = threadIdx.x + blockIdx.x * blockDim.x;
const size_t lid = tid % WARP_SIZE;

@@ -20,7 +20,7 @@ __global__ void __launch_bounds__(1024, 1)

const size_t nThread = blockDim.x * gridDim.x;
const size_t nWarp = nThread / WARP_SIZE;
-const size_t nPeer = nRanksPerNode - 1;
+const size_t nPeer = ipcDomainNranks - 1;
const size_t chanOffset = nPeer * blockIdx.x;
auto memChans = memoryChannels + chanOffset;

@@ -140,11 +140,11 @@ CommResult AllgatherFullmesh2::allgatherKernelFunc(const std::shared_ptr<void> c
if ((char*)input == (char*)output + rank * inputSize) {
allgatherFullmesh2<false><<<numBlocksAndThreads.first, numBlocksAndThreads.second, 0, stream>>>(
(void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize,
-ctx->nRanksPerNode, nElem);
+ctx->ipcDomainNranks, nElem);
} else {
allgatherFullmesh2<true><<<numBlocksAndThreads.first, numBlocksAndThreads.second, 0, stream>>>(
(void*)input, ctx->memoryChannelDeviceHandles.get(), channelOutOffset, ctx->rank, ctx->workSize,
-ctx->nRanksPerNode, nElem);
+ctx->ipcDomainNranks, nElem);
}
cudaError_t err = cudaGetLastError();
if (err != cudaSuccess) {

@@ -159,7 +159,7 @@ std::shared_ptr<void> AllgatherFullmesh2::initAllgatherContext(std::shared_ptr<m
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
+ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode();

// setup semaphores
ctx->memorySemaphores = this->memorySemaphores_;
@@ -14,11 +14,11 @@ namespace collective {

template <ReduceOp OpType, typename T, typename AccumT = T>
__global__ void allreduceAllPairs(T* buff, T* scratch, T* resultBuff, DeviceHandle<MemoryChannel>* memoryChannels,
-size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerNode,
+size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks,
int worldSize, size_t nelems, uint32_t numScratchBuff, void* flags,
uint32_t flagSize) {
if (sizeof(T) == 2 || sizeof(T) == 1) nelems = (nelems * sizeof(T) + sizeof(T)) / sizeof(int);
-const int nPeers = nRanksPerNode - 1;
+const int nPeers = ipcDomainNranks - 1;

uint32_t flag = ((uint32_t*)flags)[blockIdx.x];
size_t scratchBaseOffset = (flag % numScratchBuff) ? (scratchBufferSize / numScratchBuff) : 0;

@@ -72,7 +72,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct AllpairAdapter {
static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*,
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, size_t channelInOffset, size_t,
-size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize,
+size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize,
cudaStream_t stream, void* flags, uint32_t flagSize, uint32_t numScratchBuff, int nBlocks = 0,
int nThreadsPerBlock = 0) {
using ChannelType = DeviceHandle<MemoryChannel>;

@@ -84,7 +84,7 @@ struct AllpairAdapter {
}
allreduceAllPairs<OpType, T, AccumT><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
(T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank,
-nRanksPerNode, worldSize, nelems, numScratchBuff, flags, flagSize);
+ipcDomainNranks, worldSize, nelems, numScratchBuff, flags, flagSize);
return cudaGetLastError();
}
};

@@ -108,7 +108,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptr<voi
blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, algoCtx->workSize);
}
// nBlocks must be at least nPeers for allpair — each block maps to one peer.
-const int nPeers = algoCtx->nRanksPerNode - 1;
+const int nPeers = algoCtx->ipcDomainNranks - 1;
if (nPeers > 0 && blockAndThreadNum.first < nPeers) {
return CommResult::CommInvalidArgument;
}
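For a concrete sense of the guard above (hypothetical values, not part of this change): with an 8-rank IPC domain the allpair kernel needs at least 7 blocks, since each block drives one peer channel.

    // Hypothetical example, assuming ipcDomainNranks = 8:
    int ipcDomainNranks = 8;
    int nPeers = ipcDomainNranks - 1;  // 7 peers -> at least 7 blocks required
    // a launch with blockAndThreadNum.first < 7 is rejected with CommInvalidArgument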
@@ -124,7 +124,7 @@ CommResult AllreduceAllpairPacket::allreduceKernelFunc(const std::shared_ptr<voi
}
cudaError_t error =
allreduce(input, this->scratchBuffer_, output, algoCtx->memoryChannelDeviceHandles.get(), nullptr, nullptr,
-nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->nRanksPerNode,
+nullptr, channelInOffset, 0, this->scratchBufferSize_, algoCtx->rank, algoCtx->ipcDomainNranks,
algoCtx->workSize, inputSize, stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_,
this->nSegmentsForScratchBuffer_, blockAndThreadNum.first, blockAndThreadNum.second);
if (error != cudaSuccess) {

@@ -140,7 +140,7 @@ std::shared_ptr<void> AllreduceAllpairPacket::initAllreduceContext(std::shared_p
const int nChannelsPerConnection = maxBlockNum_;
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = getIpcDomainNranks(comm);
+ctx->ipcDomainNranks = getIpcDomainNranks(comm);
ctx->memorySemaphores = this->memorySemaphores_;
ctx->registeredMemories = this->registeredMemories_;
ctx->registeredMemories.pop_back(); // remove the local memory from previous context
@@ -13,8 +13,8 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
__global__ void __launch_bounds__(512, 1)
allreduceFullmesh(T* buff, T* scratch, T* resultBuff, DeviceHandle<MemoryChannel>* memoryChannels,
DeviceHandle<MemoryChannel>* memoryOutChannels, size_t channelOutDataOffset, int rank,
-int nRanksPerNode, int worldSize, size_t nelems) {
-const int nPeer = nRanksPerNode - 1;
+int ipcDomainNranks, int worldSize, size_t nelems) {
+const int nPeer = ipcDomainNranks - 1;
const size_t chanOffset = nPeer * blockIdx.x;
// assume (nelems * sizeof(T)) is divisible by (16 * worldSize)
const size_t nInt4 = nelems * sizeof(T) / sizeof(int4);

@@ -159,7 +159,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct AllreduceAllconnectAdapter {
static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* memoryOutChannels,
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, size_t,
-size_t channelOutDataOffset, size_t, int rank, int nRanksPerNode, int worldSize,
+size_t channelOutDataOffset, size_t, int rank, int ipcDomainNranks, int worldSize,
size_t inputSize, cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks,
int nThreadsPerBlock) {
using ChannelType = DeviceHandle<MemoryChannel>;

@@ -168,7 +168,7 @@ struct AllreduceAllconnectAdapter {
if (nThreadsPerBlock == 0) nThreadsPerBlock = 512;
allreduceFullmesh<OpType, T, AccumT><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
(T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, (ChannelType*)memoryOutChannels,
-channelOutDataOffset, rank, nRanksPerNode, worldSize, nelems);
+channelOutDataOffset, rank, ipcDomainNranks, worldSize, nelems);
return cudaGetLastError();
}
};

@@ -225,7 +225,7 @@ CommResult AllreduceFullmesh::allreduceKernelFunc(
}
cudaError_t error =
allreduce(input, this->scratchBuffer_, output, inputChannelHandles.get(), ctx->memoryChannelDeviceHandles.get(),
-nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize,
+nullptr, nullptr, 0, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize,
stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second);
if (error != cudaSuccess) {
WARN("AllreduceAllconnect failed with error: %s", cudaGetErrorString(error));

@@ -252,7 +252,7 @@ std::shared_ptr<void> AllreduceFullmesh::initAllreduceContext(std::shared_ptr<Co
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
+ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode();

// setup semaphores
ctx->memorySemaphores = this->outputSemaphores_;
@@ -20,15 +20,15 @@ __global__ void __launch_bounds__(1024, 1)
[[maybe_unused]] DeviceHandle<BaseMemoryChannel>* memoryChannels,
[[maybe_unused]] DeviceHandle<SwitchChannel>* switchChannels,
[[maybe_unused]] size_t size, [[maybe_unused]] size_t scratchBufferSize,
-[[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerNode) {
+[[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) {
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900
constexpr int alignment = 16;
-int nPeers = nRanksPerNode - 1;
-int nBlocksForCopy = nRanksPerNode * 2;
-int nBlocksForReduce = nRanksPerNode;
+int nPeers = ipcDomainNranks - 1;
+int nBlocksForCopy = ipcDomainNranks * 2;
+int nBlocksForReduce = ipcDomainNranks;
int copyReduceRatio = nBlocksForCopy / nBlocksForReduce;
-size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode;
-size_t sizePerRank = size / nRanksPerNode;
+size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks;
+size_t sizePerRank = size / ipcDomainNranks;
assert(sizePerRank % alignment == 0);
uint32_t sizePerBlock =
((sizePerRank + (nBlocksForCopy - 1)) / nBlocksForCopy + alignment - 1) / alignment * alignment;
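A quick worked example of the block split above, with hypothetical numbers that are not part of the diff:

    // Assuming ipcDomainNranks = 8:
    int nPeers = 8 - 1;            // 7
    int nBlocksForCopy = 8 * 2;    // 16 copy blocks
    int nBlocksForReduce = 8;      // 8 reduce blocks
    int copyReduceRatio = 16 / 8;  // 2 copy blocks feed each reduce block
    // sizePerRank is then divided over the 16 copy blocks and rounded up to
    // 16-byte alignment so int4 accesses stay aligned.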
@@ -68,7 +68,7 @@ __global__ void __launch_bounds__(1024, 1)
deviceSemaphore[bid + 2 * nBlocksForCopy].acquire();
}
__syncthreads();
-for (int i = 0; i < nRanksPerNode; i++) {
+for (int i = 0; i < ipcDomainNranks; i++) {
size_t blockOffset = it * unitSize + bid * sizePerBlock + i * sizePerRank;
uint32_t scratchOffset = scratchIt * unitSize + bid * scratchSizePerBlock + i * scratchSizePerRank;
char* srcData = (char*)src + blockOffset;

@@ -125,7 +125,7 @@ __global__ void __launch_bounds__(1024, 1)
channels->wait();
}
__syncthreads();
-for (int i = 0; i < nRanksPerNode; i++) {
+for (int i = 0; i < ipcDomainNranks; i++) {
size_t blockOffset = it * unitSize + (bid - nBlocksForCopy - nBlocksForReduce) * sizePerBlock + i * sizePerRank;
uint32_t scratchOffset = scratchIt * unitSize +
(bid - nBlocksForCopy - nBlocksForReduce) * scratchSizePerBlock +

@@ -150,7 +150,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct NvlsBlockPipelineAdapter {
static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*,
DeviceHandle<SwitchChannel>* nvlsChannels, DeviceHandle<SwitchChannel>*, size_t, size_t,
-size_t scratchBufferSize, int rank, int nRanksPerNode, int, size_t inputSize,
+size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize,
cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) {
// uint8_t is not supported for NVLS (no hardware support for byte-level reduction)
if constexpr (std::is_same_v<T, uint8_t>) {

@@ -166,9 +166,9 @@ struct NvlsBlockPipelineAdapter {
#endif
{
using ChannelType = DeviceHandle<BaseMemoryChannel>;
-allreduceNvlsBlockPipeline<T>
-<<<nBlocks, nThreadsPerBlock, 0, stream>>>(input, scratch, output, (ChannelType*)memoryChannels,
-nvlsChannels, inputSize, scratchBufferSize, rank, nRanksPerNode);
+allreduceNvlsBlockPipeline<T><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
+input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank,
+ipcDomainNranks);
return cudaGetLastError();
}
}

@@ -200,11 +200,11 @@ CommResult AllreduceNvlsBlockPipeline::allreduceKernelFunc(const std::shared_ptr
}
std::pair<int, int> blockAndThreadNum = {nBlocks, nThreadsPerBlock};
if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) {
-blockAndThreadNum = {ctx->nRanksPerNode * 5, 1024};
+blockAndThreadNum = {ctx->ipcDomainNranks * 5, 1024};
}
cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr,
ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_,
-ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, nullptr, 0, 0,
+ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0,
blockAndThreadNum.first, blockAndThreadNum.second);
if (error != cudaSuccess) {
WARN("AllreduceNvlsBlockPipeline failed with error: %s", cudaGetErrorString(error));

@@ -222,7 +222,7 @@ std::shared_ptr<void> AllreduceNvlsBlockPipeline::initAllreduceContext(std::shar
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
+ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode();

// setup channels
ctx->switchChannels =
@@ -94,7 +94,7 @@ std::shared_ptr<void> AllreduceNvlsPacket::initAllreduceContext(std::shared_ptr<
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = getIpcDomainNranks(comm);
+ctx->ipcDomainNranks = getIpcDomainNranks(comm);

// setup channels
ctx->switchChannels = this->switchChannels_;

@@ -123,7 +123,7 @@ CommResult AllreduceNvlsPacket::allreduceKernelFunc(const std::shared_ptr<void>
}
cudaError_t error =
allreduce(input, this->scratchBuffer_, output, nullptr, nullptr, ctx->switchChannelDeviceHandles.get(), nullptr,
-0, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream,
+0, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream,
(void*)flagBuffer_, (uint32_t)flagBufferSize_, 0, blockAndThreadNum.first, blockAndThreadNum.second);
if (error != cudaSuccess) {
WARN(ALGO, "AllreduceNvlsPacket failed with error: ", cudaGetErrorString(error));
@@ -18,15 +18,15 @@ __global__ void __launch_bounds__(1024, 1)
[[maybe_unused]] DeviceHandle<BaseMemoryChannel>* memoryChannels,
[[maybe_unused]] DeviceHandle<SwitchChannel>* multicast, [[maybe_unused]] size_t size,
[[maybe_unused]] size_t scratchBufferSize, [[maybe_unused]] int rank,
-[[maybe_unused]] int nRanksPerNode) {
+[[maybe_unused]] int ipcDomainNranks) {
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900
constexpr int alignment = 16;
-int nPeers = nRanksPerNode - 1;
+int nPeers = ipcDomainNranks - 1;
int nBlocks = gridDim.x;
int nBlocksPerNvlsConn = nBlocks / NUM_NVLS_CONNECTION;
int bid = blockIdx.x;
-size_t sizePerRank = size / nRanksPerNode;
-size_t scratchSizePerRank = scratchBufferSize / nRanksPerNode;
+size_t sizePerRank = size / ipcDomainNranks;
+size_t scratchSizePerRank = scratchBufferSize / ipcDomainNranks;
const size_t maxSizePerBlock = ((sizePerRank + nBlocks - 1) / nBlocks + alignment - 1) / alignment * alignment;
size_t start = bid * maxSizePerBlock;
size_t end = min(start + maxSizePerBlock, sizePerRank);

@@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1)
lastIterSize = sizePerBlock % copyPerIter;
}

-const size_t chanOffset = (nRanksPerNode - 1) * blockIdx.x * 2;
+const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x * 2;
auto memoryChans = memoryChannels + chanOffset;
__shared__ DeviceHandle<BaseMemoryChannel> channels[(MAX_NRANKS_PER_NODE - 1) * 2];
const int lid = threadIdx.x % WARP_SIZE;

@@ -68,7 +68,7 @@ __global__ void __launch_bounds__(1024, 1)
const size_t iterSize = (it == nIter - 1) ? lastIterSize : copyPerIter;
if (warpId < endCopyWid) {
int tidInCopy = threadIdx.x;
-for (int i = 0; i < nRanksPerNode; i++) {
+for (int i = 0; i < ipcDomainNranks; i++) {
size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter;
size_t offsetScratch =
i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock;

@@ -99,7 +99,7 @@ __global__ void __launch_bounds__(1024, 1)
channels[tidInRecvCopy + nPeers].wait();
}
asm volatile("bar.sync %0, %1;" ::"r"(3), "r"((NRECV_COPY_WARPS)*WARP_SIZE) : "memory");
-for (int i = 0; i < nRanksPerNode; i++) {
+for (int i = 0; i < ipcDomainNranks; i++) {
size_t offset = i * sizePerRank + maxSizePerBlock * bid + it * copyPerIter;
size_t offsetScratch =
i * scratchSizePerRank + scratchSizePerBlock * bid + (it * copyPerIter) % scratchSizePerBlock;

@@ -116,7 +116,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct NvlsWarpPipelineAdapter {
static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void*,
DeviceHandle<SwitchChannel>* nvlsChannels, DeviceHandle<SwitchChannel>*, size_t, size_t,
-size_t scratchBufferSize, int rank, int nRanksPerNode, int, size_t inputSize,
+size_t scratchBufferSize, int rank, int ipcDomainNranks, int, size_t inputSize,
cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) {
// uint8_t is not supported for NVLS (no hardware support for byte-level reduction)
if constexpr (std::is_same_v<T, uint8_t>) {

@@ -132,9 +132,9 @@ struct NvlsWarpPipelineAdapter {
#endif
{
using ChannelType = DeviceHandle<BaseMemoryChannel>;
-allreduceNvlsWarpPipeline<T>
-<<<nBlocks, nThreadsPerBlock, 0, stream>>>(input, scratch, output, (ChannelType*)memoryChannels,
-nvlsChannels, inputSize, scratchBufferSize, rank, nRanksPerNode);
+allreduceNvlsWarpPipeline<T><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
+input, scratch, output, (ChannelType*)memoryChannels, nvlsChannels, inputSize, scratchBufferSize, rank,
+ipcDomainNranks);
return cudaGetLastError();
}
}

@@ -165,11 +165,11 @@ CommResult AllreduceNvlsWarpPipeline::allreduceKernelFunc(
}
std::pair<int, int> blockAndThreadNum = {nBlocks, nThreadsPerBlock};
if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) {
-blockAndThreadNum = {ctx->nRanksPerNode * 4, 1024};
+blockAndThreadNum = {ctx->ipcDomainNranks * 4, 1024};
}
cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->memoryChannelsDeviceHandle_.get(), nullptr,
ctx->switchChannelDeviceHandles.get(), nullptr, 0, 0, this->scratchBufferSize_,
-ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize, stream, nullptr, 0, 0,
+ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize, stream, nullptr, 0, 0,
blockAndThreadNum.first, blockAndThreadNum.second);
if (error != cudaSuccess) {
WARN("AllreduceNvlsWarpPipeline failed with error: %s", cudaGetErrorString(error));

@@ -187,7 +187,7 @@ std::shared_ptr<void> AllreduceNvlsWarpPipeline::initAllreduceContext(std::share
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
+ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode();

// setup channels
ctx->switchChannels =
@@ -19,12 +19,12 @@ __global__ void __launch_bounds__(1024, 1)
[[maybe_unused]] mscclpp::DeviceHandle<mscclpp::SwitchChannel>* multicast,
[[maybe_unused]] mscclpp::DeviceHandle<mscclpp::SwitchChannel>* multicastOut,
[[maybe_unused]] size_t channelInOffset, [[maybe_unused]] size_t channelOutOffset,
-[[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int nRanksPerNode) {
+[[maybe_unused]] size_t size, [[maybe_unused]] int rank, [[maybe_unused]] int ipcDomainNranks) {
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900
-int nPeers = nRanksPerNode - 1;
+int nPeers = ipcDomainNranks - 1;
int nBlocks = gridDim.x;
int bid = blockIdx.x;
-size_t sizePerRank = size / nRanksPerNode;
+size_t sizePerRank = size / ipcDomainNranks;
const size_t minAlign = 16;
// Align sizePerBlock to 16 bytes to ensure aligned vector access in handleMultiLoadReduceStore
size_t sizePerBlock = (sizePerRank + nBlocks - 1) / nBlocks;

@@ -40,14 +40,14 @@ __global__ void __launch_bounds__(1024, 1)
mscclpp::DeviceHandle<mscclpp::SwitchChannel>* multicastPtr = multicast + bid;
mscclpp::DeviceHandle<mscclpp::SwitchChannel>* multicastOutPtr = multicastOut + bid;

-const size_t chanOffset = (nRanksPerNode - 1) * blockIdx.x;
+const size_t chanOffset = (ipcDomainNranks - 1) * blockIdx.x;
auto memoryChans = memoryChannels + chanOffset;
__shared__ mscclpp::DeviceHandle<mscclpp::BaseMemoryChannel> channels[MAX_NRANKS_PER_NODE - 1];
const int lid = threadIdx.x % WARP_SIZE;
// Each warp redundantly loads all entries (same value, benign race) so that
// every warp has the data its threads will read after __syncwarp(). Required
// when nPeers > WARP_SIZE (MNNVL/NVL72 → 71 peers).
-for (int i = lid; i < nRanksPerNode - 1; i += WARP_SIZE) {
+for (int i = lid; i < ipcDomainNranks - 1; i += WARP_SIZE) {
channels[i] = memoryChans[i];
}
__syncwarp();

@@ -75,7 +75,7 @@ struct NvlsAdapter {
static cudaError_t call(const void*, void*, void*, void* memoryChannels, void*,
mscclpp::DeviceHandle<mscclpp::SwitchChannel>* nvlsChannels,
mscclpp::DeviceHandle<mscclpp::SwitchChannel>* nvlsOutChannels, size_t channelInOffset,
-size_t channelOutOffset, size_t, int rank, int nRanksPerNode, int, size_t inputSize,
+size_t channelOutOffset, size_t, int rank, int ipcDomainNranks, int, size_t inputSize,
cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) {
// uint8_t is not supported for NVLS (no hardware support for byte-level reduction)
if constexpr (std::is_same_v<T, uint8_t>) {

@@ -93,7 +93,7 @@ struct NvlsAdapter {
using ChannelType = DeviceHandle<mscclpp::BaseMemoryChannel>;
allreduceNvls<T><<<nBlocks, nThreadsPerBlock, 0, stream>>>((ChannelType*)memoryChannels, nvlsChannels,
nvlsOutChannels, channelInOffset, channelOutOffset,
-inputSize, rank, nRanksPerNode);
+inputSize, rank, ipcDomainNranks);
return cudaGetLastError();
}
}

@@ -145,7 +145,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr<void> ctx_vo
}
std::pair<int, int> numBlocksAndThreads = {nBlocks, nThreadsPerBlock};
if (numBlocksAndThreads.first == 0 || numBlocksAndThreads.second == 0) {
-numBlocksAndThreads = {::min(ctx->nRanksPerNode, MAX_NBLOCKS), 1024};
+numBlocksAndThreads = {::min(ctx->ipcDomainNranks, MAX_NBLOCKS), 1024};
// For GB200 devices with MNNVLS (Multi-Node NVLink Sharp), scale the number of blocks inversely with
// the number of GPUs. Empirically, 32 blocks works well for 4 GPUs and 16 for 8 GPUs, which
// follows the formula 128 / nGPUs, clamped to [1, MAX_NBLOCKS].
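The comment above describes the GB200/MNNVLS heuristic in words; a minimal sketch of that clamp (variable names here are illustrative, not the code that follows in the file):

    // Sketch of the 128 / nGPUs rule described above.
    int nGpus = ctx->ipcDomainNranks;                           // 4 -> 32 blocks, 8 -> 16 blocks
    int nBlocksMnnvls = std::min(std::max(128 / nGpus, 1), MAX_NBLOCKS);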
@@ -159,7 +159,7 @@ CommResult AllreduceNvls::allreduceKernelFunc(const std::shared_ptr<void> ctx_vo
}
cudaError_t error =
allreduce(nullptr, nullptr, nullptr, this->memoryChannelsDeviceHandle_.get(), nullptr, nvlsChannels,
-nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->nRanksPerNode, ctx->workSize,
+nvlsOutChannels, channelInOffset, channelOutOffset, 0, ctx->rank, ctx->ipcDomainNranks, ctx->workSize,
inputSize, stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second);
if (error != cudaSuccess) {
WARN("AllreduceNvls failed with error: %s", cudaGetErrorString(error));

@@ -183,7 +183,7 @@ std::shared_ptr<void> AllreduceNvls::initAllreduceContext(std::shared_ptr<mscclp
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = getIpcDomainNranks(comm);
+ctx->ipcDomainNranks = getIpcDomainNranks(comm);

size_t sendBytes, recvBytes;
CUdeviceptr sendBasePtr, recvBasePtr;
@@ -15,7 +15,7 @@ namespace collective {
template <ReduceOp OpType, typename T, typename AccumT = T>
__global__ void __launch_bounds__(1024, 1)
allreducePacket(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle<mscclpp::MemoryChannel>* memoryChannels,
-size_t channelDataOffset, size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize,
+size_t channelDataOffset, size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize,
size_t nelems, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff
#if defined(ENABLE_NPKIT)
,

@@ -53,7 +53,7 @@ __global__ void __launch_bounds__(1024, 1)
else
nelems = nelems / (sizeof(int) / sizeof(T));

-const int nPeers = nRanksPerNode - 1;
+const int nPeers = ipcDomainNranks - 1;
const size_t nPkts = nelems / 2;

uint32_t flag = ((uint32_t*)flags)[blockIdx.x];

@@ -156,7 +156,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct PacketAdapter {
static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*,
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, size_t channelInOffset, size_t,
-size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize,
+size_t scratchBufferSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize,
cudaStream_t stream, void* flags, uint32_t flagBufferSize, uint32_t numScratchBuff,
int nBlocks = 0, int nThreadsPerBlock = 0) {
using ChannelType = DeviceHandle<MemoryChannel>;

@@ -167,20 +167,20 @@ struct PacketAdapter {
size_t sharedMemSize = sizeof(NpKitEvent) * NPKIT_SHM_NUM_EVENTS;
allreducePacket<OpType, T, AccumT><<<nBlocks, nThreadsPerBlock, sharedMemSize, stream>>>(
(T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank,
-nRanksPerNode, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(),
+ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff, NpKit::GetGpuEventCollectContexts(),
NpKit::GetCpuTimestamp());
#else
allreducePacket<OpType, T, AccumT><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
(T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank,
-nRanksPerNode, worldSize, nelems, flags, flagBufferSize, numScratchBuff);
+ipcDomainNranks, worldSize, nelems, flags, flagBufferSize, numScratchBuff);
#endif
return cudaGetLastError();
}
};

-inline std::pair<int, int> getDefaultBlockNumAndThreadNum(size_t inputSize, int nRanksPerNode, int worldSize,
+inline std::pair<int, int> getDefaultBlockNumAndThreadNum(size_t inputSize, int ipcDomainNranks, int worldSize,
[[maybe_unused]] DataType dtype) {
-int nBlocks = (nRanksPerNode - 1) * 4;
+int nBlocks = (ipcDomainNranks - 1) * 4;
int nThreadsPerBlock = 1024;
if (inputSize >= 32768) {
nBlocks = (worldSize - 1) * 8;
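Worked example of the two branches visible above (hypothetical sizes; the rest of the function is not shown in this hunk):

    // ipcDomainNranks = 8, worldSize = 8:
    //   inputSize <  32768 -> nBlocks = (8 - 1) * 4 = 28, 1024 threads per block
    //   inputSize >= 32768 -> nBlocks = (8 - 1) * 8 = 56, 1024 threads per block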
@@ -232,7 +232,7 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr<void> ctx_
auto ctx = std::static_pointer_cast<AlgorithmCtx>(ctx_void);
std::pair<int, int> blockAndThreadNum = {nBlocks, nThreadsPerBlock};
if (blockAndThreadNum.first == 0 || blockAndThreadNum.second == 0) {
-blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->workSize, ctx->nRanksPerNode, dtype);
+blockAndThreadNum = getDefaultBlockNumAndThreadNum(inputSize, ctx->workSize, ctx->ipcDomainNranks, dtype);
}

size_t sendBytes;

@@ -248,7 +248,7 @@ CommResult AllreducePacket::allreduceKernelFunc(const std::shared_ptr<void> ctx_
}
cudaError_t error =
allreduce(input, this->scratchBuffer_, output, ctx->memoryChannelDeviceHandles.get(), nullptr, nullptr, nullptr,
-channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->nRanksPerNode, ctx->workSize, inputSize,
+channelInOffset, 0, this->scratchBufferSize_, ctx->rank, ctx->ipcDomainNranks, ctx->workSize, inputSize,
stream, (void*)flagBuffer_, (uint32_t)flagBufferSize_, this->nSegmentsForScratchBuffer_,
blockAndThreadNum.first, blockAndThreadNum.second);
if (error != cudaSuccess) {

@@ -264,7 +264,7 @@ std::shared_ptr<void> AllreducePacket::initAllreduceContext(std::shared_ptr<Comm
const int nChannelsPerConnection = maxBlockNum_;
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = getIpcDomainNranks(comm);
+ctx->ipcDomainNranks = getIpcDomainNranks(comm);
ctx->memorySemaphores = this->memorySemaphores_;
ctx->registeredMemories = this->registeredMemories_;
ctx->registeredMemories.pop_back(); // remove the local memory from previous context
@@ -31,18 +31,18 @@ namespace collective {
template <ReduceOp OpType, typename T>
__global__ void __launch_bounds__(1024, 1)
allreduceRsAg(T* buff, T* scratch, T* resultBuff, DeviceHandle<BaseMemoryChannel>* memoryChannels,
-DeviceHandle<SwitchChannel>* switchChannels, void* remoteMemories, int rank, int nRanksPerNode,
+DeviceHandle<SwitchChannel>* switchChannels, void* remoteMemories, int rank, int ipcDomainNranks,
int worldSize, size_t nelems) {
int blockId = blockIdx.x;
-uint32_t nPeers = nRanksPerNode - 1;
+uint32_t nPeers = ipcDomainNranks - 1;

assert((uintptr_t)buff % sizeof(int4) == 0);
assert((uintptr_t)resultBuff % sizeof(int4) == 0);

constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T);
-uint32_t alignedNelems = ((nelems + nRanksPerNode - 1) / nRanksPerNode + nelemsPerInt4 - 1) / nelemsPerInt4 *
-nelemsPerInt4 * nRanksPerNode;
-uint32_t nelemsPerRank = alignedNelems / nRanksPerNode;
+uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 *
+nelemsPerInt4 * ipcDomainNranks;
+uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks;
uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4;
uint32_t lastInt4Index = nelems / nelemsPerInt4;
uint32_t remainder = nelems % nelemsPerInt4;
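Worked example of the rounding above (hypothetical values, not from the diff): with T = float, ipcDomainNranks = 8 and nelems = 1000,

    // nelemsPerInt4 = sizeof(int4) / sizeof(float) = 4
    // per-rank share:  (1000 + 8 - 1) / 8       = 125 elements
    // rounded to int4: ((125 + 4 - 1) / 4) * 4  = 128 elements
    // alignedNelems:   128 * 8                  = 1024, so nelemsPerRank = 128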
@@ -59,7 +59,7 @@ __global__ void __launch_bounds__(1024, 1)
nInt4PerBlock += remainderForBlock;
}
if (nInt4PerBlock == 0) return;
-uint32_t nInt4ForCopy = nInt4PerBlock * nRanksPerNode;
+uint32_t nInt4ForCopy = nInt4PerBlock * ipcDomainNranks;

for (uint32_t idx = threadIdx.x; idx < nInt4ForCopy; idx += blockDim.x) {
int rankIdx = idx / nInt4PerBlock;

@@ -84,13 +84,13 @@ __global__ void __launch_bounds__(1024, 1)
if (offset > lastInt4Index) continue;
int4 tmp = scratch4[offset];
for (uint32_t i = 0; i < nPeers; i++) {
-int rankIdx = (rank + i + 1) % nRanksPerNode;
+int rankIdx = (rank + i + 1) % ipcDomainNranks;
int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1;
int4 data = mscclpp::read<int4>(((void**)remoteMemories)[peerIdx], offset);
tmp = calVector<T, OpType>(data, tmp);
}
for (uint32_t i = 0; i < nPeers; i++) {
-int rankIdx = (rank + i + 1) % nRanksPerNode;
+int rankIdx = (rank + i + 1) % ipcDomainNranks;
int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1;
mscclpp::write<int4>(((void**)remoteMemories)[peerIdx], offset, tmp);
}

@@ -127,7 +127,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct AllreduceRsAgAdapter {
static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories,
DeviceHandle<SwitchChannel>* switchChannel, DeviceHandle<SwitchChannel>*, size_t, size_t,
-size_t, int rank, int nRanksPerNode, int worldSize, size_t inputSize, cudaStream_t stream,
+size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream,
void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) {
using ChannelType = DeviceHandle<BaseMemoryChannel>;
size_t nelems = inputSize / sizeof(T);

@@ -137,7 +137,7 @@ struct AllreduceRsAgAdapter {
}
allreduceRsAg<OpType, T><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
(T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank,
-nRanksPerNode, worldSize, nelems);
+ipcDomainNranks, worldSize, nelems);
return cudaGetLastError();
}
};

@@ -185,7 +185,7 @@ CommResult AllreduceRsAg::allreduceKernelFunc(const std::shared_ptr<void> ctx, c
}
cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(),
this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, 0, algoCtx->rank,
-algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, nullptr, 0, 0,
+algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr, 0, 0,
numBlocksAndThreads.first, numBlocksAndThreads.second);
if (error != cudaSuccess) {
WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error));

@@ -203,7 +203,7 @@ std::shared_ptr<void> AllreduceRsAg::initAllreduceContext(std::shared_ptr<Commun
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = getIpcDomainNranks(comm);
+ctx->ipcDomainNranks = getIpcDomainNranks(comm);

ctx->memorySemaphores = this->scratchSemaphores_;
ctx->registeredMemories = this->remoteScratchMemories_;
@@ -86,7 +86,7 @@ template <ReduceOp OpType, typename T>
__global__ void __launch_bounds__(1024, 1)
allreduceRsAgPipeline(T* buff, T* scratch, T* resultBuff, DeviceHandle<BaseMemoryChannel>* memoryChannels,
DeviceHandle<SwitchChannel>* switchChannels, void* remoteMemories, int rank,
-int nRanksPerNode, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut,
+int ipcDomainNranks, int worldSize, size_t nelems, size_t scratchSize, uint32_t nblocksForPut,
uint32_t nblocksForReduce, uint32_t nblocksForRecv) {
uint32_t bid = blockIdx.x;
constexpr uint32_t nStepsPerIter = 4;

@@ -94,7 +94,7 @@ __global__ void __launch_bounds__(1024, 1)
uint32_t nInt4PerIter = nblocksForReduce * blockDim.x * nStepsPerIter;
const uint32_t chunkSize = nInt4PerIter * worldSize;
uint32_t nIters = (nInt4 + chunkSize - 1) / chunkSize;
-uint32_t nPeers = nRanksPerNode - 1;
+uint32_t nPeers = ipcDomainNranks - 1;
int4* scratch4 = reinterpret_cast<int4*>((char*)scratch);
const uint32_t scratchIterStride = 2 * chunkSize; // one for AS, one for AG
const uint32_t pipelineDepth = scratchSize / sizeof(int4) / scratchIterStride;

@@ -111,7 +111,7 @@ __global__ void __launch_bounds__(1024, 1)
__syncthreads();
uint32_t threadIdInPut = bid * blockDim.x + threadIdx.x;
for (uint32_t peer = 0; peer < nPeers; peer++) {
-int remoteRankId = (rank + peer + 1) % nRanksPerNode;
+int remoteRankId = (rank + peer + 1) % ipcDomainNranks;
int peerId = remoteRankId < rank ? remoteRankId : remoteRankId - 1;
// Read chunk[remoteRankId] from local buff, write to peer's scratch[rank] (sender's slot)
uint32_t srcOffset = iter * chunkSize + remoteRankId * nInt4PerIter;

@@ -164,7 +164,7 @@ __global__ void __launch_bounds__(1024, 1)
int4 tmp = loadVec(buff, myChunkOffset, nelems);
// Add data from each peer's slot in scratch (peer sent their chunk[rank] to our scratch[peer])
for (uint32_t peer = 0; peer < nPeers; peer++) {
-int remoteRankId = (rank + peer + 1) % nRanksPerNode;
+int remoteRankId = (rank + peer + 1) % ipcDomainNranks;
uint32_t peerSlotOffset =
baseOffset + remoteRankId * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut;
int4 data = scratch4[peerSlotOffset];

@@ -175,7 +175,7 @@ __global__ void __launch_bounds__(1024, 1)
uint32_t dstOffset =
baseOffset + chunkSize + rank * nInt4PerIter + threadIdInPut + putStep * blockDim.x * nblocksForPut;
for (uint32_t i = 0; i < nPeers; i++) {
-int peerIdx = (rank + i + 1) % nRanksPerNode;
+int peerIdx = (rank + i + 1) % ipcDomainNranks;
int index = peerIdx < rank ? peerIdx : peerIdx - 1;
mscclpp::write<int4>(((void**)remoteMemories)[index], dstOffset, tmp);
}

@@ -203,7 +203,7 @@ __global__ void __launch_bounds__(1024, 1)
__syncthreads();
// Copy other ranks' reduced chunks from scratch to result
for (uint32_t peer = 0; peer < nPeers; peer++) {
-int remoteRankId = (rank + peer + 1) % nRanksPerNode;
+int remoteRankId = (rank + peer + 1) % ipcDomainNranks;
for (uint32_t step = 0; step < nStepsPerIter * REDUCE_COPY_RATIO; step++) {
uint32_t offset = baseOffset + chunkSize + remoteRankId * nInt4PerIter + threadIdInRecv +
step * blockDim.x * nblocksForRecv;

@@ -224,7 +224,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct AllreduceRsAgPipelineAdapter {
static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories,
DeviceHandle<SwitchChannel>* switchChannel, DeviceHandle<SwitchChannel>*, size_t, size_t,
-size_t scratchSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize,
+size_t scratchSize, int rank, int ipcDomainNranks, int worldSize, size_t inputSize,
cudaStream_t stream, void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) {
using ChannelType = DeviceHandle<BaseMemoryChannel>;
size_t nelems = inputSize / sizeof(T);

@@ -248,7 +248,7 @@ struct AllreduceRsAgPipelineAdapter {
}
allreduceRsAgPipeline<OpType, T><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
(T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank,
-nRanksPerNode, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv);
+ipcDomainNranks, worldSize, nelems, scratchSize, nblocksForPut, nblocksForReduce, nblocksForRecv);
return cudaGetLastError();
}
};

@@ -288,8 +288,8 @@ CommResult AllreduceRsAgPipeline::allreduceKernelFunc(
std::pair<int, int> numBlocksAndThreads = {nBlocks, nThreadsPerBlock};
cudaError_t error = allreduce(input, this->scratchBuffer_, output, this->baseMemoryChannelHandles_.get(),
this->remoteMemoryHandles_.get(), nullptr, nullptr, 0, 0, this->scratchBufferSize_,
-algoCtx->rank, algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream, nullptr, 0,
-0, numBlocksAndThreads.first, numBlocksAndThreads.second);
+algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize, stream, nullptr,
+0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second);
if (error != cudaSuccess) {
WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error));
return CommResult::CommUnhandledCudaError;

@@ -306,7 +306,7 @@ std::shared_ptr<void> AllreduceRsAgPipeline::initAllreduceContext(std::shared_pt
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
+ctx->ipcDomainNranks = comm->bootstrap()->getNranksPerNode();

ctx->memorySemaphores = this->scratchSemaphores_;
ctx->registeredMemories = this->remoteScratchMemories_;
@@ -35,7 +35,7 @@ __device__ mscclpp::DeviceSyncer globalSyncer;
//
// This approach requires registering both input and output buffers as remote
// memories (2 * nPeers handles), but avoids scratch buffer allocation and
-// the extra copy steps of the standard RSAG. nRanksPerNode is accepted at
+// the extra copy steps of the standard RSAG. ipcDomainNranks is accepted at
// runtime, which allows the same kernel to handle any NVLink-domain size
// (including Multi-Node NVLink fabrics up to NVL72).
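A sketch of the handle layout this comment implies, matching the outputRemoteBufferOffset = NPeers line further down (illustrative, not verbatim code from the kernel):

    // remoteMemories[0 .. nPeers - 1]        -> peers' input buffers (read during reduce-scatter)
    // remoteMemories[nPeers .. 2*nPeers - 1] -> peers' output buffers (written during all-gather)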
@@ -43,18 +43,18 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
__global__ void __launch_bounds__(1024, 1)
allreduceRsAgZeroCopy(T* buff, T* scratch, T* resultBuff, DeviceHandle<BaseMemoryChannel>* memoryChannels,
DeviceHandle<SwitchChannel>* switchChannels, void* remoteMemories, int rank,
-int nRanksPerNode, int worldSize, size_t nelems) {
+int ipcDomainNranks, int worldSize, size_t nelems) {
int blockId = blockIdx.x;

assert((uintptr_t)buff % sizeof(int4) == 0);
assert((uintptr_t)resultBuff % sizeof(int4) == 0);

-const int NPeers = nRanksPerNode - 1;
+const int NPeers = ipcDomainNranks - 1;
constexpr uint32_t nelemsPerInt4 = sizeof(int4) / sizeof(T);
const uint32_t outputRemoteBufferOffset = NPeers;
-uint32_t alignedNelems = ((nelems + nRanksPerNode - 1) / nRanksPerNode + nelemsPerInt4 - 1) / nelemsPerInt4 *
-nelemsPerInt4 * nRanksPerNode;
-uint32_t nelemsPerRank = alignedNelems / nRanksPerNode;
+uint32_t alignedNelems = ((nelems + ipcDomainNranks - 1) / ipcDomainNranks + nelemsPerInt4 - 1) / nelemsPerInt4 *
+nelemsPerInt4 * ipcDomainNranks;
+uint32_t nelemsPerRank = alignedNelems / ipcDomainNranks;
uint32_t nInt4PerRank = nelemsPerRank / nelemsPerInt4;
uint32_t nInt4Total = (nelems + nelemsPerInt4 - 1) / nelemsPerInt4;

@@ -87,14 +87,14 @@ __global__ void __launch_bounds__(1024, 1)
int4 data;
AccumVec acc = mscclpp::upcastVector<T, AccumT, AccumVec>(tmp_raw);
for (int i = 0; i < NPeers; i++) {
-int rankIdx = (rank + i + 1) % nRanksPerNode;
+int rankIdx = (rank + i + 1) % ipcDomainNranks;
int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1;
data = mscclpp::read<int4>(((void**)remoteMemories)[peerIdx], offset);
acc = mscclpp::calVectorAccum<T, AccumT, OpType, AccumVec>(acc, data);
}
int4 tmp = mscclpp::downcastVector<T, AccumT, int4>(acc);
for (int i = 0; i < NPeers; i++) {
-int rankIdx = (rank + i + 1) % nRanksPerNode;
+int rankIdx = (rank + i + 1) % ipcDomainNranks;
int peerIdx = rankIdx < rank ? rankIdx : rankIdx - 1;
mscclpp::write<int4>(((void**)remoteMemories)[outputRemoteBufferOffset + peerIdx], offset, tmp);
}

@@ -112,7 +112,7 @@ template <ReduceOp OpType, typename T, typename AccumT = T>
struct AllreduceRsAgZeroCopyAdapter {
static cudaError_t call(const void* input, void* scratch, void* output, void* memoryChannels, void* remoteMemories,
DeviceHandle<SwitchChannel>* switchChannel, DeviceHandle<SwitchChannel>*, size_t, size_t,
-size_t, int rank, int nRanksPerNode, int worldSize, size_t inputSize, cudaStream_t stream,
+size_t, int rank, int ipcDomainNranks, int worldSize, size_t inputSize, cudaStream_t stream,
void*, uint32_t, uint32_t, int nBlocks, int nThreadsPerBlock) {
using ChannelType = DeviceHandle<BaseMemoryChannel>;
size_t nelems = inputSize / sizeof(T);

@@ -125,7 +125,7 @@ struct AllreduceRsAgZeroCopyAdapter {
}
allreduceRsAgZeroCopy<OpType, T, AccumT><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
(T*)input, (T*)scratch, (T*)output, (ChannelType*)memoryChannels, switchChannel, remoteMemories, rank,
-nRanksPerNode, worldSize, nelems);
+ipcDomainNranks, worldSize, nelems);
return cudaGetLastError();
}
};

@@ -159,8 +159,8 @@ CommResult AllreduceRsAgZeroCopy::allreduceKernelFunc(const std::shared_ptr<void
}
cudaError_t error =
allreduce(input, nullptr, output, this->baseMemoryChannelHandles_.get(), algoCtx->remoteMemoryHandles.get(),
-nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->nRanksPerNode, algoCtx->workSize, inputSize, stream,
-nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second);
+nullptr, nullptr, 0, 0, 0, algoCtx->rank, algoCtx->ipcDomainNranks, algoCtx->workSize, inputSize,
+stream, nullptr, 0, 0, numBlocksAndThreads.first, numBlocksAndThreads.second);
if (error != cudaSuccess) {
WARN(ALGO, "Allreduce kernel launch failed with error: ", cudaGetErrorString(error));
return CommResult::CommUnhandledCudaError;

@@ -186,7 +186,7 @@ std::shared_ptr<void> AllreduceRsAgZeroCopy::initAllreduceContext(std::shared_pt
auto ctx = std::make_shared<AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
-ctx->nRanksPerNode = getIpcDomainNranks(comm);
+ctx->ipcDomainNranks = getIpcDomainNranks(comm);

ctx->memorySemaphores = this->semaphores_;
@@ -27,8 +27,8 @@ namespace mscclpp {
namespace collective {
constexpr int NUM_NVLS_CONNECTION = 8;
// Sized to cover MAX_NRANKS_PER_NODE-scale allreduce algos whose device-side
-// semaphore indices grow as O(nRanksPerNode) (e.g. nvls_block_pipeline uses
-// up to ~5 * nRanksPerNode entries).
+// semaphore indices grow as O(ipcDomainNranks) (e.g. nvls_block_pipeline uses
+// up to ~5 * ipcDomainNranks entries).
constexpr int NUM_SEMAPHORES = 512;

// Upper bound on the number of NVLink-reachable ranks that participate in a
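As a rough check of the sizing comment above, assuming the largest IPC domain of interest is an NVL72 fabric (72 ranks is an assumption here, not stated in this hunk):

    // ~5 * 72 = 360 semaphore indices, which fits within NUM_SEMAPHORES = 512.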
@@ -54,8 +54,8 @@ std::vector<std::shared_ptr<MemoryDevice2DeviceSemaphore>> setupMemorySemaphores
/// Number of ranks that participate in the same GPU-IPC-reachable peer group (e.g. a single host or
/// a Multi-Node NVLink fabric, or an AMD XGMI domain). Returns the value of `MSCCLPP_IPC_DOMAIN_NRANKS`
/// if set to a positive value; otherwise falls back to `bootstrap->getNranksPerNode()`. This is
-/// intentionally independent of `nRanksPerNode` so that algorithms can opt in to MNNVL-like behavior
-/// without changing the meaning of bootstrap-level APIs.
+/// intentionally independent of `Bootstrap::getNranksPerNode()` so that algorithms can opt in to
+/// MNNVL-like behavior without changing the meaning of bootstrap-level APIs.
int getIpcDomainNranks(std::shared_ptr<Communicator> comm);

std::shared_ptr<DeviceHandle<MemoryChannel>> setupMemoryChannelDeviceHandles(
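A minimal sketch of the fallback behavior the doc comment describes; the real helper may go through mscclpp's env utilities rather than std::getenv, so treat the names and details below as assumptions:

    #include <cstdlib>
    // Sketch only: mirrors the documented behavior of getIpcDomainNranks().
    int getIpcDomainNranksSketch(std::shared_ptr<Communicator> comm) {
      const char* v = std::getenv("MSCCLPP_IPC_DOMAIN_NRANKS");
      if (v != nullptr) {
        int n = std::atoi(v);
        if (n > 0) return n;                         // positive override wins
      }
      return comm->bootstrap()->getNranksPerNode();  // otherwise fall back to the physical-host count
    }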
@@ -86,7 +86,7 @@ class AlgorithmCtx {
public:
int rank;
int workSize;
-int nRanksPerNode;
+int ipcDomainNranks;

std::vector<RegisteredMemory> registeredMemories;
std::vector<MemoryChannel> memoryChannels;