From d338c6e701251f64a74a48cc03a39fefe86876ae Mon Sep 17 00:00:00 2001 From: Binyang2014 Date: Sun, 23 Apr 2023 13:32:01 +0800 Subject: [PATCH] Integrate allReduceTest to mscclpp-test (#57) --- Makefile | 2 +- src/proxy.cc | 1 + tests/allgather_test.cu | 2 +- tests/allreduce_allpairs_test.cu | 322 ------------------------------- tests/allreduce_test.cu | 284 +++++++++++++++++++++++++++ tests/common.cu | 17 +- tests/common.h | 2 + 7 files changed, 298 insertions(+), 332 deletions(-) delete mode 100644 tests/allreduce_allpairs_test.cu create mode 100644 tests/allreduce_test.cu diff --git a/Makefile b/Makefile index bc91ddff..907c32fb 100644 --- a/Makefile +++ b/Makefile @@ -153,7 +153,7 @@ TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS)) MSCLLPPTESTSOBJSDIR:= $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR) -MSCLLPPTESTBINFILESLIST := allgather_test ring_send_recv_test +MSCLLPPTESTBINFILESLIST := allgather_test allreduce_test ring_send_recv_test MSCLLPPTESTBINS := $(MSCLLPPTESTBINFILESLIST:%=$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf) INCLUDE := -Isrc -Isrc/include diff --git a/src/proxy.cc b/src/proxy.cc index ac704bc6..c126d639 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -99,6 +99,7 @@ void* mscclppProxyService(void* _args) struct mscclppComm* comm = args->comm; // from this point on, proxy thread will stay close to the device + PROXYCUDACHECK(cudaSetDevice(comm->cudaDev)); PROXYMSCCLPPCHECK(numaBind(comm->devNumaNode)); volatile mscclppProxyRunState_t* run = &args->proxyState->run; diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu index 647debef..11dbfa2d 100644 --- a/tests/allgather_test.cu +++ b/tests/allgather_test.cu @@ -215,6 +215,6 @@ testResult_t AllGatherRunTest(struct testArgs* args) return testSuccess; } -struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest}; +struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest, nullptr, nullptr}; #pragma weak mscclppTestEngine = allGatherEngine diff --git a/tests/allreduce_allpairs_test.cu b/tests/allreduce_allpairs_test.cu deleted file mode 100644 index 514a2b30..00000000 --- a/tests/allreduce_allpairs_test.cu +++ /dev/null @@ -1,322 +0,0 @@ -#include "mscclpp.h" -#include -#include -#include - -#include "common.h" - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \ - return res; \ - } \ - } while (0); - -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -struct Volume -{ - size_t offset; - size_t size; -}; - -__host__ __device__ Volume chunkVolume(size_t totalSize, size_t totalChunks, size_t chunkIdx, size_t chunkCount) -{ - size_t remainder = totalSize % totalChunks; - size_t smallChunk = totalSize / totalChunks; - size_t largeChunk = smallChunk + 1; - size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; - size_t numSmallChunks = chunkCount - numLargeChunks; - size_t offset = - (remainder - numLargeChunks) * largeChunk + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunk; - return Volume{offset, numLargeChunks * largeChunk + numSmallChunks * smallChunk}; -} - -template struct AllreduceAllpairs -{ - int rank; - int nRanks; - T* userData; - size_t userSize; - T* scratch; - size_t scratchSize; - mscclppDevConn_t* conns; - uint64_t* connFlags; - cuda::barrier* barrier; - - __device__ void run(int idx) - { - int myPeer = peerRank(idx, rank); - mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(myPeer)]; - mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(myPeer)]; - mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(myPeer)]; - - // 1st communication phase: send data to the scratch buffer of the peer associated with this block - Volume toPeer = chunkVolume(userSize, nRanks, myPeer, 1); - // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. - // The destination will have allocated a scratch buffer of size numPeers() * toPeer.size and - // inside that each of the destination's peers send to the nth chunk, where n is the index of the - // source peer from the destination's perspective. - size_t dstOffset = peerIdx(rank, myPeer) * toPeer.size; - send(phase1SendConn, toPeer.offset, dstOffset, toPeer.size); - recv(phase1RecvConn); - - if (threadIdx.x == 0) - barrier->arrive_and_wait(); - __syncthreads(); - - // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer - Volume rankUserChunk = chunkVolume(userSize, nRanks, rank, 1); - T* userChunk = userData + rankUserChunk.offset; - Volume blockUserChunk = chunkVolume(rankUserChunk.size, numBlocks(), idx, 1); - for (int peerIdx = 0; peerIdx < numPeers(); ++peerIdx) { - assert(scratchSize % numPeers() == 0); - assert(scratchSize / numPeers() == rankUserChunk.size); - size_t scratchChunkSize = scratchSize / numPeers(); - T* scratchChunk = scratch + peerIdx * scratchChunkSize; - Volume blockScratchChunk = chunkVolume(scratchChunkSize, numBlocks(), idx, 1); - assert(blockScratchChunk.size == blockUserChunk.size); - reduce(userChunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); - } - - if (threadIdx.x == 0) - barrier->arrive_and_wait(); - __syncthreads(); - - // 2nd communication phase: send the now reduced data between the user buffers - Volume srcVolume2 = chunkVolume(userSize, nRanks, rank, 1); - send(phase2Conn, srcVolume2.offset, srcVolume2.offset, srcVolume2.size); - recv(phase2Conn); - } - - __device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) - { - if (threadIdx.x == 0) { - volatile uint64_t* localFlag = conn.localFlag; - *localFlag = 1; // 1 is used to signal the send - - mscclppTrigger_t trigger; - auto request = conn.fifo.getTrigger(&trigger); - conn.fifo.setTrigger(trigger, mscclppData | mscclppFlag, srcOffset * sizeof(T), dstOffset * sizeof(T), - size * sizeof(T)); - } - __syncthreads(); - } - - __device__ void recv(mscclppDevConn_t& conn) - { - if (threadIdx.x == 0) { - volatile uint64_t* proxyFlag = conn.proxyFlag; - while (*proxyFlag != 1) { - } - *proxyFlag = 0; - } - __syncthreads(); - } - - __host__ __device__ int numPeers() - { - return nRanks - 1; - } - - __host__ __device__ int numBlocks() - { - return numPeers(); - } - - __host__ __device__ int peerIdx(int peerRank, int myRank) - { - return peerRank < myRank ? peerRank : peerRank - 1; - } - - __host__ __device__ int peerRank(int peerIdx, int myRank) - { - return peerIdx < myRank ? peerIdx : peerIdx + 1; - } - - __host__ __device__ int phase1SendConnIdx(int peerRank) - { - return peerIdx(peerRank, rank) * 3; - } - - __host__ __device__ int phase1RecvConnIdx(int peerRank) - { - return peerIdx(peerRank, rank) * 3 + 1; - } - - __host__ __device__ int phase2ConnIdx(int peerRank) - { - return peerIdx(peerRank, rank) * 3 + 2; - } - - void freeGPUResources() - { - if (scratch) - CUDACHECK(cudaFree(scratch)); - scratch = nullptr; - if (connFlags) - CUDACHECK(cudaFree(connFlags)); - connFlags = nullptr; - if (conns) - CUDACHECK(cudaFree(conns)); - conns = nullptr; - if (barrier) - CUDACHECK(cudaFree(barrier)); - barrier = nullptr; - } -}; - -// The builder class encapsulates the -template class AllreduceAllpairsBuilder -{ - AllreduceAllpairs d; - std::vector hostConns; - -public: - // The constructor is called after the user has allocated the buffer to be allreduced - AllreduceAllpairsBuilder(T* data, size_t size) - { - d.userData = data; - d.userSize = size; - d.scratch = nullptr; - d.connFlags = nullptr; - d.conns = nullptr; - d.barrier = nullptr; - } - - // connect is called after rank initialization but before connection setup - mscclppResult_t connect(mscclppComm_t comm) - { - MSCCLPPCHECK(mscclppCommRank(comm, &d.rank)); - MSCCLPPCHECK(mscclppCommSize(comm, &d.nRanks)); - - Volume myChunks = chunkVolume(d.userSize, d.nRanks, d.rank, 1); - d.scratchSize = myChunks.size * d.numPeers(); - - CUDACHECK(cudaMalloc(&d.scratch, d.scratchSize * sizeof(T))); - CUDACHECK(cudaMalloc(&d.connFlags, 3 * sizeof(uint64_t))); - CUDACHECK(cudaMemset(d.connFlags, 0, 3 * sizeof(uint64_t))); - - hostConns.resize(d.numPeers() * 3); - for (int peer = 0; peer < d.nRanks; ++peer) { - if (peer != d.rank) { - int sendTag = d.rank < peer ? 0 : 1; - int recvTag = d.rank < peer ? 1 : 0; - MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1SendConnIdx(peer), peer, d.userData, - d.userSize * sizeof(T), d.connFlags + 0, sendTag, mscclppTransportP2P, nullptr)); - MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1RecvConnIdx(peer), peer, d.scratch, - d.scratchSize * sizeof(T), d.connFlags + 1, recvTag, mscclppTransportP2P, nullptr)); - MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase2ConnIdx(peer), peer, d.userData, - d.userSize * sizeof(T), d.connFlags + 2, 2, mscclppTransportP2P, nullptr)); - } - } - - return mscclppSuccess; - } - - // finishSetup is called after connection setup and returns an algorithm object that is ready to be passed to a GPU - // kernel - AllreduceAllpairs finishSetup() - { - CUDACHECK(cudaMalloc(&d.conns, hostConns.size() * sizeof(mscclppDevConn_t))); - CUDACHECK( - cudaMemcpy(d.conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice)); - CUDACHECK(cudaMalloc(&d.barrier, sizeof(cuda::barrier))); - cuda::barrier initBarrier(d.numBlocks()); - CUDACHECK( - cudaMemcpy(d.barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); - return d; - } -}; - -template __device__ void reduceSum(T* dst, T* src, size_t size) -{ - for (int i = threadIdx.x; i < size; i += blockDim.x) { - dst[i] += src[i]; - } -} - -template __global__ void init(T* data, size_t size, int rank) -{ - for (int i = threadIdx.x; i < size; i += blockDim.x) { - data[i] = rank; - } -} - -// The main test kernel -template __global__ void testKernel(AllreduceAllpairs d) -{ - d.run(blockIdx.x); -} - -int main(int argc, const char* argv[]) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Init(NULL, NULL); -#endif - const char* ip_port; - int rank, world_size; - parse_arguments(argc, argv, &ip_port, &rank, &world_size); - - CUDACHECK(cudaSetDevice(rank)); - - // Allocate and initialize 1 MB of data - int* data; - size_t dataSize = 1024 * 1024 / sizeof(int); - CUDACHECK(cudaMalloc(&data, dataSize * sizeof(int))); - init<<<1, 256>>>(data, dataSize, rank); - - // Create the collective - AllreduceAllpairsBuilder builder(data, dataSize); - - // Create the communicator - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port)); - - // Connect the collective - builder.connect(comm); - - // Finish the setup - MSCCLPPCHECK(mscclppConnectionSetup(comm)); - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - auto allreduce = builder.finishSetup(); - - // Run the collective - testKernel<<>>(allreduce); - - // Wait for kernel to finish - CUDACHECK(cudaDeviceSynchronize()); - - // Check the result - int* hostData = new int[dataSize]; - CUDACHECK(cudaMemcpy(hostData, data, dataSize * sizeof(int), cudaMemcpyDeviceToHost)); - int expectedValue = world_size * (world_size - 1) / 2; - for (size_t i = 0; i < dataSize; ++i) { - if (hostData[i] != expectedValue) { - printf("Error at index %lu: %d != %d\n", i, hostData[i], expectedValue); - return 1; - } - } - - MSCCLPPCHECK(mscclppProxyStop(comm)); - - MSCCLPPCHECK(mscclppCommDestroy(comm)); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc == 2) { - MPI_Finalize(); - } -#endif - printf("Succeeded! %d\n", rank); - return 0; -} \ No newline at end of file diff --git a/tests/allreduce_test.cu b/tests/allreduce_test.cu new file mode 100644 index 00000000..671e2594 --- /dev/null +++ b/tests/allreduce_test.cu @@ -0,0 +1,284 @@ +#include +#include +#include + +#include "comm.h" +#include "common.h" + +#define ALIGN 4 + +const int phase2Tag = 2; +mscclppDevConn_t* conns; +void* scratch = nullptr; +void* sendRecvData = nullptr; +cuda::barrier* barrier = nullptr; + +struct Chunk +{ + size_t offset; + size_t size; +}; + +inline int getSendTag(int rank, int peer) +{ + return rank < peer ? 0 : 1; +} + +inline int getRecvTag(int rank, int peer) +{ + return rank < peer ? 1 : 0; +} + +__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) +{ + size_t remainder = dataCount % numChunks; + size_t smallChunkSize = dataCount / numChunks; + size_t largeChunkSize = smallChunkSize + 1; + size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; + size_t numSmallChunks = chunkCount - numLargeChunks; + size_t offset = + (remainder - numLargeChunks) * largeChunkSize + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize; + return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize}; +} + +__host__ __device__ int peerIdx(int peerRank, int rank) +{ + return peerRank < rank ? peerRank : peerRank - 1; +} + +__host__ __device__ int peerRank(int peerIdx, int rank) +{ + return peerIdx < rank ? peerIdx : peerIdx + 1; +} + +__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) +{ + return peerIdx(peerRank, rank) * 3; +} + +__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) +{ + return peerIdx(peerRank, rank) * 3 + 1; +} + +__host__ __device__ int phase2ConnIdx(int peerRank, int rank) +{ + return peerIdx(peerRank, rank) * 3 + 2; +} + +__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) +{ + if (threadIdx.x == 0) { + conn.putWithSignalAndFlush(dstOffset, srcOffset, size); + } + __syncthreads(); +} + +__device__ void recv(mscclppDevConn_t& conn) +{ + if (threadIdx.x == 0) { + conn.wait(); + } + __syncthreads(); +} + +__device__ void reduceSum(int* dst, int* src, size_t size) +{ + for (int i = threadIdx.x; i < size; i += blockDim.x) { + dst[i] += src[i]; + } +} + +__global__ void initData(int* data, size_t size, int rank) +{ + for (int i = threadIdx.x; i < size; i += blockDim.x) { + data[i] = rank; + } +} + +__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount, + mscclppDevConn_t* conns, void* scratch, void* sendRecvData, + cuda::barrier* barrier) +{ + int idx = blockIdx.x; + int peer = peerRank(idx, rank); + mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)]; + mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(peer, rank)]; + mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(peer, rank)]; + + // 1st communication phase: send data to the scratch buffer of the peer associated with this block + Chunk toPeerChunk = getChunk(dataCount, nRanks, peer, 1); + // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. + // The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and + // inside that each of the destination's peers send to the nth chunk, where n is the index of the + // source peer from the destination's perspective. + size_t dstOffset = peerIdx(rank, peer) * toPeerChunk.size; + send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int)); + recv(phase1RecvConn); + + if (threadIdx.x == 0) + barrier->arrive_and_wait(); + __syncthreads(); + + // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer + Chunk rankChunk = getChunk(dataCount, nRanks, rank, 1); + int* chunk = (int*)sendRecvData + rankChunk.offset; + int numPeers = nRanks - 1, numBlocks = nRanks - 1; + Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, idx, 1); + for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) { + assert(scratchDataCount % numPeers == 0); + assert(scratchDataCount / numPeers == rankChunk.size); + size_t scratchDataCountPerPeer = scratchDataCount / numPeers; + int* scratchChunk = (int*)scratch + peerIdx * scratchDataCountPerPeer; + Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, idx, 1); + assert(blockScratchChunk.size == blockUserChunk.size); + reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); + } + + if (threadIdx.x == 0) + barrier->arrive_and_wait(); + __syncthreads(); + + // 2nd communication phase: send the now reduced data between the user buffers + Chunk collectionChunk = getChunk(dataCount, nRanks, rank, 1); + send(phase2Conn, collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int), + collectionChunk.size * sizeof(int)); + recv(phase2Conn); +} + +void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, + size_t* recvInplaceOffset, size_t count, int nranks) +{ + size_t base = (count / ALIGN) * ALIGN; + *sendcount = base; + *recvcount = base; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = base; +} + +void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) +{ + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllReduceInitData(struct testArgs* args, int in_place) +{ + size_t recvcount = args->expectedBytes / sizeof(int); + + CUDACHECK(cudaSetDevice(args->gpuNum)); + CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes)); + initData<<<1, 256>>>((int*)args->recvbuff, recvcount, args->proc); + + int* dataHost = new int[recvcount]; + for (size_t i = 0; i < recvcount; i++) { + dataHost[i] = args->totalProcs * (args->totalProcs - 1) / 2; + } + CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); + delete dataHost; + CUDACHECK(cudaDeviceSynchronize()); + MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm)); + return testSuccess; +} + +void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) +{ + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = (2 * (double)(nranks - 1)) / ((double)nranks); + *busBw = baseBw * factor; +} + +testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm, + cudaStream_t stream, int kernelNum) +{ + int worldSize = comm->nRanks; + int nPeers = worldSize - 1; + int dataCount = nBytes / sizeof(int); + Chunk chunk = getChunk(dataCount, worldSize, comm->rank, 1); + size_t scratchDataCount = chunk.size * nPeers; + allReduceKernel0<<>>(comm->rank, worldSize, dataCount, scratchDataCount, conns, + scratch, sendRecvData, barrier); + return testSuccess; +} + +struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, AllReduceInitData, AllReduceGetBw, + AllReduceRunColl}; + +testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) +{ + int rank = args->proc, worldSize = args->totalProcs; + size_t bufferSize = args->maxbytes; + Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1); + int nPeers = args->totalProcs - 1; + size_t scratchBytes = chunk.size * nPeers * sizeof(int); + + CUDACHECK(cudaMalloc(&scratch, scratchBytes)); + + for (int peer = 0; peer < worldSize; ++peer) { + if (peer != args->proc) { + int sendTag = getSendTag(args->proc, peer); + int recvTag = getRecvTag(args->proc, peer); + MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); + MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr)); + MSCCLPPCHECK( + mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); + } + } + MSCCLPPCHECK(mscclppConnectionSetup(args->comm)); + + return testSuccess; +} + +testResult_t AllReduceTeardownMscclppConnections() +{ + if (scratch != nullptr) { + CUDACHECK(cudaFree(scratch)); + scratch = nullptr; + } + return testSuccess; +} + +testResult_t AllReduceRunTest(struct testArgs* args) +{ + args->collTest = &allReduceTest; + + sendRecvData = args->recvbuff; + CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier))); + cuda::barrier initBarrier(args->totalProcs - 1); + CUDACHECK( + cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); + int nPeers = args->totalProcs - 1; + int rank = args->proc; + std::vector hostConns(nPeers * 3, mscclppDevConn_t()); + + for (int peer = 0; peer < args->totalProcs; ++peer) { + mscclppDevConn_t* devConn; + if (peer != rank) { + int sendTag = getSendTag(args->proc, peer); + int recvTag = getRecvTag(args->proc, peer); + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, sendTag, &devConn)); + hostConns[phase1SendConnIdx(peer, rank)] = *devConn; + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, recvTag, &devConn)); + hostConns[phase1RecvConnIdx(peer, rank)] = *devConn; + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, phase2Tag, &devConn)); + hostConns[phase2ConnIdx(peer, rank)] = *devConn; + } + } + CUDACHECK(cudaMalloc(&conns, nPeers * 3 * sizeof(mscclppDevConn_t))); + CUDACHECK(cudaMemcpy(conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice)); + + TESTCHECK(TimeTest(args)); + + CUDACHECK(cudaFree(barrier)); + CUDACHECK(cudaFree(conns)); + + return testSuccess; +} + +struct testEngine allReduceEngine = {AllReduceGetBuffSize, AllReduceRunTest, AllReduceSetupMscclppConnections, + AllReduceTeardownMscclppConnections}; + +#pragma weak mscclppTestEngine = allReduceEngine diff --git a/tests/common.cu b/tests/common.cu index 77c3223f..1d424452 100644 --- a/tests/common.cu +++ b/tests/common.cu @@ -224,12 +224,6 @@ testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts) CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost)); CUDACHECK(cudaMemcpy(dataHostExpected, args->expected, args->expectedBytes, cudaMemcpyDeviceToHost)); - for (size_t i = 0; i < count; i++) { - if (dataHostRecv[i] != dataHostExpected[i]) { - *wrongElts += 1; - } - } - if (args->reportErrors && *wrongElts) { (args->error)++; } @@ -415,13 +409,20 @@ testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode, testResult_t runTests(struct testArgs* args) { PRINT("# Setting up the connection in MSCCL++\n"); - TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff, - args->maxbytes)); + if (mscclppTestEngine.setupMscclppConnections != nullptr) { + TESTCHECK(mscclppTestEngine.setupMscclppConnections(args)); + } else { + TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff, + args->maxbytes)); + } PRINT("# Launching MSCCL++ proxy threads\n"); MSCCLPPCHECK(mscclppProxyLaunch(args->comm)); TESTCHECK(mscclppTestEngine.runTest(args)); PRINT("Stopping MSCCL++ proxy threads\n"); MSCCLPPCHECK(mscclppProxyStop(args->comm)); + if (mscclppTestEngine.teardownMscclppConnections != nullptr) { + TESTCHECK(mscclppTestEngine.teardownMscclppConnections()); + } return testSuccess; } diff --git a/tests/common.h b/tests/common.h index 39766de0..9003b6e7 100644 --- a/tests/common.h +++ b/tests/common.h @@ -80,6 +80,8 @@ struct testEngine void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks); // We can add more parameters for other communication primitives testResult_t (*runTest)(struct testArgs* args); + testResult_t (*setupMscclppConnections)(struct testArgs* args); + testResult_t (*teardownMscclppConnections)(); }; extern struct testEngine mscclppTestEngine;