From 1b2db68e93db9433275920fffe8aefa5bc84d859 Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Thu, 6 Apr 2023 23:55:11 +0000 Subject: [PATCH] filename changes --- Makefile | 4 +- tests/allgather_test.cu | 420 +++++------------------- tests/allgather_test2.cu | 295 ----------------- tests/allgather_test3.cu | 225 ------------- tests/allgather_test_standalone.cu | 501 +++++++++++++++++++++++++++++ 5 files changed, 575 insertions(+), 870 deletions(-) delete mode 100644 tests/allgather_test2.cu delete mode 100644 tests/allgather_test3.cu create mode 100644 tests/allgather_test_standalone.cu diff --git a/Makefile b/Makefile index 6c3577a1..692ae56b 100644 --- a/Makefile +++ b/Makefile @@ -129,13 +129,13 @@ LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR) LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR).$(MSCCLPP_PATCH) TESTSDIR := tests -TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test.cu) +TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test_standalone.cu) TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS)) TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS)) MSCLLPPTESTSOBJSDIR:= $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR) -MSCLLPPTESTBINFILESLIST := allgather_test3 +MSCLLPPTESTBINFILESLIST := allgather_test MSCLLPPTESTBINS := $(MSCLLPPTESTBINFILESLIST:%=$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf) INCLUDE := -Isrc -Isrc/include diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu index 05aa3304..1f16fba9 100644 --- a/tests/allgather_test.cu +++ b/tests/allgather_test.cu @@ -1,50 +1,16 @@ -#include "mscclpp.h" +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include -#include -#include +#include "comm.h" +#include "common.h" + +#include #include -#include -#include - -static int nranksPerNode = 8; - -// Propagate errors up - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ - return res; \ - } \ - } while (0) - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -// Measure current time in second. -static double getTime(void) -{ - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} +#define ALIGN 4 __constant__ mscclppDevConn_t constDevConns[16]; __device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) @@ -53,16 +19,16 @@ __device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, i // this thread's role is a sender role // put your data asynchronously - if ((threadIdx.x % 32) == 0) + if (threadIdx.x % 32 == 0) devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); // make sure everyone is put their data before some thread randomly blocks everyone else in signal __syncthreads(); // push with flag and sync to make sure the data is received - if ((threadIdx.x % 32) == 0) + if (threadIdx.x % 32 == 0) devConn.flush(); // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready - if ((threadIdx.x % 32) == 0) + if (threadIdx.x % 32 == 0) devConn.wait(); } @@ -176,326 +142,84 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); } -int rankToLocalRank(int rank) +void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, + size_t* recvInplaceOffset, size_t count, int nranks) { - return rank % nranksPerNode; + size_t base = (count / (ALIGN * nranks)) * ALIGN; + *sendcount = base; + *recvcount = base * nranks; + *sendInplaceOffset = base; + *recvInplaceOffset = 0; + *paramcount = base; } -int rankToNode(int rank) +testResult_t AllGatherInitData(struct threadArgs* args, int in_place) { - return rank / nranksPerNode; -} + size_t sendcount = args->sendBytes / sizeof(int); + size_t recvcount = args->expectedBytes / sizeof(int); + // int nranks = args->totalProcs; -void print_usage(const char* prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - printf("usage: %s IP:PORT [rank nranks]\n", prog); -#else - printf("usage: %s IP:PORT rank nranks\n", prog); -#endif -} + CUDACHECK(cudaSetDevice(args->gpus[0])); + int rank = args->proc; + CUDACHECK(cudaMemset(args->recvbuffs[0], 0, args->expectedBytes)); + // void* data = in_place ? ((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0]; -void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, - int** data_d) -{ - CUDACHECK(cudaMalloc(data_d, dataSize)); - CUDACHECK(cudaMemset(*data_d, 0, dataSize)); - - *data_h = new int[nelemsPerGPU * world_size]; - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { + int* dataHost = new int[recvcount]; + for (size_t i = 0; i < recvcount; i++) { int val = i + 1; - if (i / nelemsPerGPU == (size_t)rank) { - (*data_h)[i] = val; + if (i / sendcount == (size_t)rank) { + dataHost[i] = val; } else { - (*data_h)[i] = 0; + dataHost[i] = 0; } } - CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice)); + CUDACHECK(cudaMemcpy(args->recvbuffs[0], dataHost, recvcount, cudaMemcpyHostToDevice)); + for (int i = 0; i < static_cast(recvcount); i++) { + dataHost[i] = i + 1; + } + CUDACHECK(cudaMemcpy(args->expected[0], dataHost, recvcount, cudaMemcpyHostToDevice)); + delete dataHost; + CUDACHECK(cudaDeviceSynchronize()); + return testSuccess; } -mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize) +void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { - int thisNode = rankToNode(rank); - int cudaNum = rankToLocalRank(rank); - std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); + double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec; - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = ibDevStr.c_str(); - if (rankToNode(r) == thisNode) { - ibDev = NULL; - transportType = mscclppTransportP2P; - } else { - transportType = mscclppTransportIB; - } - // Connect with all other ranks - MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev)); - } + *algBw = baseBw; + double factor = ((double)(nranks - 1)) / ((double)nranks); + *busBw = baseBw * factor; +} - MSCCLPPCHECK(mscclppConnectionSetup(comm)); +testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, + cudaStream_t stream) +{ + int worldSize = comm->nRanks; + kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), 1); + return testSuccess; +} +struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, AllGatherInitData, AllGatherGetBw, + AllGatherRunColl}; + +void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) +{ + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllGatherRunTest(struct threadArgs* args) +{ + args->collTest = &allGatherTest; mscclppDevConn_t* devConns; int nCons; - MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons)); - + MSCCLPPCHECK(mscclppGetAllDeviceConnections(args->comm, &devConns, &nCons)); CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); - - return mscclppSuccess; + TESTCHECK(TimeTest(args)); + return testSuccess; } -void printUsage(const char* prog, bool isMpi) -{ - if (isMpi) { - std::string st = "you are using MPI for this test\n"; - st += "two possilbe usages are:\n"; - st += "> " + std::string(prog) + "\n"; - st += "or\n"; - st += "> " + std::string(prog) + " -ip_port [ip:port]\n"; - printf("%s", st.c_str()); - } else { - std::string st = "you are NOT using MPI for this test\n"; - st += "the only possible usage:\n"; - st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n"; - printf("%s", st.c_str()); - } -} +struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest}; -std::unordered_map parseArgs(int argc, const char* argv[], bool isMpi) -{ - std::unordered_map options; - - for (int i = 1; i < argc; i++) { - std::string arg = argv[i]; - if (arg == "-rankspernode") { - if (isMpi) { - fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["rankspernode"] = argv[++i]; - } else { - fprintf(stderr, "Error: -rankspernode option requires an argument.\n"); - ; - exit(-1); - } - } else if (arg == "-kernel") { - if (i + 1 < argc) { - options["kernel"] = argv[++i]; - } else { - fprintf(stderr, "Error: -kernel option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-ip_port") { - if (i + 1 < argc) { - options["ip_port"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-rank") { - if (isMpi) { - fprintf(stderr, "Error: -rank should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["rank"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-nranks") { - if (isMpi) { - fprintf(stderr, "Error: -nranks should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["nranks"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-datasize") { - if (i + 1 < argc) { - options["datasize"] = argv[++i]; - } else { - fprintf(stderr, "Error: -datasize option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-help" || arg == "-h") { - printUsage(argv[0], isMpi); - exit(0); - } else { - fprintf(stderr, "Error: Unknown option %s\n", argv[i]); - exit(-1); - } - } - return options; -} - -int main(int argc, const char* argv[]) -{ - bool isMpi = false; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - isMpi = true; -#endif - - auto parsedArgs = parseArgs(argc, argv, isMpi); - - int rank; - int world_size; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - // get the local number of nodes with MPI - MPI_Comm shmcomm; - MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - int shmrank; - MPI_Comm_size(shmcomm, &shmrank); - nranksPerNode = shmrank; - MPI_Comm_free(&shmcomm); -#else - if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - rank = std::stoi(parsedArgs["rank"]); - world_size = std::stoi(parsedArgs["nranks"]); - if (parsedArgs.find("rankspernode") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - nranksPerNode = std::stoi(parsedArgs["rankspernode"]); -#endif - int kernelNum = 0; - if (parsedArgs.find("kernel") != parsedArgs.end()) { - kernelNum = std::stoi(parsedArgs["kernel"]); - } - char* ip_port = NULL; - if (parsedArgs.find("ip_port") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - ip_port = (char*)parsedArgs["ip_port"].c_str(); - - int thisNode = rankToNode(rank); - int cudaNum = rankToLocalRank(rank); - CUDACHECK(cudaSetDevice(cudaNum)); - - if (rank == 0) - printf("Initializing MSCCL++\n"); - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank)); - - int* data_d; - int* data_h; - size_t dataSize = 1024 * 1024 * 1024; - if (parsedArgs.find("datasize") != parsedArgs.end()) { - dataSize = std::stoul(parsedArgs["datasize"]); - } - size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; - - if (rank == 0) - printf("Initializing data for allgather test\n"); - initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); - - if (rank == 0) - printf("Setting up the connection in MSCCL++\n"); - MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize)); - - if (rank == 0) - printf("Launching MSCCL++ proxy threads\n"); - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - - if (rank == 0) - printf("Testing the correctness of AllGather implementation\n"); - cudaStream_t stream; - CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - CUDACHECK(cudaDeviceSynchronize()); - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - CUDACHECK(cudaDeviceSynchronize()); - CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost)); - - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { - int val = i + 1; - if (data_h[i] != val) { - printf("oh uh! data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val); - break; - } - } - int tmp[16]; - // A simple barrier - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - if (rank == 0) - printf("Successfully checked the correctness\n"); - - // Perf test - int iterwithoutcudagraph = 10; - if (rank == 0) - printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); - CUDACHECK(cudaStreamSynchronize(stream)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - for (int i = 0; i < iterwithoutcudagraph; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - } - CUDACHECK(cudaStreamSynchronize(stream)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // cudaGraph Capture - int cudagraphiter = 10; - if (rank == 0) - printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); - cudaGraph_t graph; - cudaGraphExec_t instance; - cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - for (int i = 0; i < cudagraphiter; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - } - cudaStreamEndCapture(stream, &graph); - cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - - int cudagraphwarmup = 10; - if (rank == 0) - printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, - cudagraphiter); - for (int i = 0; i < cudagraphwarmup; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - // measure runtime - int cudagraphlaunch = 10; - if (rank == 0) - printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, - cudagraphiter); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - double t0, t1, ms, time_in_us; - t0 = getTime(); - for (int i = 0; i < cudagraphlaunch; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - t1 = getTime(); - ms = (t1 - t0) * 1000.0; - time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; - printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, - (double)(dataSize) / 1e9 / (time_in_us / 1e6)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - if (rank == 0) - printf("Stopping MSCCL++ proxy threads\n"); - MSCCLPPCHECK(mscclppProxyStop(comm)); - - if (rank == 0) - printf("Destroying MSCCL++ communicator\n"); - MSCCLPPCHECK(mscclppCommDestroy(comm)); - printf("Rank %d succeeded!\n", rank); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Finalize(); -#endif - return 0; -} +#pragma weak mscclppTestEngine = allGatherEngine diff --git a/tests/allgather_test2.cu b/tests/allgather_test2.cu deleted file mode 100644 index 047869fb..00000000 --- a/tests/allgather_test2.cu +++ /dev/null @@ -1,295 +0,0 @@ -#include "mscclpp.h" -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include -#include -#include -#include - -#define RANKS_PER_NODE 8 - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -// Measure current time in second. -static double getTime(void) -{ - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} - -__constant__ mscclppDevConn_t constDevConns[16]; - -__global__ void kernel(int rank, int world_size, size_t nelemsPerGPU) -{ - if (threadIdx.x % 32 != 0) - return; - - int warpId = threadIdx.x / 32; - bool isIB = false; - if (warpId >= world_size - 1) - isIB = true; - if (isIB) - warpId = warpId - (world_size - 1); - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - mscclppDevConn_t devConn = constDevConns[remoteRank]; - if (isIB) - devConn = constDevConns[remoteRank + world_size]; - - // Each warp receives data from different ranks -#if 1 - - // Trigger sending data, flag and synchronize after - devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); - - devConn.wait(); - -#else - for (int i = 1; i < world_size; i++) { - __syncthreads(); - if (remoteRank != ((rank + i) % world_size)) - continue; - - // Trigger sending data, flag and synchronize after - size_t ibPortion = nelemsPerGPU / 12; // nelemsPerGPU/12; - if (isIB) - devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, - rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion) * sizeof(int), - rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion) * sizeof(int), - ibPortion * sizeof(int)); - else - devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), - rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU - ibPortion) * sizeof(int)); - // Wait on the request to make sure it is safe to reuse buffer and flag - auto req = devConn.fifo.putWithSignal(dataOffset, dataSize); - devConn.fifo.sync(req); - } - // Wait for receiving data from remote rank - while (*proxyFlag == baseFlag) - ; -#endif -} - -int rankToLocalRank(int rank) -{ - return rank % RANKS_PER_NODE; -} - -int rankToNode(int rank) -{ - return rank / RANKS_PER_NODE; -} - -int cudaNumToIbNum(int cudaNum) -{ - int ibNum; - if (cudaNum == 0) { - ibNum = 0; - } else if (cudaNum == 1) { - ibNum = 4; - } else if (cudaNum == 2) { - ibNum = 1; - } else if (cudaNum == 3) { - ibNum = 5; - } else if (cudaNum == 4) { - ibNum = 2; - } else if (cudaNum == 5) { - ibNum = 6; - } else if (cudaNum == 6) { - ibNum = 3; - } else if (cudaNum == 7) { - ibNum = 7; - } else { - printf("Invalid cudaNum: %d\n", cudaNum); - exit(EXIT_FAILURE); - } - return ibNum; -} - -void print_usage(const char* prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - printf("usage: %s IP:PORT [rank nranks]\n", prog); -#else - printf("usage: %s IP:PORT rank nranks\n", prog); -#endif -} - -int main(int argc, const char* argv[]) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc != 2 && argc != 4) { - print_usage(argv[0]); - return -1; - } - const char* ip_port = argv[1]; - int rank; - int world_size; - if (argc == 4) { - rank = atoi(argv[2]); - world_size = atoi(argv[3]); - } else { - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - } -#else - if (argc != 4) { - print_usage(argv[0]); - return -1; - } - const char* ip_port = argv[1]; - int rank = atoi(argv[2]); - int world_size = atoi(argv[3]); -#endif - int localRank = rankToLocalRank(rank); - int thisNode = rankToNode(rank); - int cudaNum = localRank; - int ibNum = cudaNumToIbNum(cudaNum); - - CUDACHECK(cudaSetDevice(cudaNum)); - std::string ibDevStr = "mlx5_ib" + std::to_string(localRank); - - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port)); - - int* data_d; - uint64_t* flag_d; - size_t data_size = 1536 * 1024 * 1024; - size_t nelemsPerGPU = data_size / sizeof(int) / world_size; - CUDACHECK(cudaMalloc(&data_d, data_size)); - CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t))); - CUDACHECK(cudaMemset(data_d, 0, data_size)); - CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t))); - - int* data_h = new int[nelemsPerGPU * world_size]; - for (int i = 0; i < nelemsPerGPU * world_size; i++) { - size_t val = i + 1; - if (i / nelemsPerGPU == rank) { - data_h[i] = val; - } else { - data_h[i] = 0; - } - } - CUDACHECK(cudaMemcpy(data_d, data_h, data_size, cudaMemcpyHostToDevice)); - - mscclppDevConn_t devConns[16]; - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = NULL; - transportType = mscclppTransportP2P; - // Connect with all other ranks - MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, 0, data_d, data_size, flag_d, transportType, ibDev)); - } - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = ibDevStr.c_str(); - transportType = mscclppTransportIB; - // Connect with all other ranks - MSCCLPPCHECK( - mscclppConnect(comm, &devConns[r + world_size], r, 0, data_d, data_size, flag_d, transportType, ibDev)); - } - - MSCCLPPCHECK(mscclppConnectionSetup(comm)); - - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * 2 * world_size)); - - cudaStream_t stream; - CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - - CUDACHECK(cudaDeviceSynchronize()); - kernel<<<1, 32 * 2 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU); - CUDACHECK(cudaDeviceSynchronize()); - CUDACHECK(cudaMemcpy(data_h, data_d, data_size, cudaMemcpyDeviceToHost)); - CUDACHECK(cudaDeviceSynchronize()); - - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { - int val = i + 1; - if (data_h[i] != val) { - printf("oh uh things went wrong! data_h[%d] (%d) != val (%d)\n", i, data_h[i], val); - break; - } - } - int tmp[16]; - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // // Perf test - // cudaEvent_t ev_start; - // cudaEvent_t ev_end; - // CUDACHECK(cudaEventCreate(&ev_start)); - // CUDACHECK(cudaEventCreate(&ev_end)); - - // warm up - // int warmupiter = 1000; - // for (int i = 0; i < warmupiter; ++i) { - // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU); - // } - // CUDACHECK(cudaDeviceSynchronize()); - // MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // cudaGraph Capture - cudaGraph_t graph; - cudaGraphExec_t instance; - cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - int cudagraphiter = 10; - for (int i = 0; i < cudagraphiter; ++i) { - kernel<<<1, 32 * 2 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU); - } - cudaStreamEndCapture(stream, &graph); - cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - - int cudagraphwarmup = 10; - for (int i = 0; i < cudagraphwarmup; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - // measure runtime - // CUDACHECK(cudaEventRecord(ev_start, stream)); - double t0 = getTime(); - int cudagraphlaunch = 10; - for (int i = 0; i < cudagraphlaunch; ++i) { - // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); - cudaGraphLaunch(instance, stream); - } - // CUDACHECK(cudaEventRecord(ev_end, stream)); - CUDACHECK(cudaStreamSynchronize(stream)); - - double t1 = getTime(); - float ms = (t1 - t0) * 1000.0; - // CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end)); - double time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; - printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, - (double)(data_size) / 1024. / 1024. / 1024. / (time_in_us / 1e6)); - - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - MSCCLPPCHECK(mscclppProxyStop(comm)); - - MSCCLPPCHECK(mscclppCommDestroy(comm)); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc == 2) { - MPI_Finalize(); - } -#endif - printf("Succeeded! %d\n", rank); - return 0; -} diff --git a/tests/allgather_test3.cu b/tests/allgather_test3.cu deleted file mode 100644 index 1f16fba9..00000000 --- a/tests/allgather_test3.cu +++ /dev/null @@ -1,225 +0,0 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#include "comm.h" -#include "common.h" - -#include -#include - -#define ALIGN 4 -__constant__ mscclppDevConn_t constDevConns[16]; - -__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) -{ - // this allgather is really simple and implemented as an alltoall - - // this thread's role is a sender role - // put your data asynchronously - if (threadIdx.x % 32 == 0) - devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); - // make sure everyone is put their data before some thread randomly blocks everyone else in signal - __syncthreads(); - // push with flag and sync to make sure the data is received - if (threadIdx.x % 32 == 0) - devConn.flush(); - - // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready - if (threadIdx.x % 32 == 0) - devConn.wait(); -} - -__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - uint64_t offset, uint64_t size) -{ - // this allgather algorithm works as follows: - // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode - // and waits for data from GPU rank (i-1) % nranksPerNode - // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode - // ... - // This order is much better for DMA engine for NVLinks - for (int i = 1; i < nranksPerNode; i++) { - if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { - // put your data to GPU (rank+i) % nranksPerNode and signal in one call - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(offset, size); - } - // wait for the data from GPU (rank-i) % nranksPerNode to arrive - if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); - } -} - -__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) -{ - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), - nelemsPerGPU * sizeof(int)); -} - -__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, - size_t nelemsPerGPU) -{ - // this allgather is a pipelined and hierarchical one and only works for two nodes - // it is implemented as follows: - // Step 1: each node does a local allgather and concurrently, - // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with - // its cross-node neighbor (local GPU i on the other node) via IB - // Step 2: each node does a local allgather again with the data just received from its - // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with - // its cross-node neighbor - // Step 3: each node does a local allgather for the last time with the rest of the data - - int pipelineSize = 3; - - // Step 1 - // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), - nelemsPerGPU * sizeof(int)); - } - // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { - // opposite side - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), - (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - - __syncthreads(); - - // Step 2 - // local allgather - int otherNghr = (rank + nranksPerNode) % world_size; - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), - (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); - } - - // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { - // opposite side - if ((threadIdx.x % 32) == 0) - devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * - sizeof(int), - nelemsPerGPU / pipelineSize * sizeof(int)); - if ((threadIdx.x % 32) == 0) - devConn.wait(); - } - - __syncthreads(); - - // Step 3 - // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, - (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), - nelemsPerGPU / pipelineSize * sizeof(int)); - } -} - -__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) -{ - // find the mapping between remoteRank and devConns - int warpId = threadIdx.x / 32; - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - // Each warp is responsible for one of the remote ranks - mscclppDevConn_t devConn = constDevConns[warpId]; - - if (kernel == 0) - allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU); - else if (kernel == 1) - allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); - else if (kernel == 2) - allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); -} - -void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, - size_t* recvInplaceOffset, size_t count, int nranks) -{ - size_t base = (count / (ALIGN * nranks)) * ALIGN; - *sendcount = base; - *recvcount = base * nranks; - *sendInplaceOffset = base; - *recvInplaceOffset = 0; - *paramcount = base; -} - -testResult_t AllGatherInitData(struct threadArgs* args, int in_place) -{ - size_t sendcount = args->sendBytes / sizeof(int); - size_t recvcount = args->expectedBytes / sizeof(int); - // int nranks = args->totalProcs; - - CUDACHECK(cudaSetDevice(args->gpus[0])); - int rank = args->proc; - CUDACHECK(cudaMemset(args->recvbuffs[0], 0, args->expectedBytes)); - // void* data = in_place ? ((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0]; - - int* dataHost = new int[recvcount]; - for (size_t i = 0; i < recvcount; i++) { - int val = i + 1; - if (i / sendcount == (size_t)rank) { - dataHost[i] = val; - } else { - dataHost[i] = 0; - } - } - CUDACHECK(cudaMemcpy(args->recvbuffs[0], dataHost, recvcount, cudaMemcpyHostToDevice)); - for (int i = 0; i < static_cast(recvcount); i++) { - dataHost[i] = i + 1; - } - CUDACHECK(cudaMemcpy(args->expected[0], dataHost, recvcount, cudaMemcpyHostToDevice)); - delete dataHost; - CUDACHECK(cudaDeviceSynchronize()); - return testSuccess; -} - -void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) -{ - double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec; - - *algBw = baseBw; - double factor = ((double)(nranks - 1)) / ((double)nranks); - *busBw = baseBw * factor; -} - -testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, - cudaStream_t stream) -{ - int worldSize = comm->nRanks; - kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), 1); - return testSuccess; -} - -struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, AllGatherInitData, AllGatherGetBw, - AllGatherRunColl}; - -void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) -{ - size_t paramcount, sendInplaceOffset, recvInplaceOffset; - AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); -} - -testResult_t AllGatherRunTest(struct threadArgs* args) -{ - args->collTest = &allGatherTest; - mscclppDevConn_t* devConns; - int nCons; - MSCCLPPCHECK(mscclppGetAllDeviceConnections(args->comm, &devConns, &nCons)); - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); - TESTCHECK(TimeTest(args)); - return testSuccess; -} - -struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest}; - -#pragma weak mscclppTestEngine = allGatherEngine diff --git a/tests/allgather_test_standalone.cu b/tests/allgather_test_standalone.cu new file mode 100644 index 00000000..05aa3304 --- /dev/null +++ b/tests/allgather_test_standalone.cu @@ -0,0 +1,501 @@ +#include "mscclpp.h" + +#ifdef MSCCLPP_USE_MPI_FOR_TESTS +#include "mpi.h" +#endif // MSCCLPP_USE_MPI_FOR_TESTS +#include +#include +#include +#include +#include +#include + +static int nranksPerNode = 8; + +// Propagate errors up + +#define MSCCLPPCHECK(call) \ + do { \ + mscclppResult_t res = call; \ + if (res != mscclppSuccess && res != mscclppInProgress) { \ + /* Print the back trace*/ \ + printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ + return res; \ + } \ + } while (0) + +// Check CUDA RT calls +#define CUDACHECK(cmd) \ + do { \ + cudaError_t err = cmd; \ + if (err != cudaSuccess) { \ + printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ + exit(EXIT_FAILURE); \ + } \ + } while (false) + +// Measure current time in second. +static double getTime(void) +{ + struct timespec tspec; + if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { + printf("clock_gettime failed\n"); + exit(EXIT_FAILURE); + } + return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; +} + +__constant__ mscclppDevConn_t constDevConns[16]; + +__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) +{ + // this allgather is really simple and implemented as an alltoall + + // this thread's role is a sender role + // put your data asynchronously + if ((threadIdx.x % 32) == 0) + devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + // make sure everyone is put their data before some thread randomly blocks everyone else in signal + __syncthreads(); + // push with flag and sync to make sure the data is received + if ((threadIdx.x % 32) == 0) + devConn.flush(); + + // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready + if ((threadIdx.x % 32) == 0) + devConn.wait(); +} + +__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, + uint64_t offset, uint64_t size) +{ + // this allgather algorithm works as follows: + // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode + // and waits for data from GPU rank (i-1) % nranksPerNode + // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode + // ... + // This order is much better for DMA engine for NVLinks + for (int i = 1; i < nranksPerNode; i++) { + if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { + // put your data to GPU (rank+i) % nranksPerNode and signal in one call + if ((threadIdx.x % 32) == 0) + devConn.putWithSignalAndFlush(offset, size); + } + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { + if ((threadIdx.x % 32) == 0) + devConn.wait(); + } + asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); + } +} + +__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, + size_t nelemsPerGPU) +{ + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + nelemsPerGPU * sizeof(int)); +} + +__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, + size_t nelemsPerGPU) +{ + // this allgather is a pipelined and hierarchical one and only works for two nodes + // it is implemented as follows: + // Step 1: each node does a local allgather and concurrently, + // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with + // its cross-node neighbor (local GPU i on the other node) via IB + // Step 2: each node does a local allgather again with the data just received from its + // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with + // its cross-node neighbor + // Step 3: each node does a local allgather for the last time with the rest of the data + + int pipelineSize = 3; + + // Step 1 + // local allgather + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + nelemsPerGPU * sizeof(int)); + } + // cross-node exchange + if (remoteRank % nranksPerNode == rank % nranksPerNode) { + // opposite side + if ((threadIdx.x % 32) == 0) + devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + if ((threadIdx.x % 32) == 0) + devConn.wait(); + } + + __syncthreads(); + + // Step 2 + // local allgather + int otherNghr = (rank + nranksPerNode) % world_size; + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + } + + // cross-node exchange + if (remoteRank % nranksPerNode == rank % nranksPerNode) { + // opposite side + if ((threadIdx.x % 32) == 0) + devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * + sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + if ((threadIdx.x % 32) == 0) + devConn.wait(); + } + + __syncthreads(); + + // Step 3 + // local allgather + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, + (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + } +} + +__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) +{ + // find the mapping between remoteRank and devConns + int warpId = threadIdx.x / 32; + int remoteRank = (warpId < rank) ? warpId : warpId + 1; + // Each warp is responsible for one of the remote ranks + mscclppDevConn_t devConn = constDevConns[warpId]; + + if (kernel == 0) + allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU); + else if (kernel == 1) + allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); + else if (kernel == 2) + allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); +} + +int rankToLocalRank(int rank) +{ + return rank % nranksPerNode; +} + +int rankToNode(int rank) +{ + return rank / nranksPerNode; +} + +void print_usage(const char* prog) +{ +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + printf("usage: %s IP:PORT [rank nranks]\n", prog); +#else + printf("usage: %s IP:PORT rank nranks\n", prog); +#endif +} + +void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, + int** data_d) +{ + CUDACHECK(cudaMalloc(data_d, dataSize)); + CUDACHECK(cudaMemset(*data_d, 0, dataSize)); + + *data_h = new int[nelemsPerGPU * world_size]; + for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { + int val = i + 1; + if (i / nelemsPerGPU == (size_t)rank) { + (*data_h)[i] = val; + } else { + (*data_h)[i] = 0; + } + } + CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice)); +} + +mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize) +{ + int thisNode = rankToNode(rank); + int cudaNum = rankToLocalRank(rank); + std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); + + for (int r = 0; r < world_size; ++r) { + if (r == rank) + continue; + mscclppTransport_t transportType; + const char* ibDev = ibDevStr.c_str(); + if (rankToNode(r) == thisNode) { + ibDev = NULL; + transportType = mscclppTransportP2P; + } else { + transportType = mscclppTransportIB; + } + // Connect with all other ranks + MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev)); + } + + MSCCLPPCHECK(mscclppConnectionSetup(comm)); + + mscclppDevConn_t* devConns; + int nCons; + MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons)); + + CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); + + return mscclppSuccess; +} + +void printUsage(const char* prog, bool isMpi) +{ + if (isMpi) { + std::string st = "you are using MPI for this test\n"; + st += "two possilbe usages are:\n"; + st += "> " + std::string(prog) + "\n"; + st += "or\n"; + st += "> " + std::string(prog) + " -ip_port [ip:port]\n"; + printf("%s", st.c_str()); + } else { + std::string st = "you are NOT using MPI for this test\n"; + st += "the only possible usage:\n"; + st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n"; + printf("%s", st.c_str()); + } +} + +std::unordered_map parseArgs(int argc, const char* argv[], bool isMpi) +{ + std::unordered_map options; + + for (int i = 1; i < argc; i++) { + std::string arg = argv[i]; + if (arg == "-rankspernode") { + if (isMpi) { + fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n"); + exit(-1); + } + if (i + 1 < argc) { + options["rankspernode"] = argv[++i]; + } else { + fprintf(stderr, "Error: -rankspernode option requires an argument.\n"); + ; + exit(-1); + } + } else if (arg == "-kernel") { + if (i + 1 < argc) { + options["kernel"] = argv[++i]; + } else { + fprintf(stderr, "Error: -kernel option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-ip_port") { + if (i + 1 < argc) { + options["ip_port"] = argv[++i]; + } else { + fprintf(stderr, "Error: -ip_port option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-rank") { + if (isMpi) { + fprintf(stderr, "Error: -rank should not be specified with MPI.\n"); + exit(-1); + } + if (i + 1 < argc) { + options["rank"] = argv[++i]; + } else { + fprintf(stderr, "Error: -ip_port option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-nranks") { + if (isMpi) { + fprintf(stderr, "Error: -nranks should not be specified with MPI.\n"); + exit(-1); + } + if (i + 1 < argc) { + options["nranks"] = argv[++i]; + } else { + fprintf(stderr, "Error: -ip_port option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-datasize") { + if (i + 1 < argc) { + options["datasize"] = argv[++i]; + } else { + fprintf(stderr, "Error: -datasize option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-help" || arg == "-h") { + printUsage(argv[0], isMpi); + exit(0); + } else { + fprintf(stderr, "Error: Unknown option %s\n", argv[i]); + exit(-1); + } + } + return options; +} + +int main(int argc, const char* argv[]) +{ + bool isMpi = false; +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + isMpi = true; +#endif + + auto parsedArgs = parseArgs(argc, argv, isMpi); + + int rank; + int world_size; +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + // get the local number of nodes with MPI + MPI_Comm shmcomm; + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + int shmrank; + MPI_Comm_size(shmcomm, &shmrank); + nranksPerNode = shmrank; + MPI_Comm_free(&shmcomm); +#else + if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) { + printUsage(argv[0], isMpi); + exit(-1); + } + rank = std::stoi(parsedArgs["rank"]); + world_size = std::stoi(parsedArgs["nranks"]); + if (parsedArgs.find("rankspernode") == parsedArgs.end()) { + printUsage(argv[0], isMpi); + exit(-1); + } + nranksPerNode = std::stoi(parsedArgs["rankspernode"]); +#endif + int kernelNum = 0; + if (parsedArgs.find("kernel") != parsedArgs.end()) { + kernelNum = std::stoi(parsedArgs["kernel"]); + } + char* ip_port = NULL; + if (parsedArgs.find("ip_port") == parsedArgs.end()) { + printUsage(argv[0], isMpi); + exit(-1); + } + ip_port = (char*)parsedArgs["ip_port"].c_str(); + + int thisNode = rankToNode(rank); + int cudaNum = rankToLocalRank(rank); + CUDACHECK(cudaSetDevice(cudaNum)); + + if (rank == 0) + printf("Initializing MSCCL++\n"); + mscclppComm_t comm; + MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank)); + + int* data_d; + int* data_h; + size_t dataSize = 1024 * 1024 * 1024; + if (parsedArgs.find("datasize") != parsedArgs.end()) { + dataSize = std::stoul(parsedArgs["datasize"]); + } + size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; + + if (rank == 0) + printf("Initializing data for allgather test\n"); + initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); + + if (rank == 0) + printf("Setting up the connection in MSCCL++\n"); + MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize)); + + if (rank == 0) + printf("Launching MSCCL++ proxy threads\n"); + MSCCLPPCHECK(mscclppProxyLaunch(comm)); + + if (rank == 0) + printf("Testing the correctness of AllGather implementation\n"); + cudaStream_t stream; + CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); + CUDACHECK(cudaDeviceSynchronize()); + kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); + CUDACHECK(cudaDeviceSynchronize()); + CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost)); + + for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { + int val = i + 1; + if (data_h[i] != val) { + printf("oh uh! data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val); + break; + } + } + int tmp[16]; + // A simple barrier + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + if (rank == 0) + printf("Successfully checked the correctness\n"); + + // Perf test + int iterwithoutcudagraph = 10; + if (rank == 0) + printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); + CUDACHECK(cudaStreamSynchronize(stream)); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + for (int i = 0; i < iterwithoutcudagraph; ++i) { + kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); + } + CUDACHECK(cudaStreamSynchronize(stream)); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + + // cudaGraph Capture + int cudagraphiter = 10; + if (rank == 0) + printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); + cudaGraph_t graph; + cudaGraphExec_t instance; + cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); + for (int i = 0; i < cudagraphiter; ++i) { + kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); + } + cudaStreamEndCapture(stream, &graph); + cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); + + int cudagraphwarmup = 10; + if (rank == 0) + printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, + cudagraphiter); + for (int i = 0; i < cudagraphwarmup; ++i) { + cudaGraphLaunch(instance, stream); + } + CUDACHECK(cudaStreamSynchronize(stream)); + + // measure runtime + int cudagraphlaunch = 10; + if (rank == 0) + printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, + cudagraphiter); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + double t0, t1, ms, time_in_us; + t0 = getTime(); + for (int i = 0; i < cudagraphlaunch; ++i) { + cudaGraphLaunch(instance, stream); + } + CUDACHECK(cudaStreamSynchronize(stream)); + + t1 = getTime(); + ms = (t1 - t0) * 1000.0; + time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; + printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, + (double)(dataSize) / 1e9 / (time_in_us / 1e6)); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + + if (rank == 0) + printf("Stopping MSCCL++ proxy threads\n"); + MSCCLPPCHECK(mscclppProxyStop(comm)); + + if (rank == 0) + printf("Destroying MSCCL++ communicator\n"); + MSCCLPPCHECK(mscclppCommDestroy(comm)); + printf("Rank %d succeeded!\n", rank); + +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + MPI_Finalize(); +#endif + return 0; +}