From 216373eab2d57f2a3658e4e97a8d2c61a9fd4986 Mon Sep 17 00:00:00 2001 From: Binyang2014 Date: Tue, 23 May 2023 15:37:25 +0800 Subject: [PATCH] Add allgather test to mscclpp-test (#78) Finish allGather Co-authored-by: Changho Hwang --- test/allgather_test.cu | 220 ------------ test/allgather_test_standalone.cu | 501 ---------------------------- test/mscclpp-test/CMakeLists.txt | 9 +- test/mscclpp-test/allgather_test.cu | 260 +++++++++++++++ test/mscclpp-test/common.cu | 15 +- test/mscclpp-test/common.hpp | 8 +- test/mscclpp-test/sendrecv_test.cu | 7 +- 7 files changed, 287 insertions(+), 733 deletions(-) delete mode 100644 test/allgather_test.cu delete mode 100644 test/allgather_test_standalone.cu create mode 100644 test/mscclpp-test/allgather_test.cu diff --git a/test/allgather_test.cu b/test/allgather_test.cu deleted file mode 100644 index 764f33fb..00000000 --- a/test/allgather_test.cu +++ /dev/null @@ -1,220 +0,0 @@ -#include "comm.h" -#include "common.h" - -#include -#include - -#define ALIGN 4 -__constant__ mscclppDevConn_t constDevConns[16]; - -__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) -{ - // this allgather is really simple and implemented as an alltoall - - // this thread's role is a sender role - // put your data asynchronously - if (threadIdx.x % 32 == 0) - devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); - // make sure everyone is put their data before some thread randomly blocks everyone else in signal - __syncthreads(); - // push with flag and sync to make sure the data is received - if (threadIdx.x % 32 == 0) - devConn.flush(); - - // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready - if (threadIdx.x % 32 == 0) - devConn.wait(); -} - -__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - uint64_t offset, uint64_t size) -{ - // this allgather algorithm works as follows: - // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode - // and waits for data from GPU rank (i-1) % nranksPerNode - // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode - // ... - // This order is much better for DMA engine for NVLinks - for (int i = 1; i < nranksPerNode; i++) { - if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { - // put your data to GPU (rank+i) % nranksPerNode and signal in one call - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(offset, size); - } - // wait for the data from GPU (rank-i) % nranksPerNode to arrive - if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); - } -} - -__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) -{ - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), - nelemsPerGPU * sizeof(int)); -} - -__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) -{ - // this allgather is a pipelined and hierarchical one and only works for two nodes - // it is implemented as follows: - // Step 1: each node does a local allgather and concurrently, - // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with - // its cross-node neighbor (local GPU i on the other node) via IB - // Step 2: each node does a local allgather again with the data just received from its - // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with - // its cross-node neighbor - // Step 3: each node does a local allgather for the last time with the rest of the data - - int pipelineSize = 3; - - // Step 1 - // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), - nelemsPerGPU * sizeof(int)); - } - // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { - // opposite side - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), - (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - - __syncthreads(); - - // Step 2 - // local allgather - int otherNghr = (rank + nranksPerNode) % world_size; - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), - (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - } - - // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { - // opposite side - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * - sizeof(int), - nelemsPerGPU / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - - __syncthreads(); - - // Step 3 - // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, - (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), - nelemsPerGPU / pipelineSize * sizeof(int)); - } -} - -__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) -{ - // find the mapping between remoteRank and devConns - int warpId = threadIdx.x / 32; - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - // Each warp is responsible for one of the remote ranks - mscclppDevConn_t devConn = constDevConns[warpId]; - - if (kernel == 0) - allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU); - else if (kernel == 1) - allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); - else if (kernel == 2) - allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); -} - -void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, - size_t* recvInplaceOffset, size_t count, int nranks) -{ - size_t base = (count / (ALIGN * nranks)) * ALIGN; - *sendcount = base; - *recvcount = base * nranks; - *sendInplaceOffset = base; - *recvInplaceOffset = 0; - *paramcount = base; -} - -testResult_t AllGatherInitData(struct testArgs* args, int in_place) -{ - size_t sendcount = args->sendBytes / sizeof(int); - size_t recvcount = args->expectedBytes / sizeof(int); - // int nranks = args->totalProcs; - - CUDACHECK(cudaSetDevice(args->gpuNum)); - int rank = args->proc; - CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes)); - // void* data = in_place ? ((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0]; - - int* dataHost = new int[recvcount]; - for (size_t i = 0; i < recvcount; i++) { - int val = i + 1; - if (i / sendcount == (size_t)rank) { - dataHost[i] = val; - } else { - dataHost[i] = 0; - } - } - CUDACHECK(cudaMemcpy(args->recvbuff, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); - for (int i = 0; i < static_cast(recvcount); i++) { - dataHost[i] = i + 1; - } - CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); - delete dataHost; - CUDACHECK(cudaDeviceSynchronize()); - MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm)); - return testSuccess; -} - -void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) -{ - double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec; - - *algBw = baseBw; - double factor = ((double)(nranks - 1)) / ((double)nranks); - *busBw = baseBw * factor; -} - -testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, - cudaStream_t stream, int kernel_num) -{ - int worldSize = comm->nRanks; - kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), kernel_num); - return testSuccess; -} - -struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, defaultInitColl, AllGatherInitData, - AllGatherGetBw, AllGatherRunColl}; - -void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) -{ - size_t paramcount, sendInplaceOffset, recvInplaceOffset; - AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); -} - -testResult_t AllGatherRunTest(struct testArgs* args) -{ - args->collTest = &allGatherTest; - mscclppDevConn_t* devConns; - int nCons; - MSCCLPPCHECK(mscclppGetAllDeviceConnections(args->comm, &devConns, &nCons)); - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); - TESTCHECK(TimeTest(args)); - return testSuccess; -} - -struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest, nullptr, nullptr}; - -#pragma weak mscclppTestEngine = allGatherEngine diff --git a/test/allgather_test_standalone.cu b/test/allgather_test_standalone.cu deleted file mode 100644 index 05aa3304..00000000 --- a/test/allgather_test_standalone.cu +++ /dev/null @@ -1,501 +0,0 @@ -#include "mscclpp.h" - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include -#include -#include -#include -#include -#include - -static int nranksPerNode = 8; - -// Propagate errors up - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ - return res; \ - } \ - } while (0) - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -// Measure current time in second. -static double getTime(void) -{ - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} - -__constant__ mscclppDevConn_t constDevConns[16]; - -__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) -{ - // this allgather is really simple and implemented as an alltoall - - // this thread's role is a sender role - // put your data asynchronously - if ((threadIdx.x % 32) == 0) - devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); - // make sure everyone is put their data before some thread randomly blocks everyone else in signal - __syncthreads(); - // push with flag and sync to make sure the data is received - if ((threadIdx.x % 32) == 0) - devConn.flush(); - - // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready - if ((threadIdx.x % 32) == 0) - devConn.wait(); -} - -__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - uint64_t offset, uint64_t size) -{ - // this allgather algorithm works as follows: - // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode - // and waits for data from GPU rank (i-1) % nranksPerNode - // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode - // ... - // This order is much better for DMA engine for NVLinks - for (int i = 1; i < nranksPerNode; i++) { - if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { - // put your data to GPU (rank+i) % nranksPerNode and signal in one call - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(offset, size); - } - // wait for the data from GPU (rank-i) % nranksPerNode to arrive - if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); - } -} - -__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) -{ - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), - nelemsPerGPU * sizeof(int)); -} - -__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) -{ - // this allgather is a pipelined and hierarchical one and only works for two nodes - // it is implemented as follows: - // Step 1: each node does a local allgather and concurrently, - // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with - // its cross-node neighbor (local GPU i on the other node) via IB - // Step 2: each node does a local allgather again with the data just received from its - // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with - // its cross-node neighbor - // Step 3: each node does a local allgather for the last time with the rest of the data - - int pipelineSize = 3; - - // Step 1 - // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), - nelemsPerGPU * sizeof(int)); - } - // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { - // opposite side - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), - (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - - __syncthreads(); - - // Step 2 - // local allgather - int otherNghr = (rank + nranksPerNode) % world_size; - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), - (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - } - - // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { - // opposite side - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * - sizeof(int), - nelemsPerGPU / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - - __syncthreads(); - - // Step 3 - // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, - (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), - nelemsPerGPU / pipelineSize * sizeof(int)); - } -} - -__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) -{ - // find the mapping between remoteRank and devConns - int warpId = threadIdx.x / 32; - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - // Each warp is responsible for one of the remote ranks - mscclppDevConn_t devConn = constDevConns[warpId]; - - if (kernel == 0) - allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU); - else if (kernel == 1) - allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); - else if (kernel == 2) - allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); -} - -int rankToLocalRank(int rank) -{ - return rank % nranksPerNode; -} - -int rankToNode(int rank) -{ - return rank / nranksPerNode; -} - -void print_usage(const char* prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - printf("usage: %s IP:PORT [rank nranks]\n", prog); -#else - printf("usage: %s IP:PORT rank nranks\n", prog); -#endif -} - -void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, - int** data_d) -{ - CUDACHECK(cudaMalloc(data_d, dataSize)); - CUDACHECK(cudaMemset(*data_d, 0, dataSize)); - - *data_h = new int[nelemsPerGPU * world_size]; - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { - int val = i + 1; - if (i / nelemsPerGPU == (size_t)rank) { - (*data_h)[i] = val; - } else { - (*data_h)[i] = 0; - } - } - CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice)); -} - -mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize) -{ - int thisNode = rankToNode(rank); - int cudaNum = rankToLocalRank(rank); - std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); - - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = ibDevStr.c_str(); - if (rankToNode(r) == thisNode) { - ibDev = NULL; - transportType = mscclppTransportP2P; - } else { - transportType = mscclppTransportIB; - } - // Connect with all other ranks - MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev)); - } - - MSCCLPPCHECK(mscclppConnectionSetup(comm)); - - mscclppDevConn_t* devConns; - int nCons; - MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons)); - - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); - - return mscclppSuccess; -} - -void printUsage(const char* prog, bool isMpi) -{ - if (isMpi) { - std::string st = "you are using MPI for this test\n"; - st += "two possilbe usages are:\n"; - st += "> " + std::string(prog) + "\n"; - st += "or\n"; - st += "> " + std::string(prog) + " -ip_port [ip:port]\n"; - printf("%s", st.c_str()); - } else { - std::string st = "you are NOT using MPI for this test\n"; - st += "the only possible usage:\n"; - st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n"; - printf("%s", st.c_str()); - } -} - -std::unordered_map parseArgs(int argc, const char* argv[], bool isMpi) -{ - std::unordered_map options; - - for (int i = 1; i < argc; i++) { - std::string arg = argv[i]; - if (arg == "-rankspernode") { - if (isMpi) { - fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["rankspernode"] = argv[++i]; - } else { - fprintf(stderr, "Error: -rankspernode option requires an argument.\n"); - ; - exit(-1); - } - } else if (arg == "-kernel") { - if (i + 1 < argc) { - options["kernel"] = argv[++i]; - } else { - fprintf(stderr, "Error: -kernel option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-ip_port") { - if (i + 1 < argc) { - options["ip_port"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-rank") { - if (isMpi) { - fprintf(stderr, "Error: -rank should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["rank"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-nranks") { - if (isMpi) { - fprintf(stderr, "Error: -nranks should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["nranks"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-datasize") { - if (i + 1 < argc) { - options["datasize"] = argv[++i]; - } else { - fprintf(stderr, "Error: -datasize option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-help" || arg == "-h") { - printUsage(argv[0], isMpi); - exit(0); - } else { - fprintf(stderr, "Error: Unknown option %s\n", argv[i]); - exit(-1); - } - } - return options; -} - -int main(int argc, const char* argv[]) -{ - bool isMpi = false; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - isMpi = true; -#endif - - auto parsedArgs = parseArgs(argc, argv, isMpi); - - int rank; - int world_size; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - // get the local number of nodes with MPI - MPI_Comm shmcomm; - MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - int shmrank; - MPI_Comm_size(shmcomm, &shmrank); - nranksPerNode = shmrank; - MPI_Comm_free(&shmcomm); -#else - if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - rank = std::stoi(parsedArgs["rank"]); - world_size = std::stoi(parsedArgs["nranks"]); - if (parsedArgs.find("rankspernode") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - nranksPerNode = std::stoi(parsedArgs["rankspernode"]); -#endif - int kernelNum = 0; - if (parsedArgs.find("kernel") != parsedArgs.end()) { - kernelNum = std::stoi(parsedArgs["kernel"]); - } - char* ip_port = NULL; - if (parsedArgs.find("ip_port") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - ip_port = (char*)parsedArgs["ip_port"].c_str(); - - int thisNode = rankToNode(rank); - int cudaNum = rankToLocalRank(rank); - CUDACHECK(cudaSetDevice(cudaNum)); - - if (rank == 0) - printf("Initializing MSCCL++\n"); - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank)); - - int* data_d; - int* data_h; - size_t dataSize = 1024 * 1024 * 1024; - if (parsedArgs.find("datasize") != parsedArgs.end()) { - dataSize = std::stoul(parsedArgs["datasize"]); - } - size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; - - if (rank == 0) - printf("Initializing data for allgather test\n"); - initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); - - if (rank == 0) - printf("Setting up the connection in MSCCL++\n"); - MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize)); - - if (rank == 0) - printf("Launching MSCCL++ proxy threads\n"); - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - - if (rank == 0) - printf("Testing the correctness of AllGather implementation\n"); - cudaStream_t stream; - CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - CUDACHECK(cudaDeviceSynchronize()); - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - CUDACHECK(cudaDeviceSynchronize()); - CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost)); - - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { - int val = i + 1; - if (data_h[i] != val) { - printf("oh uh! data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val); - break; - } - } - int tmp[16]; - // A simple barrier - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - if (rank == 0) - printf("Successfully checked the correctness\n"); - - // Perf test - int iterwithoutcudagraph = 10; - if (rank == 0) - printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); - CUDACHECK(cudaStreamSynchronize(stream)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - for (int i = 0; i < iterwithoutcudagraph; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - } - CUDACHECK(cudaStreamSynchronize(stream)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // cudaGraph Capture - int cudagraphiter = 10; - if (rank == 0) - printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); - cudaGraph_t graph; - cudaGraphExec_t instance; - cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - for (int i = 0; i < cudagraphiter; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - } - cudaStreamEndCapture(stream, &graph); - cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - - int cudagraphwarmup = 10; - if (rank == 0) - printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, - cudagraphiter); - for (int i = 0; i < cudagraphwarmup; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - // measure runtime - int cudagraphlaunch = 10; - if (rank == 0) - printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, - cudagraphiter); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - double t0, t1, ms, time_in_us; - t0 = getTime(); - for (int i = 0; i < cudagraphlaunch; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - t1 = getTime(); - ms = (t1 - t0) * 1000.0; - time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; - printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, - (double)(dataSize) / 1e9 / (time_in_us / 1e6)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - if (rank == 0) - printf("Stopping MSCCL++ proxy threads\n"); - MSCCLPPCHECK(mscclppProxyStop(comm)); - - if (rank == 0) - printf("Destroying MSCCL++ communicator\n"); - MSCCLPPCHECK(mscclppCommDestroy(comm)); - printf("Rank %d succeeded!\n", rank); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Finalize(); -#endif - return 0; -} diff --git a/test/mscclpp-test/CMakeLists.txt b/test/mscclpp-test/CMakeLists.txt index 44c6efca..e60848e9 100644 --- a/test/mscclpp-test/CMakeLists.txt +++ b/test/mscclpp-test/CMakeLists.txt @@ -1,2 +1,7 @@ -add_executable(sendrecv_test_perf sendrecv_test.cu common.cu) -target_link_libraries(sendrecv_test_perf mscclpp MPI::MPI_CXX) +function(add_mscclpp_test_executable name sources) + add_executable(${name} ${sources} common.cu) + target_link_libraries(${name} mscclpp MPI::MPI_CXX CUDA::cudart CUDA::cuda_driver) +endfunction() + +add_mscclpp_test_executable(sendrecv_test_perf sendrecv_test.cu) +add_mscclpp_test_executable(allgather_test_perf allgather_test.cu) diff --git a/test/mscclpp-test/allgather_test.cu b/test/mscclpp-test/allgather_test.cu new file mode 100644 index 00000000..0517f7c9 --- /dev/null +++ b/test/mscclpp-test/allgather_test.cu @@ -0,0 +1,260 @@ +#include + +#include +#include + +#include "common.hpp" + +#define ALIGN 4 +__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16]; + +__device__ void allgather0(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, int remoteRank, + size_t nelemsPerGPU) { + // this allgather is really simple and implemented as an alltoall + + // this thread's role is a sender role + // put your data asynchronously + if (threadIdx.x % 32 == 0) devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + // make sure everyone is put their data before some thread randomly blocks everyone else in signal + __syncthreads(); + // push with flag and sync to make sure the data is received + if (threadIdx.x % 32 == 0) devChan.flush(); + + // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready + if (threadIdx.x % 32 == 0) devChan.wait(); +} + +__device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, + int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) { + // this allgather algorithm works as follows: + // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode + // and waits for data from GPU rank (i-1) % nranksPerNode + // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode + // ... + // This order is much better for DMA engine for NVLinks + for (int i = 1; i < nranksPerNode; i++) { + if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { + // put your data to GPU (rank+i) % nranksPerNode and signal in one call + if ((threadIdx.x % 32) == 0) devChan.putWithSignalAndFlush(offset, size); + } + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { + if ((threadIdx.x % 32) == 0) devChan.wait(); + } + asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); + } +} + +__device__ void allgather1(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, int nranksPerNode, + int remoteRank, size_t nelemsPerGPU) { + localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + nelemsPerGPU * sizeof(int)); +} + +__device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, int nranksPerNode, + int remoteRank, size_t nelemsPerGPU) { + // this allgather is a pipelined and hierarchical one and only works for two nodes + // it is implemented as follows: + // Step 1: each node does a local allgather and concurrently, + // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with + // its cross-node neighbor (local GPU i on the other node) via IB + // Step 2: each node does a local allgather again with the data just received from its + // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with + // its cross-node neighbor + // Step 3: each node does a local allgather for the last time with the rest of the data + + int pipelineSize = 3; + + // Step 1 + // local allgather + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + nelemsPerGPU * sizeof(int)); + } + // cross-node exchange + if (remoteRank % nranksPerNode == rank % nranksPerNode) { + // opposite side + if ((threadIdx.x % 32) == 0) + devChan.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + if ((threadIdx.x % 32) == 0) devChan.wait(); + } + + __syncthreads(); + + // Step 2 + // local allgather + int otherNghr = (rank + nranksPerNode) % worldSize; + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + } + + // cross-node exchange + if (remoteRank % nranksPerNode == rank % nranksPerNode) { + // opposite side + if ((threadIdx.x % 32) == 0) + devChan.putWithSignalAndFlush( + (rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + if ((threadIdx.x % 32) == 0) devChan.wait(); + } + + __syncthreads(); + + // Step 3 + // local allgather + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, + (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + } +} + +__global__ void kernel(int rank, int worldSize, int nranksPerNode, size_t nelemsPerGPU, int kernel) { + // find the mapping between remoteRank and devConns + int warpId = threadIdx.x / 32; + int remoteRank = (warpId < rank) ? warpId : warpId + 1; + // Each warp is responsible for one of the remote ranks + mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[warpId]; + + if (kernel == 0) + allgather0(devChan, rank, worldSize, remoteRank, nelemsPerGPU); + else if (kernel == 1) + allgather1(devChan, rank, worldSize, nranksPerNode, remoteRank, nelemsPerGPU); + else if (kernel == 2) + allgather2(devChan, rank, worldSize, nranksPerNode, remoteRank, nelemsPerGPU); +} + +class AllGatherTestColl : public BaseTestColl { + public: + AllGatherTestColl() = default; + ~AllGatherTestColl() override = default; + + void runColl(const TestArgs& args, cudaStream_t stream) override; + void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; + void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override; + void setupCollTest(size_t size) override; +}; + +void AllGatherTestColl::runColl(const TestArgs& args, cudaStream_t stream) { + const int worldSize = args.totalRanks; + const int rank = args.rank; + const int nRanksPerNode = args.nRanksPerNode; + const int kernelNum = args.kernelNum; + kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(rank, worldSize, nRanksPerNode, paramCount_, kernelNum); +} + +void AllGatherTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { + assert(sendBuff.size() == 1); + int rank = args.rank; + std::vector dataHost(std::max(sendCount_, recvCount_), 0); + for (size_t i = 0; i < recvCount_; i++) { + int val = i + 1; + if (i / sendCount_ == (size_t)rank) { + dataHost[i] = val; + } else { + dataHost[i] = 0; + } + } + CUDATHROW(cudaMemcpy(sendBuff[0], dataHost.data(), recvCount_ * typeSize_, cudaMemcpyHostToDevice)); + + for (size_t i = 0; i < recvCount_; i++) { + dataHost[i] = static_cast(i) + 1; + } + std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_); +} + +void AllGatherTestColl::getBw(const double deltaSec, double& algBw, double& busBw) { + double baseBw = (double)(paramCount_ * typeSize_ * worldSize_) / 1.0E9 / deltaSec; + + algBw = baseBw; + double factor = ((double)(worldSize_ - 1)) / ((double)worldSize_); + busBw = baseBw * factor; +} + +void AllGatherTestColl::setupCollTest(size_t size) { + size_t count = size / typeSize_; + size_t base = (count / (ALIGN * worldSize_)) * ALIGN; + sendCount_ = base; + recvCount_ = base * worldSize_; + paramCount_ = base; + expectedCount_ = recvCount_; +} + +class AllGatherTestEngine : public BaseTestEngine { + public: + AllGatherTestEngine() = default; + ~AllGatherTestEngine() override = default; + + void allocateBuffer() override; + void setupConnections() override; + + private: + std::vector getSendBuff() override; + void* getExpectedBuff() override; + void* getRecvBuff() override; + + std::shared_ptr sendBuff_; + std::shared_ptr expectedBuff_; +}; + +void AllGatherTestEngine::allocateBuffer() { + sendBuff_ = mscclpp::makeSharedCuda(args_.maxBytes / sizeof(int)); + expectedBuff_ = std::shared_ptr(new int[args_.maxBytes / sizeof(int)]); +} + +void AllGatherTestEngine::setupConnections() { + const int worldSize = args_.totalRanks; + const int rank = args_.rank; + const int nRanksPerNode = args_.nRanksPerNode; + const int thisNode = rank / nRanksPerNode; + const mscclpp::Transport ibTransport = IBs[args_.gpuNum]; + + std::vector channelIds; + std::vector localMemories; + std::vector> remoteMemories; + + auto rankToNode = [&](int rank) { return rank / nRanksPerNode; }; + for (int r = 0; r < worldSize; r++) { + if (r == rank) { + continue; + } + mscclpp::Transport transport; + if (rankToNode(r) == thisNode) { + transport = mscclpp::Transport::CudaIpc; + } else { + transport = ibTransport; + } + // Connect with all other ranks + channelIds.push_back(chanService_->addChannel(comm_->connectOnSetup(r, 0, transport))); + auto memory = comm_->registerMemory(sendBuff_.get(), args_.maxBytes, mscclpp::Transport::CudaIpc | ibTransport); + localMemories.push_back(memory); + comm_->sendMemoryOnSetup(memory, r, 0); + remoteMemories.push_back(comm_->recvMemoryOnSetup(r, 0)); + } + comm_->setup(); + + std::vector devChannels; + for (size_t i = 0; i < channelIds.size(); ++i) { + devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(chanService_->deviceChannel(channelIds[i]), + chanService_->addMemory(remoteMemories[i].get()), + chanService_->addMemory(localMemories[i]))); + } + + assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel)); + CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(), + sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size())); +} + +std::vector AllGatherTestEngine::getSendBuff() { return {sendBuff_.get()}; } + +void* AllGatherTestEngine::getExpectedBuff() { return expectedBuff_.get(); } + +void* AllGatherTestEngine::getRecvBuff() { + // in-place operation reuse the send buffer + return sendBuff_.get(); +} + +std::shared_ptr getTestEngine() { return std::make_shared(); } +std::shared_ptr getTestColl() { return std::make_shared(); } diff --git a/test/mscclpp-test/common.cu b/test/mscclpp-test/common.cu index 236ee31f..21b3ca25 100644 --- a/test/mscclpp-test/common.cu +++ b/test/mscclpp-test/common.cu @@ -94,6 +94,13 @@ BaseTestEngine::BaseTestEngine(bool inPlace) : error_(0), inPlace_(inPlace) { BaseTestEngine::~BaseTestEngine() { cudaStreamDestroy(stream_); } +void BaseTestColl::setupCollTest(const TestArgs& args, size_t size) +{ + this->worldSize_ = args.totalRanks; + this->typeSize_ = sizeof(int); + this->setupCollTest(size); +} + double BaseTestEngine::benchTime() { // Performance Benchmark cudaGraph_t graph; @@ -128,7 +135,7 @@ void BaseTestEngine::barrier() { void BaseTestEngine::runTest() { // warm-up for large size - this->coll_->setupCollTest(args_.maxBytes, sizeof(int)); + this->coll_->setupCollTest(args_, args_.maxBytes); this->barrier(); for (int iter = 0; iter < warmup_iters; iter++) { this->coll_->runColl(args_, stream_); @@ -136,7 +143,7 @@ void BaseTestEngine::runTest() CUDATHROW(cudaDeviceSynchronize()); // warm-up for small size - this->coll_->setupCollTest(args_.minBytes, sizeof(int)); + this->coll_->setupCollTest(args_, args_.minBytes); this->barrier(); for (int iter = 0; iter < warmup_iters; iter++) { this->coll_->runColl(args_, stream_); @@ -153,14 +160,14 @@ void BaseTestEngine::runTest() // Benchmark for (size_t size = args_.minBytes; size <= args_.maxBytes; size = ((args_.stepFactor > 1) ? size * args_.stepFactor : size + args_.stepBytes)) { - coll_->setupCollTest(size, sizeof(int)); + coll_->setupCollTest(args_, size); this->coll_->initData(this->args_, this->getSendBuff(), this->getExpectedBuff()); PRINT("%12li %12li", max(coll_->getSendBytes(), coll_->getExpectedBytes()), coll_->getParamBytes() / sizeof(int)); double deltaSec = benchTime(); size_t nErrors = 0; if (args_.reportErrors) { - this->coll_->setupCollTest(size, sizeof(int)); + this->coll_->setupCollTest(args_, size); this->coll_->initData(this->args_, this->getSendBuff(), this->getExpectedBuff()); this->barrier(); this->coll_->runColl(args_, stream_); diff --git a/test/mscclpp-test/common.hpp b/test/mscclpp-test/common.hpp index 185e77e1..1c9ad85f 100644 --- a/test/mscclpp-test/common.hpp +++ b/test/mscclpp-test/common.hpp @@ -40,10 +40,10 @@ class BaseTestColl { BaseTestColl() {} virtual ~BaseTestColl() {} virtual void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) = 0; - virtual void setupCollTest(size_t size, size_t typeSize) = 0; virtual void runColl(const TestArgs& args, cudaStream_t stream) = 0; virtual void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) = 0; + void setupCollTest(const TestArgs& args, size_t size); size_t getSendBytes() { return sendCount_ * typeSize_; } size_t getRecvBytes() { return recvCount_ * typeSize_; } size_t getExpectedBytes() { return expectedCount_ * typeSize_; } @@ -55,11 +55,15 @@ class BaseTestColl { size_t expectedCount_; size_t paramCount_; int typeSize_; + int worldSize_; + + private: + virtual void setupCollTest(size_t size) = 0; }; class BaseTestEngine { public: - BaseTestEngine(bool inPlace); + BaseTestEngine(bool inPlace = true); virtual ~BaseTestEngine(); virtual void allocateBuffer() = 0; diff --git a/test/mscclpp-test/sendrecv_test.cu b/test/mscclpp-test/sendrecv_test.cu index f683bec4..6b885976 100644 --- a/test/mscclpp-test/sendrecv_test.cu +++ b/test/mscclpp-test/sendrecv_test.cu @@ -55,7 +55,7 @@ class SendRecvTestColl : public BaseTestColl { void runColl(const TestArgs& args, cudaStream_t stream) override; void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override; - void setupCollTest(size_t size, size_t typeSize) override; + void setupCollTest(size_t size) override; }; void SendRecvTestColl::runColl(const TestArgs& args, cudaStream_t stream) { @@ -89,14 +89,13 @@ void SendRecvTestColl::initData(const TestArgs& args, std::vector sendBuf std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_); } -void SendRecvTestColl::setupCollTest(size_t size, size_t typeSize) { - size_t count = size / typeSize; +void SendRecvTestColl::setupCollTest(size_t size) { + size_t count = size / typeSize_; size_t base = (count / ALIGN) * ALIGN; sendCount_ = base; recvCount_ = base; paramCount_ = base; expectedCount_ = base; - typeSize_ = typeSize; mscclpp::DeviceSyncer syncer = {}; CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer)));