From 0c14a67ad264bb8aaa0fcf7284eaa47dc30eb79a Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Wed, 7 Jun 2023 18:58:47 +0800 Subject: [PATCH] [mscclpp-test] Add AllReduce and AllToAll tests (#83) --- CMakeLists.txt | 1 + include/mscclpp/channel.hpp | 13 +- include/mscclpp/concurrency.hpp | 3 + test/allreduce_test.cu | 246 --------------------- test/mscclpp-test/CMakeLists.txt | 2 + test/mscclpp-test/allgather_test.cu | 38 +--- test/mscclpp-test/allreduce_test.cu | 328 ++++++++++++++++++++++++++++ test/mscclpp-test/alltoall_test.cu | 153 +++++++++++++ test/mscclpp-test/common.cu | 49 +++++ test/mscclpp-test/common.hpp | 5 +- test/mscclpp-test/sendrecv_test.cu | 6 +- 11 files changed, 555 insertions(+), 289 deletions(-) delete mode 100644 test/allreduce_test.cu create mode 100644 test/mscclpp-test/allreduce_test.cu create mode 100644 test/mscclpp-test/alltoall_test.cu diff --git a/CMakeLists.txt b/CMakeLists.txt index 5635433f..0c2ff448 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.26) project(mscclpp LANGUAGES CUDA CXX) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CUDA_STANDARD 17) +set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -gencode arch=compute_80,code=sm_80 -gencode arch=compute_90,code=sm_90") option(ENABLE_TRACE "Enable tracing" OFF) option(USE_MPI_FOR_TESTS "Use MPI for tests" ON) diff --git a/include/mscclpp/channel.hpp b/include/mscclpp/channel.hpp index 76248ea9..407e7f68 100644 --- a/include/mscclpp/channel.hpp +++ b/include/mscclpp/channel.hpp @@ -228,8 +228,12 @@ struct SimpleDeviceChannel { SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src) : devChan_(devChan), dst_(dst), src_(src) {} - SimpleDeviceChannel(DeviceChannel devChan, void* dstPtr, void* srcPtr) - : devChan_(devChan), srcPtr_(srcPtr), dstPtr_(dstPtr) {} + SimpleDeviceChannel(DeviceChannel devChan, void* dstPtr, void* srcPtr, void* tmpPtr = nullptr) + : devChan_(devChan), dstPtr_(dstPtr), srcPtr_(srcPtr), tmpPtr_(tmpPtr) {} + + SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src, void* dstPtr, void* srcPtr, + void* tmpPtr = nullptr) + : devChan_(devChan), dst_(dst), src_(src), dstPtr_(dstPtr), srcPtr_(srcPtr), tmpPtr_(tmpPtr) {} SimpleDeviceChannel(const SimpleDeviceChannel& other) = default; @@ -278,8 +282,11 @@ struct SimpleDeviceChannel { MemoryId src_; // these are used for direct copy - void* srcPtr_; void* dstPtr_; + void* srcPtr_; + + // extra local buffer for out-of-place copy + void* tmpPtr_; }; } // namespace channel diff --git a/include/mscclpp/concurrency.hpp b/include/mscclpp/concurrency.hpp index f734d171..efc19426 100644 --- a/include/mscclpp/concurrency.hpp +++ b/include/mscclpp/concurrency.hpp @@ -12,6 +12,9 @@ struct DeviceSyncer { // previous work of all threads in cooperating blocks is finished. __forceinline__ __device__ void sync(int blockNum) { int maxOldCnt = blockNum - 1; + __threadfence(); + // Make sure that all threads in this block have done `__threadfence()` + // before to flip `flag`. 
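+    // (`__threadfence()` makes this block's prior writes visible device-wide;
+    //  the `__syncthreads()` below guarantees every thread has executed that
+    //  fence before thread 0 flips `flag`.)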
__syncthreads(); if (threadIdx.x == 0) { int tmpIsAdd = isAdd_ ^ 1; diff --git a/test/allreduce_test.cu b/test/allreduce_test.cu deleted file mode 100644 index 0a5fcc07..00000000 --- a/test/allreduce_test.cu +++ /dev/null @@ -1,246 +0,0 @@ -#include -#include -#include - -#include "comm.h" -#include "common.h" - -#define ALIGN 4 - -const int phase2Tag = 2; -mscclppDevConn_t* conns; -void* scratch = nullptr; -void* sendRecvData = nullptr; -cuda::barrier* barrier = nullptr; - -struct Chunk { - size_t offset; - size_t size; -}; - -inline int getSendTag(int rank, int peer) { return rank < peer ? 0 : 1; } - -inline int getRecvTag(int rank, int peer) { return rank < peer ? 1 : 0; } - -__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) { - size_t remainder = dataCount % numChunks; - size_t smallChunkSize = dataCount / numChunks; - size_t largeChunkSize = smallChunkSize + 1; - size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; - size_t numSmallChunks = chunkCount - numLargeChunks; - size_t offset = (remainder - numLargeChunks) * largeChunkSize + - (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize; - return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize}; -} - -__host__ __device__ int peerIdx(int peerRank, int rank) { return peerRank < rank ? peerRank : peerRank - 1; } - -__host__ __device__ int peerRank(int peerIdx, int rank) { return peerIdx < rank ? peerIdx : peerIdx + 1; } - -__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3; } - -__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 1; } - -__host__ __device__ int phase2ConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 2; } - -__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) { - if (threadIdx.x == 0) { - conn.putWithSignalAndFlush(dstOffset, srcOffset, size); - } - __syncthreads(); -} - -__device__ void recv(mscclppDevConn_t& conn) { - if (threadIdx.x == 0) { - conn.wait(); - } - __syncthreads(); -} - -__device__ void reduceSum(int* dst, int* src, size_t size) { - for (int i = threadIdx.x; i < size; i += blockDim.x) { - dst[i] += src[i]; - } -} - -__global__ void initData(int* data, size_t size, int rank) { - for (int i = threadIdx.x; i < size; i += blockDim.x) { - data[i] = rank; - } -} - -__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount, - mscclppDevConn_t* conns, void* scratch, void* sendRecvData, - cuda::barrier* barrier) { - int idx = blockIdx.x; - int peer = peerRank(idx, rank); - mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)]; - mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(peer, rank)]; - mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(peer, rank)]; - - // 1st communication phase: send data to the scratch buffer of the peer associated with this block - Chunk toPeerChunk = getChunk(dataCount, nRanks, peer, 1); - // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. - // The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and - // inside that each of the destination's peers send to the nth chunk, where n is the index of the - // source peer from the destination's perspective. 
- size_t dstOffset = peerIdx(rank, peer) * toPeerChunk.size; - send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int)); - recv(phase1RecvConn); - - if (threadIdx.x == 0) barrier->arrive_and_wait(); - __syncthreads(); - - // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer - Chunk rankChunk = getChunk(dataCount, nRanks, rank, 1); - int* chunk = (int*)sendRecvData + rankChunk.offset; - int numPeers = nRanks - 1, numBlocks = nRanks - 1; - Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, idx, 1); - for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) { - assert(scratchDataCount % numPeers == 0); - assert(scratchDataCount / numPeers == rankChunk.size); - size_t scratchDataCountPerPeer = scratchDataCount / numPeers; - int* scratchChunk = (int*)scratch + peerIdx * scratchDataCountPerPeer; - Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, idx, 1); - assert(blockScratchChunk.size == blockUserChunk.size); - reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); - } - - if (threadIdx.x == 0) barrier->arrive_and_wait(); - __syncthreads(); - - // 2nd communication phase: send the now reduced data between the user buffers - Chunk collectionChunk = getChunk(dataCount, nRanks, rank, 1); - send(phase2Conn, collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int), - collectionChunk.size * sizeof(int)); - recv(phase2Conn); -} - -void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, - size_t* recvInplaceOffset, size_t count, int nranks) { - size_t base = (count / ALIGN) * ALIGN; - *sendcount = base; - *recvcount = base; - *sendInplaceOffset = 0; - *recvInplaceOffset = 0; - *paramcount = base; -} - -void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) { - size_t paramcount, sendInplaceOffset, recvInplaceOffset; - AllReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); -} - -testResult_t AllReduceInitData(struct testArgs* args, int in_place) { - size_t recvcount = args->expectedBytes / sizeof(int); - - CUDACHECK(cudaSetDevice(args->gpuNum)); - CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes)); - initData<<<1, 256>>>((int*)args->recvbuff, recvcount, args->proc); - - int* dataHost = new int[recvcount]; - for (size_t i = 0; i < recvcount; i++) { - dataHost[i] = args->totalProcs * (args->totalProcs - 1) / 2; - } - CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); - delete dataHost; - CUDACHECK(cudaDeviceSynchronize()); - MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm)); - return testSuccess; -} - -void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { - double baseBw = (double)(count * typesize) / 1.0E9 / sec; - - *algBw = baseBw; - double factor = (2 * (double)(nranks - 1)) / ((double)nranks); - *busBw = baseBw * factor; -} - -testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm, - cudaStream_t stream, int kernelNum) { - int worldSize = comm->nRanks; - int nPeers = worldSize - 1; - int dataCount = nBytes / sizeof(int); - Chunk chunk = getChunk(dataCount, worldSize, comm->rank, 1); - size_t scratchDataCount = chunk.size * nPeers; - allReduceKernel0<<>>(comm->rank, worldSize, dataCount, 
scratchDataCount, conns, - scratch, sendRecvData, barrier); - return testSuccess; -} - -struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, defaultInitColl, AllReduceInitData, - AllReduceGetBw, AllReduceRunColl}; - -testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) { - int rank = args->proc, worldSize = args->totalProcs; - size_t bufferSize = args->maxbytes; - Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1); - int nPeers = args->totalProcs - 1; - size_t scratchBytes = chunk.size * nPeers * sizeof(int); - - CUDACHECK(cudaMalloc(&scratch, scratchBytes)); - - for (int peer = 0; peer < worldSize; ++peer) { - if (peer != args->proc) { - int sendTag = getSendTag(args->proc, peer); - int recvTag = getRecvTag(args->proc, peer); - MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); - MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr)); - MSCCLPPCHECK( - mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); - } - } - MSCCLPPCHECK(mscclppConnectionSetup(args->comm)); - - return testSuccess; -} - -testResult_t AllReduceTeardownMscclppConnections() { - if (scratch != nullptr) { - CUDACHECK(cudaFree(scratch)); - scratch = nullptr; - } - return testSuccess; -} - -testResult_t AllReduceRunTest(struct testArgs* args) { - args->collTest = &allReduceTest; - - sendRecvData = args->recvbuff; - CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier))); - cuda::barrier initBarrier(args->totalProcs - 1); - CUDACHECK( - cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); - int nPeers = args->totalProcs - 1; - int rank = args->proc; - std::vector hostConns(nPeers * 3, mscclppDevConn_t()); - - for (int peer = 0; peer < args->totalProcs; ++peer) { - mscclppDevConn_t* devConn; - if (peer != rank) { - int sendTag = getSendTag(args->proc, peer); - int recvTag = getRecvTag(args->proc, peer); - MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, sendTag, &devConn)); - hostConns[phase1SendConnIdx(peer, rank)] = *devConn; - MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, recvTag, &devConn)); - hostConns[phase1RecvConnIdx(peer, rank)] = *devConn; - MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, phase2Tag, &devConn)); - hostConns[phase2ConnIdx(peer, rank)] = *devConn; - } - } - CUDACHECK(cudaMalloc(&conns, nPeers * 3 * sizeof(mscclppDevConn_t))); - CUDACHECK(cudaMemcpy(conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice)); - - TESTCHECK(TimeTest(args)); - - CUDACHECK(cudaFree(barrier)); - CUDACHECK(cudaFree(conns)); - - return testSuccess; -} - -struct testEngine allReduceEngine = {AllReduceGetBuffSize, AllReduceRunTest, AllReduceSetupMscclppConnections, - AllReduceTeardownMscclppConnections}; - -#pragma weak mscclppTestEngine = allReduceEngine diff --git a/test/mscclpp-test/CMakeLists.txt b/test/mscclpp-test/CMakeLists.txt index e60848e9..65d1148e 100644 --- a/test/mscclpp-test/CMakeLists.txt +++ b/test/mscclpp-test/CMakeLists.txt @@ -5,3 +5,5 @@ endfunction() add_mscclpp_test_executable(sendrecv_test_perf sendrecv_test.cu) add_mscclpp_test_executable(allgather_test_perf allgather_test.cu) +add_mscclpp_test_executable(allreduce_test_perf allreduce_test.cu) +add_mscclpp_test_executable(alltoall_test_perf alltoall_test.cu) diff --git a/test/mscclpp-test/allgather_test.cu 
b/test/mscclpp-test/allgather_test.cu index 89b66221..0e75c8d2 100644 --- a/test/mscclpp-test/allgather_test.cu +++ b/test/mscclpp-test/allgather_test.cu @@ -133,7 +133,7 @@ class AllGatherTestColl : public BaseTestColl { void runColl(const TestArgs& args, cudaStream_t stream) override; void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; - void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override; + void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; void setupCollTest(size_t size) override; }; @@ -205,42 +205,8 @@ void AllGatherTestEngine::allocateBuffer() { } void AllGatherTestEngine::setupConnections() { - const int worldSize = args_.totalRanks; - const int rank = args_.rank; - const int nRanksPerNode = args_.nRanksPerNode; - const int thisNode = rank / nRanksPerNode; - const mscclpp::Transport ibTransport = IBs[args_.gpuNum]; - - std::vector channelIds; - std::vector localMemories; - std::vector> remoteMemories; - - auto rankToNode = [&](int rank) { return rank / nRanksPerNode; }; - for (int r = 0; r < worldSize; r++) { - if (r == rank) { - continue; - } - mscclpp::Transport transport; - if (rankToNode(r) == thisNode) { - transport = mscclpp::Transport::CudaIpc; - } else { - transport = ibTransport; - } - // Connect with all other ranks - channelIds.push_back(chanService_->addChannel(comm_->connectOnSetup(r, 0, transport))); - auto memory = comm_->registerMemory(sendBuff_.get(), args_.maxBytes, mscclpp::Transport::CudaIpc | ibTransport); - localMemories.push_back(memory); - comm_->sendMemoryOnSetup(memory, r, 0); - remoteMemories.push_back(comm_->recvMemoryOnSetup(r, 0)); - } - comm_->setup(); - std::vector devChannels; - for (size_t i = 0; i < channelIds.size(); ++i) { - devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(chanService_->deviceChannel(channelIds[i]), - chanService_->addMemory(remoteMemories[i].get()), - chanService_->addMemory(localMemories[i]))); - } + setupMeshConnections(devChannels, sendBuff_.get(), args_.maxBytes); assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel)); CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(), diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu new file mode 100644 index 00000000..072d9d1f --- /dev/null +++ b/test/mscclpp-test/allreduce_test.cu @@ -0,0 +1,328 @@ +#include +#include +#include + +#include "common.hpp" + +#define ALIGN 4 +#define BLOCKS_PER_PEER 15 + +__constant__ mscclpp::channel::SimpleDeviceChannel constDevFstRoundChans[16]; +__constant__ mscclpp::channel::SimpleDeviceChannel constDevSndRoundChans[16]; + +struct Chunk { + size_t offset; + size_t size; +}; + +__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx) { + size_t remainder = dataCount % numChunks; + size_t smallChunkSize = dataCount / numChunks; + size_t largeChunkSize = smallChunkSize + 1; + size_t numRemainedLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; + size_t offset = (remainder - numRemainedLargeChunks) * largeChunkSize + + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize; + return Chunk{offset, chunkIdx < remainder ? 
largeChunkSize : smallChunkSize}; +} + +__device__ void reduceSum(int* dst, int* src, size_t size) { + for (int i = threadIdx.x; i < size; i += blockDim.x) { + dst[i] += src[i]; + } +} + +__device__ mscclpp::DeviceSyncer deviceSyncer; + +__device__ void allreduce0(int rank, int worldSize, size_t nelems, size_t scratchDataCount) { + int peerId = blockIdx.x / BLOCKS_PER_PEER; + int isComm = (threadIdx.x == 0) && (blockIdx.x % BLOCKS_PER_PEER == 0); + int remoteRank = (peerId < rank) ? peerId : peerId + 1; + + // 1st communication phase: send data to the scratch buffer of the peer associated with this block + mscclpp::channel::SimpleDeviceChannel& devFstRoundChan = constDevFstRoundChans[peerId]; + Chunk toPeerChunk = getChunk(nelems, worldSize, remoteRank); + // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. + // The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and + // inside that each of the destination's peers send to the nth chunk, where n is the index of the + // source peer from the destination's perspective. + size_t dstOffset = (rank < remoteRank ? rank : rank - 1) * toPeerChunk.size; + if (isComm) { + // Write data to the peer + devFstRoundChan.putWithSignalAndFlush(dstOffset * sizeof(int), toPeerChunk.offset * sizeof(int), + toPeerChunk.size * sizeof(int)); + // Wait for data from the peer + devFstRoundChan.wait(); + } + + deviceSyncer.sync(gridDim.x); + + // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer + mscclpp::channel::SimpleDeviceChannel& devSndRoundChan = constDevSndRoundChans[peerId]; + Chunk rankChunk = getChunk(nelems, worldSize, rank); + int* chunk = (int*)devSndRoundChan.srcPtr_ + rankChunk.offset; + int numPeers = gridDim.x / BLOCKS_PER_PEER; + int numBlocks = gridDim.x; + Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, blockIdx.x); + size_t scratchDataCountPerPeer = scratchDataCount / numPeers; + Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, blockIdx.x); + for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) { + int* scratchChunk = (int*)devFstRoundChan.tmpPtr_ + peerIdx * scratchDataCountPerPeer; + reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); + } + + deviceSyncer.sync(gridDim.x); + + // 2nd communication phase: send the now reduced data between the user buffers + Chunk collectionChunk = getChunk(nelems, worldSize, rank); + if (isComm) { + // Write data to the peer + devSndRoundChan.putWithSignalAndFlush(collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int), + collectionChunk.size * sizeof(int)); + // Wait for data from the peer + devSndRoundChan.wait(); + } +} + +__forceinline__ __device__ void vectorSum(int* dst, int* src, size_t nElem) { + size_t nInt4 = nElem / 4; + size_t nLastInts = nElem % 4; + int4* dst4 = (int4*)dst; + int4* src4 = (int4*)src; + for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < nInt4; i += blockDim.x * gridDim.x) { + dst4[i].w += src4[i].w; + dst4[i].x += src4[i].x; + dst4[i].y += src4[i].y; + dst4[i].z += src4[i].z; + } + if (nLastInts > 0) { + int* dstLast = dst + nInt4 * 4; + int* srcLast = src + nInt4 * 4; + for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < nLastInts; i += blockDim.x * gridDim.x) { + dstLast[i] += srcLast[i]; + } + } +} + +__device__ void allreduce1(int rank, int worldSize, size_t nelems, size_t scratchDataCount) { + int isComm = (threadIdx.x == 0) && 
(blockIdx.x == 0); + int remoteSendRank = (rank + 1) % worldSize; + int remoteRecvRank = (rank + worldSize - 1) % worldSize; + int peerSendId = (remoteSendRank < rank) ? remoteSendRank : remoteSendRank - 1; + int peerRecvId = (remoteRecvRank < rank) ? remoteRecvRank : remoteRecvRank - 1; + + mscclpp::channel::SimpleDeviceChannel& devFstSendChan = constDevFstRoundChans[peerSendId]; + mscclpp::channel::SimpleDeviceChannel& devFstRecvChan = constDevFstRoundChans[peerRecvId]; + mscclpp::channel::SimpleDeviceChannel& devSndSendChan = constDevSndRoundChans[peerSendId]; + mscclpp::channel::SimpleDeviceChannel& devSndRecvChan = constDevSndRoundChans[peerRecvId]; + + // Step 1 + size_t chunkIndex = (rank + worldSize - 1) % worldSize; + size_t chunkNelem = nelems / worldSize; + size_t offset = chunkIndex * chunkNelem * sizeof(int); + if (isComm) { + if (chunkNelem > 1) { + devFstSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int)); + } + } + + // Step 2 ~ Step n-1 + for (int step = 2; step < worldSize; ++step) { + if (isComm) { + if (chunkNelem > 1) { + devFstRecvChan.wait(); + devFstSendChan.flush(); + } + devFstSendChan.putWithSignal(offset + chunkNelem / 2 * sizeof(int), (chunkNelem - chunkNelem / 2) * sizeof(int)); + } + deviceSyncer.sync(gridDim.x); + + // Reduce + chunkIndex = (rank + worldSize - step) % worldSize; + offset = chunkIndex * chunkNelem * sizeof(int); + int* dst = (int*)((char*)devFstSendChan.srcPtr_ + offset); + int* src = (int*)((char*)devFstRecvChan.tmpPtr_ + offset); + vectorSum(dst, src, chunkNelem / 2); + + if (isComm) { + devFstRecvChan.wait(); + devFstSendChan.flush(); + if (chunkNelem > 1) { + devFstSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int)); + } + } + deviceSyncer.sync(gridDim.x); + + dst += chunkNelem / 2; + src += chunkNelem / 2; + vectorSum(dst, src, chunkNelem - chunkNelem / 2); + } + + // Step n + if (isComm) { + if (chunkNelem > 1) { + devFstRecvChan.wait(); + devFstSendChan.flush(); + } + devFstSendChan.putWithSignal(offset + chunkNelem / 2 * sizeof(int), (chunkNelem - chunkNelem / 2) * sizeof(int)); + } + deviceSyncer.sync(gridDim.x); + + offset = rank * chunkNelem * sizeof(int); + int* dst = (int*)((char*)devFstSendChan.srcPtr_ + offset); + int* src = (int*)((char*)devFstRecvChan.tmpPtr_ + offset); + vectorSum(dst, src, chunkNelem / 2); + + if (isComm) { + devFstRecvChan.wait(); + devFstSendChan.flush(); + if (chunkNelem > 1) { + devSndSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int)); + } + } + deviceSyncer.sync(gridDim.x); + + dst += chunkNelem / 2; + src += chunkNelem / 2; + vectorSum(dst, src, chunkNelem - chunkNelem / 2); + + if (isComm) { + if (chunkNelem > 1) { + devSndSendChan.flush(); + } + devSndSendChan.putWithSignalAndFlush(offset + chunkNelem / 2 * sizeof(int), + (chunkNelem - chunkNelem / 2) * sizeof(int)); + } + + // Step n+1 ~ Step 2n-2 + for (int i = 1; i < worldSize - 1; ++i) { + if (isComm) { + devSndRecvChan.wait(); + } + deviceSyncer.sync(gridDim.x); + + // Copy + chunkIndex = (rank + worldSize - i) % worldSize; + if (isComm) { + devSndSendChan.putWithSignalAndFlush(chunkIndex * chunkNelem * sizeof(int), chunkNelem * sizeof(int)); + } + } + + // Final receive + if (isComm) { + devSndRecvChan.wait(); + } +} + +__global__ void kernel(int rank, int worldSize, size_t nelems, size_t scratchDataCount, int kernel) { + if (kernel == 0) + allreduce0(rank, worldSize, nelems, scratchDataCount); + else if (kernel == 1) + allreduce1(rank, worldSize, nelems, scratchDataCount); +} + +class AllReduceTestColl : public 
BaseTestColl { + public: + AllReduceTestColl() = default; + ~AllReduceTestColl() = default; + + void runColl(const TestArgs& args, cudaStream_t stream) override; + void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; + void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; + void setupCollTest(size_t size) override; +}; + +void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { + const int worldSize = args.totalRanks; + const int rank = args.rank; + const int kernelNum = args.kernelNum; + const int nPeers = worldSize - 1; + const Chunk chunk = getChunk(paramCount_, worldSize, rank); + const size_t scratchDataCount = chunk.size * nPeers; + const int nBlocks = (kernelNum == 0) ? nPeers * BLOCKS_PER_PEER : 24; + kernel<<>>(rank, worldSize, paramCount_, scratchDataCount, kernelNum); +} + +void AllReduceTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { + assert(sendBuff.size() == 1); + const int rank = args.rank; + const int worldSize = args.totalRanks; + std::vector dataHost(std::max(sendCount_, recvCount_), rank); + CUDATHROW(cudaMemcpy(sendBuff[0], dataHost.data(), sendCount_ * typeSize_, cudaMemcpyHostToDevice)); + + for (size_t i = 0; i < recvCount_; i++) { + dataHost[i] = worldSize * (worldSize - 1) / 2; + } + std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_); +} + +void AllReduceTestColl::getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) { + double baseBw = (double)(paramCount_ * typeSize_) / 1.0E9 / deltaSec; + algBw = baseBw; + double factor = (2 * (double)(worldSize_ - 1)) / ((double)worldSize_); + busBw = baseBw * factor; +} + +void AllReduceTestColl::setupCollTest(size_t size) { + size_t count = size / typeSize_; + size_t base = (count / ALIGN) * ALIGN; + sendCount_ = base; + recvCount_ = base; + paramCount_ = base; + recvCount_ = base; + + mscclpp::DeviceSyncer syncer = {}; + CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); +} + +class AllReduceTestEngine : public BaseTestEngine { + public: + AllReduceTestEngine() = default; + ~AllReduceTestEngine() = default; + + void allocateBuffer() override; + void setupConnections() override; + + private: + std::vector getSendBuff() override; + void* getExpectedBuff() override; + void* getRecvBuff() override; + + std::shared_ptr sendBuff_; + std::shared_ptr scratchBuff_; + std::shared_ptr expectedBuff_; +}; + +void AllReduceTestEngine::allocateBuffer() { + sendBuff_ = mscclpp::allocSharedCuda(args_.maxBytes / sizeof(int)); + scratchBuff_ = mscclpp::allocSharedCuda(args_.maxBytes / sizeof(int)); + expectedBuff_ = std::shared_ptr(new int[args_.maxBytes / sizeof(int)]); +} + +void AllReduceTestEngine::setupConnections() { + std::vector fstRoundChannels; + std::vector sndRoundChannels; + + // Send data from local sendBuff to remote scratchBuff (out-of-place) + setupMeshConnections(fstRoundChannels, sendBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes); + assert(fstRoundChannels.size() < sizeof(constDevFstRoundChans) / sizeof(mscclpp::channel::SimpleDeviceChannel)); + CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, fstRoundChannels.data(), + sizeof(mscclpp::channel::SimpleDeviceChannel) * fstRoundChannels.size())); + + // Send data from local sendBuff to remote sendBuff (in-place) + setupMeshConnections(sndRoundChannels, sendBuff_.get(), args_.maxBytes); + assert(sndRoundChannels.size() < sizeof(constDevSndRoundChans) / 
sizeof(mscclpp::channel::SimpleDeviceChannel)); + CUDATHROW(cudaMemcpyToSymbol(constDevSndRoundChans, sndRoundChannels.data(), + sizeof(mscclpp::channel::SimpleDeviceChannel) * sndRoundChannels.size())); +} + +std::vector AllReduceTestEngine::getSendBuff() { return {sendBuff_.get()}; } + +void* AllReduceTestEngine::getExpectedBuff() { return expectedBuff_.get(); } + +void* AllReduceTestEngine::getRecvBuff() { + // in-place operation reuse the send buffer + return sendBuff_.get(); +} + +std::shared_ptr getTestEngine() { return std::make_shared(); } +std::shared_ptr getTestColl() { return std::make_shared(); } diff --git a/test/mscclpp-test/alltoall_test.cu b/test/mscclpp-test/alltoall_test.cu new file mode 100644 index 00000000..8f55bc17 --- /dev/null +++ b/test/mscclpp-test/alltoall_test.cu @@ -0,0 +1,153 @@ +#include +#include + +#include "common.hpp" + +#define ALIGN 4 +__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16]; +__device__ mscclpp::DeviceSyncer deviceSyncer; +void* localRecvBuff; +void* localSendBuff; + +__device__ void localAlltoall(int rank, int nRanksPerNode, size_t nElements) { + int remoteRank = (blockIdx.x < rank) ? blockIdx.x : blockIdx.x + 1; + for (int i = 1; i < nRanksPerNode; i++) { + mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[blockIdx.x]; + if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank + i) % nRanksPerNode) { + devChan.putWithSignalAndFlush(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), + nElements * sizeof(int)); + } + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank - i + nRanksPerNode) % nRanksPerNode) { + devChan.wait(); + } + deviceSyncer.sync(nRanksPerNode - 1); + } +} + +__device__ void alltoall0(int rank, int worldSize, size_t nElements) { + int remoteRank = (blockIdx.x < rank) ? 
blockIdx.x : blockIdx.x + 1; + mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[blockIdx.x]; + if (threadIdx.x == 0) { + devChan.putWithSignal(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), + nElements * sizeof(int)); + } + + deviceSyncer.sync(gridDim.x); + if (threadIdx.x == 0) { + devChan.flush(); + devChan.wait(); + } +} + +__device__ void alltoall1(int rank, int nRanksPerNode, size_t nElements) { + localAlltoall(rank, nRanksPerNode, nElements); +} + +__global__ void kernel(int rank, int worldSize, size_t nElements, int nRanksPerNode, int kernelNum) { + if (kernelNum == 0) { + alltoall0(rank, worldSize, nElements); + } if (kernelNum == 1) { + alltoall1(rank, nRanksPerNode, nElements); + } +} + +class AllToAllTestColl : public BaseTestColl { + public: + AllToAllTestColl() = default; + ~AllToAllTestColl() override = default; + + void runColl(const TestArgs& args, cudaStream_t stream) override; + void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; + void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; + void setupCollTest(size_t size) override; +}; + +void AllToAllTestColl::runColl(const TestArgs& args, cudaStream_t stream) { + const int worldSize = args.totalRanks; + const int rank = args.rank; + const int kernelNum = args.kernelNum; + const int nRanksPerNode = args.nRanksPerNode; + CUDATHROW(cudaMemcpyAsync((int*)localRecvBuff + paramCount_ * rank, (int*)localSendBuff + paramCount_ * rank, + paramCount_ * sizeof(int), cudaMemcpyDeviceToDevice, stream)); + kernel<<>>(rank, worldSize, paramCount_, nRanksPerNode, kernelNum); +} + +void AllToAllTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { + assert(sendBuff.size() == 1); + const int rank = args.rank; + std::vector dataHost(recvCount_, 0); + // For rank 0, the data is 0, 1, 2 ... recvCount_ - 1, for rank 1, the data is recvCount_, recvCount_ + 1, ... 
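+  // Illustrative example: with 2 ranks and recvCount_ = 8 (so paramCount_ = 4),
+  // rank 0 sends {0..7} and rank 1 sends {8..15}; after the all-to-all, rank 0
+  // expects {0,1,2,3, 8,9,10,11} and rank 1 expects {4,5,6,7, 12,13,14,15}.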
+ for (size_t i = 0; i < recvCount_; i++) { + dataHost[i] = rank * recvCount_ + i; + } + CUDATHROW(cudaMemcpy(sendBuff[0], dataHost.data(), sendCount_ * typeSize_, cudaMemcpyHostToDevice)); + for (size_t i = 0; i < recvCount_ / paramCount_; i++) { + for (int j = 0; j < paramCount_; j++) { + dataHost[i * paramCount_ + j] = i * recvCount_ + rank * paramCount_ + j; + } + } + std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_); +} + +void AllToAllTestColl::getBw(const double deltaSec, double& algBw, double& busBw) { + double baseBw = (double)(paramCount_ * typeSize_ * worldSize_) / 1.0E9 / deltaSec; + algBw = baseBw; + double factor = ((double)(worldSize_ - 1)) / ((double)worldSize_); + busBw = baseBw * factor; +} + +void AllToAllTestColl::setupCollTest(size_t size) { + size_t count = size / typeSize_; + size_t base = (count / worldSize_ / (ALIGN)) * ALIGN * worldSize_; + sendCount_ = base; + recvCount_ = base; + paramCount_ = base / worldSize_; + expectedCount_ = base; + + mscclpp::DeviceSyncer syncer = {}; + CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); +} + +class AllToAllTestEngine : public BaseTestEngine { + public: + AllToAllTestEngine() : BaseTestEngine(false){}; + ~AllToAllTestEngine() override = default; + + void allocateBuffer() override; + void setupConnections() override; + + private: + std::vector getSendBuff() override; + void* getExpectedBuff() override; + void* getRecvBuff() override; + + std::shared_ptr sendBuff_; + std::shared_ptr recvBuff_; + std::shared_ptr expectedBuff_; +}; + +void AllToAllTestEngine::allocateBuffer() { + sendBuff_ = mscclpp::allocSharedCuda(args_.maxBytes / sizeof(int)); + recvBuff_ = mscclpp::allocSharedCuda(args_.maxBytes / sizeof(int)); + expectedBuff_ = std::shared_ptr(new int[args_.maxBytes / sizeof(int)]); + + localSendBuff = sendBuff_.get(); + localRecvBuff = recvBuff_.get(); +} + +void AllToAllTestEngine::setupConnections() { + std::vector devChannels; + setupMeshConnections(devChannels, sendBuff_.get(), args_.maxBytes, recvBuff_.get(), args_.maxBytes); + + assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel)); + CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(), + sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size())); +} + +std::vector AllToAllTestEngine::getSendBuff() { return {sendBuff_.get()}; } +void* AllToAllTestEngine::getExpectedBuff() { return expectedBuff_.get(); } +void* AllToAllTestEngine::getRecvBuff() { return recvBuff_.get(); } + +std::shared_ptr getTestEngine() { return std::make_shared(); } +std::shared_ptr getTestColl() { return std::make_shared(); } diff --git a/test/mscclpp-test/common.cu b/test/mscclpp-test/common.cu index 77126631..1893937c 100644 --- a/test/mscclpp-test/common.cu +++ b/test/mscclpp-test/common.cu @@ -227,6 +227,55 @@ size_t BaseTestEngine::checkData() { return nErrors; } +// Create mesh connections between all ranks. If recvBuff is nullptr, assume in-place. 
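+// For every remote rank this registers sendBuff, exchanges a handle for recvBuff
+// (or for sendBuff when in-place) with that peer, and builds one SimpleDeviceChannel
+// per peer: dst is the peer's exchanged buffer, src is the local sendBuff, and
+// tmpPtr is the local recvBuff (nullptr when in-place).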
+void BaseTestEngine::setupMeshConnections(std::vector& devChannels, + void* sendBuff, size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { + const int worldSize = args_.totalRanks; + const int rank = args_.rank; + const int nRanksPerNode = args_.nRanksPerNode; + const int thisNode = rank / nRanksPerNode; + const mscclpp::Transport ibTransport = IBs[args_.gpuNum]; + const bool isOutPlace = (recvBuff != nullptr); + + std::vector channelIds; + std::vector localMemories; + std::vector localTmpMemories; + std::vector> remoteMemories; + + auto rankToNode = [&](int rank) { return rank / nRanksPerNode; }; + for (int r = 0; r < worldSize; r++) { + if (r == rank) { + continue; + } + mscclpp::Transport transport; + if (rankToNode(r) == thisNode) { + transport = mscclpp::Transport::CudaIpc; + } else { + transport = ibTransport; + } + // Connect with all other ranks + channelIds.push_back(chanService_->addChannel(comm_->connectOnSetup(r, 0, transport))); + auto sendMemory = comm_->registerMemory(sendBuff, sendBuffBytes, mscclpp::Transport::CudaIpc | ibTransport); + localMemories.push_back(sendMemory); + if (isOutPlace) { + auto recvMemory = comm_->registerMemory(recvBuff, recvBuffBytes, mscclpp::Transport::CudaIpc | ibTransport); + comm_->sendMemoryOnSetup(recvMemory, r, 0); + localTmpMemories.push_back(recvMemory); + } else { + comm_->sendMemoryOnSetup(sendMemory, r, 0); + } + remoteMemories.push_back(comm_->recvMemoryOnSetup(r, 0)); + } + comm_->setup(); + + for (size_t i = 0; i < channelIds.size(); ++i) { + devChannels.push_back(mscclpp::channel::SimpleDeviceChannel( + chanService_->deviceChannel(channelIds[i]), chanService_->addMemory(remoteMemories[i].get()), + chanService_->addMemory(localMemories[i]), remoteMemories[i].get().data(), localMemories[i].data(), + (isOutPlace ? 
localTmpMemories[i].data() : nullptr))); + } +} + void run(int argc, char* argv[]); int main(int argc, char* argv[]) { // Make sure everyline is flushed so that we see the progress of the test diff --git a/test/mscclpp-test/common.hpp b/test/mscclpp-test/common.hpp index ddc100e0..f479cb7e 100644 --- a/test/mscclpp-test/common.hpp +++ b/test/mscclpp-test/common.hpp @@ -41,7 +41,7 @@ class BaseTestColl { virtual ~BaseTestColl() {} virtual void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) = 0; virtual void runColl(const TestArgs& args, cudaStream_t stream) = 0; - virtual void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) = 0; + virtual void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) = 0; void setupCollTest(const TestArgs& args, size_t size); size_t getSendBytes() { return sendCount_ * typeSize_; } @@ -83,6 +83,9 @@ class BaseTestEngine { double benchTime(); protected: + void setupMeshConnections(std::vector& devChannels, void* sendBuff, + size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); + TestArgs args_; std::shared_ptr coll_; std::shared_ptr comm_; diff --git a/test/mscclpp-test/sendrecv_test.cu b/test/mscclpp-test/sendrecv_test.cu index 377f7799..0b76c498 100644 --- a/test/mscclpp-test/sendrecv_test.cu +++ b/test/mscclpp-test/sendrecv_test.cu @@ -54,7 +54,7 @@ class SendRecvTestColl : public BaseTestColl { void runColl(const TestArgs& args, cudaStream_t stream) override; void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; - void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override; + void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; void setupCollTest(size_t size) override; }; @@ -65,9 +65,9 @@ void SendRecvTestColl::runColl(const TestArgs& args, cudaStream_t stream) { kernel<<>>(args.rank, sendBytes, bytesPerBlock); } -void SendRecvTestColl::getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) { +void SendRecvTestColl::getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) { double baseBw = (double)(paramCount_ * typeSize_) / 1.0E9 / deltaSec; - algBW = baseBw; + algBw = baseBw; double factor = 1; busBw = baseBw * factor; }
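For reference, a minimal sketch (an assumption, not part of this patch) of how a new collective test could use the setupMeshConnections() helper declared in common.hpp. The MyTestEngine/copyToPeers names are hypothetical and the remaining BaseTestEngine overrides are omitted; the 16-entry channel array, CUDATHROW, and the SimpleDeviceChannel calls follow the tests above.

#include <cassert>
#include <vector>
#include <mscclpp/channel.hpp>
#include "common.hpp"

__constant__ mscclpp::channel::SimpleDeviceChannel constChans[16];

// One block per peer: push the whole local buffer to the peer and wait for its push.
__global__ void copyToPeers(size_t nBytes) {
  mscclpp::channel::SimpleDeviceChannel chan = constChans[blockIdx.x];
  if (threadIdx.x == 0) {
    chan.putWithSignalAndFlush(/*dstOffset*/ 0, /*srcOffset*/ 0, nBytes);
    chan.wait();
  }
}

class MyTestEngine : public BaseTestEngine {
  void setupConnections() override {
    std::vector<mscclpp::channel::SimpleDeviceChannel> chans;
    // Out-of-place: peers write into recvBuff_, local data is sent from sendBuff_.
    setupMeshConnections(chans, sendBuff_.get(), args_.maxBytes, recvBuff_.get(), args_.maxBytes);
    assert(chans.size() < sizeof(constChans) / sizeof(mscclpp::channel::SimpleDeviceChannel));
    CUDATHROW(cudaMemcpyToSymbol(constChans, chans.data(),
                                 sizeof(mscclpp::channel::SimpleDeviceChannel) * chans.size()));
  }
  // allocateBuffer(), getSendBuff(), getExpectedBuff(), getRecvBuff() omitted for brevity.
  std::shared_ptr<int> sendBuff_, recvBuff_;
};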