diff --git a/Makefile b/Makefile index 806d1c64..972582eb 100644 --- a/Makefile +++ b/Makefile @@ -147,18 +147,22 @@ UTOBJTARGETS := $(UTOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) UTBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(UTOBJS)) TESTSDIR := tests -TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test.cu) +TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test_standalone.cu) TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS)) TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS)) +MSCLLPPTESTSOBJSDIR:= $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR) +MSCLLPPTESTBINFILESLIST := allgather_test +MSCLLPPTESTBINS := $(MSCLLPPTESTBINFILESLIST:%=$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf) + INCLUDE := -Isrc -Isrc/include -.PHONY: all build lib tests clean +.PHONY: all build lib tests mscclpp-test clean all: build -build: lib tests +build: lib tests mscclpp-test lib: $(LIBOBJTARGETS) $(INCTARGETS) $(LIBTARGET) @@ -166,6 +170,8 @@ unittests: $(UTBINS) tests: unittests $(TESTSBINS) +mscclpp-test: $(LIBTARGET) $(MSCLLPPTESTBINS) + cpplint: clang-format-12 -style=file --verbose --Werror --dry-run $(CPPSOURCES) clang-format-12 --dry-run $(CPPSOURCES) @@ -211,12 +217,17 @@ $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cc $(INCTARGETS) # Compile .cu tests $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cu $(INCTARGETS) @mkdir -p $(@D) - $(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) -c $< $(MPI_MACRO) + $(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) $(INCLUDE) -c $< $(MPI_MACRO) # Test bins $(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%: $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o $(LIBTARGET) @mkdir -p $(@D) $(NVCC) -o $@ $< $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp +# Compile mscclpp_test +$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf: $(MSCLLPPTESTSOBJSDIR)/%.o $(MSCLLPPTESTSOBJSDIR)/common.o $(LIBTARGET) + @mkdir -p $(@D) + $(NVCC) -o $@ $(filter %.o,$^)
$(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp + clean: rm -rf $(BUILDDIR) diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index 487f38cb..b3998d91 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -452,9 +452,12 @@ mscclppResult_t mscclppSocketGetAddr(struct mscclppSocket* sock, union mscclppSo static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) { - static time_t initTime = -1; - if (initTime == -1) - initTime = clockNano() / 1e9; + static bool timeInitialized = false; + static mscclppTime_t initTime; + if (!timeInitialized) { + timeInitialized = true; + initTime = getClock(); + } mscclppConfig* config = mscclppConfig::getInstance(); time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig(); @@ -462,14 +465,14 @@ static mscclppResult_t socketTryAccept(struct mscclppSocket* sock) sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen); if (sock->fd != -1) { sock->state = mscclppSocketStateAccepted; - initTime = -1; + timeInitialized = false; } else if (errno != EAGAIN && errno != EWOULDBLOCK) { WARN("socketTryAccept: get errno %d that is not EAGAIN or EWOULDBLOCK", errno); - initTime = -1; + timeInitialized = false; return mscclppSystemError; - } else if ((clockNano() / 1e9) - initTime > acceptTimeout) { + } else if ((getClock() - initTime) > std::chrono::seconds(acceptTimeout)) { WARN("socketTryAccept: exceeded timeout (%ld) sec", acceptTimeout); - initTime = -1; + timeInitialized = false; return mscclppRemoteError; } else { usleep(SLEEP_INT); @@ -513,10 +516,13 @@ static mscclppResult_t socketFinalizeAccept(struct mscclppSocket* sock) static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) { - static time_t initTime = -1; - if (initTime == -1) { - initTime = clockNano() / 1e9; + static bool timeInitialized = false; + static mscclppTime_t initTime; + if (!timeInitialized) { + timeInitialized = true; + initTime = getClock(); } + mscclppConfig* config = mscclppConfig::getInstance(); time_t
acceptTimeout = config->getBootstrapConnectionTimeoutConfig(); @@ -524,16 +530,16 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) int ret = connect(sock->fd, &sock->addr.sa, sock->salen); if (ret == 0) { sock->state = mscclppSocketStateConnected; - initTime = -1; + timeInitialized = false; return mscclppSuccess; } else if (errno == EINPROGRESS) { sock->state = mscclppSocketStateConnectPolling; return mscclppSuccess; } else if (errno == ECONNREFUSED || errno == ETIMEDOUT) { - if ((clockNano() / 1e9) - initTime > acceptTimeout) { + if ((getClock() - initTime) > std::chrono::seconds(acceptTimeout)) { WARN("socketStartConnect: exceeded timeout (%ld) sec", acceptTimeout); sock->state = mscclppSocketStateError; - initTime = -1; + timeInitialized = false; return mscclppRemoteError; } usleep(SLEEP_INT); @@ -544,17 +550,20 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock) char line[SOCKET_NAME_MAXLEN + 1]; sock->state = mscclppSocketStateError; WARN("socketStartConnect: Connect to %s failed : %s", mscclppSocketToString(&sock->addr, line), strerror(errno)); - initTime = -1; + timeInitialized = false; return mscclppSystemError; } } static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) { - static time_t initTime = -1; - if (initTime == -1) { - initTime = clockNano() / 1e9; + static bool timeInitialized = false; + static mscclppTime_t initTime; + if (!timeInitialized) { + timeInitialized = true; + initTime = getClock(); } + mscclppConfig* config = mscclppConfig::getInstance(); time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig(); @@ -567,7 +576,7 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) pfd.events = POLLOUT; SYSCHECK(ret = poll(&pfd, 1, timeout), "poll"); if (ret == 0) { - initTime = -1; + timeInitialized = false; return mscclppSuccess; } @@ -576,10 +585,10 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock) SYSCHECK(getsockopt(sock->fd, SOL_SOCKET, SO_ERROR,
(void*)&ret, &rlen), "getsockopt"); if (ret == 0) { - initTime = -1; + timeInitialized = false; sock->state = mscclppSocketStateConnected; } else if (ret == ECONNREFUSED || ret == ETIMEDOUT) { - if ((clockNano() / 1e9) - initTime > acceptTimeout) { + if ((getClock() - initTime) > std::chrono::seconds(acceptTimeout)) { WARN("socketPollConnect: exceeded timeout (%ld) sec", acceptTimeout); sock->state = mscclppSocketStateError; return mscclppRemoteError; diff --git a/src/include/alloc.h b/src/include/alloc.h index 65e953e1..496af197 100644 --- a/src/include/alloc.h +++ b/src/include/alloc.h @@ -16,8 +16,6 @@ #include #include -uint64_t clockNano(); // from utils.h with which we have a circular dependency - template mscclppResult_t mscclppCudaHostCallocDebug(T** ptr, size_t nelem, const char* filefunc, int line) { mscclppResult_t result = mscclppSuccess; diff --git a/src/include/utils.h b/src/include/utils.h index f3eef1a9..3eff9842 100644 --- a/src/include/utils.h +++ b/src/include/utils.h @@ -48,12 +48,9 @@ static long log2i(long n) return l; } -inline uint64_t clockNano() -{ - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return uint64_t(ts.tv_sec) * 1000 * 1000 * 1000 + ts.tv_nsec; -} +typedef std::chrono::steady_clock::time_point mscclppTime_t; +mscclppTime_t getClock(); +int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end); /* get any bytes of random data from /dev/urandom, return 0 if it succeeds; else * return -1 */ diff --git a/src/utils.cc b/src/utils.cc index 9bb01284..c0766765 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -270,3 +270,13 @@ mscclppResult_t setNumaState(mscclppNumaState state) numa_bind(state); return mscclppSuccess; } + +mscclppTime_t getClock() +{ + return std::chrono::steady_clock::now(); +} + +int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end) +{ + return std::chrono::duration_cast(end - start).count(); +} diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu index 05aa3304..647debef 100644 ---
a/tests/allgather_test.cu +++ b/tests/allgather_test.cu @@ -1,50 +1,10 @@ -#include "mscclpp.h" +#include "comm.h" +#include "common.h" -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include -#include -#include +#include #include -#include -#include - -static int nranksPerNode = 8; - -// Propagate errors up - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ - return res; \ - } \ - } while (0) - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -// Measure current time in second. -static double getTime(void) -{ - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} +#define ALIGN 4 __constant__ mscclppDevConn_t constDevConns[16]; __device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) @@ -53,16 +13,16 @@ __device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, i // this thread's role is a sender role // put your data asynchronously - if ((threadIdx.x % 32) == 0) + if (threadIdx.x % 32 == 0) devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); // make sure everyone is put their data before some thread randomly blocks everyone else in signal __syncthreads(); // push with flag and sync to make sure the data is received - if ((threadIdx.x % 32) == 0) + if (threadIdx.x % 32 == 0) devConn.flush(); // this thread's role is a receiver role. 
wait on the semaphore to make sure the data is ready - if ((threadIdx.x % 32) == 0) + if (threadIdx.x % 32 == 0) devConn.wait(); } @@ -176,326 +136,85 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); } -int rankToLocalRank(int rank) +void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, + size_t* recvInplaceOffset, size_t count, int nranks) { - return rank % nranksPerNode; + size_t base = (count / (ALIGN * nranks)) * ALIGN; + *sendcount = base; + *recvcount = base * nranks; + *sendInplaceOffset = base; + *recvInplaceOffset = 0; + *paramcount = base; } -int rankToNode(int rank) +testResult_t AllGatherInitData(struct testArgs* args, int in_place) { - return rank / nranksPerNode; -} + size_t sendcount = args->sendBytes / sizeof(int); + size_t recvcount = args->expectedBytes / sizeof(int); + // int nranks = args->totalProcs; -void print_usage(const char* prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - printf("usage: %s IP:PORT [rank nranks]\n", prog); -#else - printf("usage: %s IP:PORT rank nranks\n", prog); -#endif -} + CUDACHECK(cudaSetDevice(args->gpuNum)); + int rank = args->proc; + CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes)); + // void* data = in_place ? 
((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0]; -void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, - int** data_d) -{ - CUDACHECK(cudaMalloc(data_d, dataSize)); - CUDACHECK(cudaMemset(*data_d, 0, dataSize)); - - *data_h = new int[nelemsPerGPU * world_size]; - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { + int* dataHost = new int[recvcount]; + for (size_t i = 0; i < recvcount; i++) { int val = i + 1; - if (i / nelemsPerGPU == (size_t)rank) { - (*data_h)[i] = val; + if (i / sendcount == (size_t)rank) { + dataHost[i] = val; } else { - (*data_h)[i] = 0; + dataHost[i] = 0; } } - CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice)); + CUDACHECK(cudaMemcpy(args->recvbuff, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); + for (size_t i = 0; i < recvcount; i++) { + dataHost[i] = i + 1; + } + CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); + delete[] dataHost; + CUDACHECK(cudaDeviceSynchronize()); + MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm)); + return testSuccess; } -mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize) +void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) { - int thisNode = rankToNode(rank); - int cudaNum = rankToLocalRank(rank); - std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); + double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec; - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = ibDevStr.c_str(); - if (rankToNode(r) == thisNode) { - ibDev = NULL; - transportType = mscclppTransportP2P; - } else { - transportType = mscclppTransportIB; - } - // Connect with all other ranks - MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize,
transportType, ibDev)); - } + *algBw = baseBw; + double factor = ((double)(nranks - 1)) / ((double)nranks); + *busBw = baseBw * factor; +} - MSCCLPPCHECK(mscclppConnectionSetup(comm)); +testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, + cudaStream_t stream, int kernel_num) +{ + int worldSize = comm->nRanks; + kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), kernel_num); + return testSuccess; +} +struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, AllGatherInitData, AllGatherGetBw, + AllGatherRunColl}; + +void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) +{ + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllGatherRunTest(struct testArgs* args) +{ + args->collTest = &allGatherTest; mscclppDevConn_t* devConns; int nCons; - MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons)); - + MSCCLPPCHECK(mscclppGetAllDeviceConnections(args->comm, &devConns, &nCons)); CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); - - return mscclppSuccess; + TESTCHECK(TimeTest(args)); + return testSuccess; } -void printUsage(const char* prog, bool isMpi) -{ - if (isMpi) { - std::string st = "you are using MPI for this test\n"; - st += "two possilbe usages are:\n"; - st += "> " + std::string(prog) + "\n"; - st += "or\n"; - st += "> " + std::string(prog) + " -ip_port [ip:port]\n"; - printf("%s", st.c_str()); - } else { - std::string st = "you are NOT using MPI for this test\n"; - st += "the only possible usage:\n"; - st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n"; - printf("%s", st.c_str()); - } -} +struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest}; 
-std::unordered_map parseArgs(int argc, const char* argv[], bool isMpi) -{ - std::unordered_map options; - - for (int i = 1; i < argc; i++) { - std::string arg = argv[i]; - if (arg == "-rankspernode") { - if (isMpi) { - fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["rankspernode"] = argv[++i]; - } else { - fprintf(stderr, "Error: -rankspernode option requires an argument.\n"); - ; - exit(-1); - } - } else if (arg == "-kernel") { - if (i + 1 < argc) { - options["kernel"] = argv[++i]; - } else { - fprintf(stderr, "Error: -kernel option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-ip_port") { - if (i + 1 < argc) { - options["ip_port"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-rank") { - if (isMpi) { - fprintf(stderr, "Error: -rank should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["rank"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-nranks") { - if (isMpi) { - fprintf(stderr, "Error: -nranks should not be specified with MPI.\n"); - exit(-1); - } - if (i + 1 < argc) { - options["nranks"] = argv[++i]; - } else { - fprintf(stderr, "Error: -ip_port option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-datasize") { - if (i + 1 < argc) { - options["datasize"] = argv[++i]; - } else { - fprintf(stderr, "Error: -datasize option requires an argument.\n"); - exit(-1); - } - } else if (arg == "-help" || arg == "-h") { - printUsage(argv[0], isMpi); - exit(0); - } else { - fprintf(stderr, "Error: Unknown option %s\n", argv[i]); - exit(-1); - } - } - return options; -} - -int main(int argc, const char* argv[]) -{ - bool isMpi = false; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - isMpi = true; -#endif - - auto parsedArgs = parseArgs(argc, argv, isMpi); - - int rank; - 
int world_size; -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - // get the local number of nodes with MPI - MPI_Comm shmcomm; - MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - int shmrank; - MPI_Comm_size(shmcomm, &shmrank); - nranksPerNode = shmrank; - MPI_Comm_free(&shmcomm); -#else - if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - rank = std::stoi(parsedArgs["rank"]); - world_size = std::stoi(parsedArgs["nranks"]); - if (parsedArgs.find("rankspernode") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - nranksPerNode = std::stoi(parsedArgs["rankspernode"]); -#endif - int kernelNum = 0; - if (parsedArgs.find("kernel") != parsedArgs.end()) { - kernelNum = std::stoi(parsedArgs["kernel"]); - } - char* ip_port = NULL; - if (parsedArgs.find("ip_port") == parsedArgs.end()) { - printUsage(argv[0], isMpi); - exit(-1); - } - ip_port = (char*)parsedArgs["ip_port"].c_str(); - - int thisNode = rankToNode(rank); - int cudaNum = rankToLocalRank(rank); - CUDACHECK(cudaSetDevice(cudaNum)); - - if (rank == 0) - printf("Initializing MSCCL++\n"); - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank)); - - int* data_d; - int* data_h; - size_t dataSize = 1024 * 1024 * 1024; - if (parsedArgs.find("datasize") != parsedArgs.end()) { - dataSize = std::stoul(parsedArgs["datasize"]); - } - size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; - - if (rank == 0) - printf("Initializing data for allgather test\n"); - initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); - - if (rank == 0) - printf("Setting up the connection in MSCCL++\n"); - MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize)); - - if (rank == 0) - 
printf("Launching MSCCL++ proxy threads\n"); - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - - if (rank == 0) - printf("Testing the correctness of AllGather implementation\n"); - cudaStream_t stream; - CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - CUDACHECK(cudaDeviceSynchronize()); - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - CUDACHECK(cudaDeviceSynchronize()); - CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost)); - - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { - int val = i + 1; - if (data_h[i] != val) { - printf("oh uh! data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val); - break; - } - } - int tmp[16]; - // A simple barrier - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - if (rank == 0) - printf("Successfully checked the correctness\n"); - - // Perf test - int iterwithoutcudagraph = 10; - if (rank == 0) - printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); - CUDACHECK(cudaStreamSynchronize(stream)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - for (int i = 0; i < iterwithoutcudagraph; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - } - CUDACHECK(cudaStreamSynchronize(stream)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // cudaGraph Capture - int cudagraphiter = 10; - if (rank == 0) - printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); - cudaGraph_t graph; - cudaGraphExec_t instance; - cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - for (int i = 0; i < cudagraphiter; ++i) { - kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); - } - cudaStreamEndCapture(stream, &graph); - cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - - int cudagraphwarmup = 10; - if (rank 
== 0) - printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, - cudagraphiter); - for (int i = 0; i < cudagraphwarmup; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - // measure runtime - int cudagraphlaunch = 10; - if (rank == 0) - printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, - cudagraphiter); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - double t0, t1, ms, time_in_us; - t0 = getTime(); - for (int i = 0; i < cudagraphlaunch; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - t1 = getTime(); - ms = (t1 - t0) * 1000.0; - time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; - printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, - (double)(dataSize) / 1e9 / (time_in_us / 1e6)); - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - if (rank == 0) - printf("Stopping MSCCL++ proxy threads\n"); - MSCCLPPCHECK(mscclppProxyStop(comm)); - - if (rank == 0) - printf("Destroying MSCCL++ communicator\n"); - MSCCLPPCHECK(mscclppCommDestroy(comm)); - printf("Rank %d succeeded!\n", rank); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Finalize(); -#endif - return 0; -} +#pragma weak mscclppTestEngine = allGatherEngine diff --git a/tests/allgather_test2.cu b/tests/allgather_test2.cu deleted file mode 100644 index 047869fb..00000000 --- a/tests/allgather_test2.cu +++ /dev/null @@ -1,295 +0,0 @@ -#include "mscclpp.h" -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include -#include -#include -#include - -#define RANKS_PER_NODE 8 - -// Check CUDA RT calls -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - 
exit(EXIT_FAILURE); \ - } \ - } while (false) - -// Measure current time in second. -static double getTime(void) -{ - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} - -__constant__ mscclppDevConn_t constDevConns[16]; - -__global__ void kernel(int rank, int world_size, size_t nelemsPerGPU) -{ - if (threadIdx.x % 32 != 0) - return; - - int warpId = threadIdx.x / 32; - bool isIB = false; - if (warpId >= world_size - 1) - isIB = true; - if (isIB) - warpId = warpId - (world_size - 1); - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - mscclppDevConn_t devConn = constDevConns[remoteRank]; - if (isIB) - devConn = constDevConns[remoteRank + world_size]; - - // Each warp receives data from different ranks -#if 1 - - // Trigger sending data, flag and synchronize after - devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); - - devConn.wait(); - -#else - for (int i = 1; i < world_size; i++) { - __syncthreads(); - if (remoteRank != ((rank + i) % world_size)) - continue; - - // Trigger sending data, flag and synchronize after - size_t ibPortion = nelemsPerGPU / 12; // nelemsPerGPU/12; - if (isIB) - devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, - rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion) * sizeof(int), - rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion) * sizeof(int), - ibPortion * sizeof(int)); - else - devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), - rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU - ibPortion) * sizeof(int)); - // Wait on the request to make sure it is safe to reuse buffer and flag - auto req = devConn.fifo.putWithSignal(dataOffset, dataSize); - devConn.fifo.sync(req); - } - // Wait for receiving data from remote rank - while (*proxyFlag == baseFlag) - ; -#endif 
-} - -int rankToLocalRank(int rank) -{ - return rank % RANKS_PER_NODE; -} - -int rankToNode(int rank) -{ - return rank / RANKS_PER_NODE; -} - -int cudaNumToIbNum(int cudaNum) -{ - int ibNum; - if (cudaNum == 0) { - ibNum = 0; - } else if (cudaNum == 1) { - ibNum = 4; - } else if (cudaNum == 2) { - ibNum = 1; - } else if (cudaNum == 3) { - ibNum = 5; - } else if (cudaNum == 4) { - ibNum = 2; - } else if (cudaNum == 5) { - ibNum = 6; - } else if (cudaNum == 6) { - ibNum = 3; - } else if (cudaNum == 7) { - ibNum = 7; - } else { - printf("Invalid cudaNum: %d\n", cudaNum); - exit(EXIT_FAILURE); - } - return ibNum; -} - -void print_usage(const char* prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - printf("usage: %s IP:PORT [rank nranks]\n", prog); -#else - printf("usage: %s IP:PORT rank nranks\n", prog); -#endif -} - -int main(int argc, const char* argv[]) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc != 2 && argc != 4) { - print_usage(argv[0]); - return -1; - } - const char* ip_port = argv[1]; - int rank; - int world_size; - if (argc == 4) { - rank = atoi(argv[2]); - world_size = atoi(argv[3]); - } else { - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - } -#else - if (argc != 4) { - print_usage(argv[0]); - return -1; - } - const char* ip_port = argv[1]; - int rank = atoi(argv[2]); - int world_size = atoi(argv[3]); -#endif - int localRank = rankToLocalRank(rank); - int thisNode = rankToNode(rank); - int cudaNum = localRank; - int ibNum = cudaNumToIbNum(cudaNum); - - CUDACHECK(cudaSetDevice(cudaNum)); - std::string ibDevStr = "mlx5_ib" + std::to_string(localRank); - - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port)); - - int* data_d; - uint64_t* flag_d; - size_t data_size = 1536 * 1024 * 1024; - size_t nelemsPerGPU = data_size / sizeof(int) / world_size; - CUDACHECK(cudaMalloc(&data_d, data_size)); - CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t))); - 
CUDACHECK(cudaMemset(data_d, 0, data_size)); - CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t))); - - int* data_h = new int[nelemsPerGPU * world_size]; - for (int i = 0; i < nelemsPerGPU * world_size; i++) { - size_t val = i + 1; - if (i / nelemsPerGPU == rank) { - data_h[i] = val; - } else { - data_h[i] = 0; - } - } - CUDACHECK(cudaMemcpy(data_d, data_h, data_size, cudaMemcpyHostToDevice)); - - mscclppDevConn_t devConns[16]; - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = NULL; - transportType = mscclppTransportP2P; - // Connect with all other ranks - MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, 0, data_d, data_size, flag_d, transportType, ibDev)); - } - for (int r = 0; r < world_size; ++r) { - if (r == rank) - continue; - mscclppTransport_t transportType; - const char* ibDev = ibDevStr.c_str(); - transportType = mscclppTransportIB; - // Connect with all other ranks - MSCCLPPCHECK( - mscclppConnect(comm, &devConns[r + world_size], r, 0, data_d, data_size, flag_d, transportType, ibDev)); - } - - MSCCLPPCHECK(mscclppConnectionSetup(comm)); - - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - - CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * 2 * world_size)); - - cudaStream_t stream; - CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - - CUDACHECK(cudaDeviceSynchronize()); - kernel<<<1, 32 * 2 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU); - CUDACHECK(cudaDeviceSynchronize()); - CUDACHECK(cudaMemcpy(data_h, data_d, data_size, cudaMemcpyDeviceToHost)); - CUDACHECK(cudaDeviceSynchronize()); - - for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { - int val = i + 1; - if (data_h[i] != val) { - printf("oh uh things went wrong! 
data_h[%d] (%d) != val (%d)\n", i, data_h[i], val); - break; - } - } - int tmp[16]; - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // // Perf test - // cudaEvent_t ev_start; - // cudaEvent_t ev_end; - // CUDACHECK(cudaEventCreate(&ev_start)); - // CUDACHECK(cudaEventCreate(&ev_end)); - - // warm up - // int warmupiter = 1000; - // for (int i = 0; i < warmupiter; ++i) { - // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU); - // } - // CUDACHECK(cudaDeviceSynchronize()); - // MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - - // cudaGraph Capture - cudaGraph_t graph; - cudaGraphExec_t instance; - cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - int cudagraphiter = 10; - for (int i = 0; i < cudagraphiter; ++i) { - kernel<<<1, 32 * 2 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU); - } - cudaStreamEndCapture(stream, &graph); - cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - - int cudagraphwarmup = 10; - for (int i = 0; i < cudagraphwarmup; ++i) { - cudaGraphLaunch(instance, stream); - } - CUDACHECK(cudaStreamSynchronize(stream)); - - // measure runtime - // CUDACHECK(cudaEventRecord(ev_start, stream)); - double t0 = getTime(); - int cudagraphlaunch = 10; - for (int i = 0; i < cudagraphlaunch; ++i) { - // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); - cudaGraphLaunch(instance, stream); - } - // CUDACHECK(cudaEventRecord(ev_end, stream)); - CUDACHECK(cudaStreamSynchronize(stream)); - - double t1 = getTime(); - float ms = (t1 - t0) * 1000.0; - // CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end)); - double time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; - printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, - (double)(data_size) / 1024. / 1024. / 1024. 
/ (time_in_us / 1e6)); - - MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); - MSCCLPPCHECK(mscclppProxyStop(comm)); - - MSCCLPPCHECK(mscclppCommDestroy(comm)); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc == 2) { - MPI_Finalize(); - } -#endif - printf("Succeeded! %d\n", rank); - return 0; -} diff --git a/tests/allgather_test_standalone.cu b/tests/allgather_test_standalone.cu new file mode 100644 index 00000000..05aa3304 --- /dev/null +++ b/tests/allgather_test_standalone.cu @@ -0,0 +1,501 @@ +#include "mscclpp.h" + +#ifdef MSCCLPP_USE_MPI_FOR_TESTS +#include "mpi.h" +#endif // MSCCLPP_USE_MPI_FOR_TESTS +#include +#include +#include +#include +#include +#include + +static int nranksPerNode = 8; + +// Propagate errors up + +#define MSCCLPPCHECK(call) \ + do { \ + mscclppResult_t res = call; \ + if (res != mscclppSuccess && res != mscclppInProgress) { \ + /* Print the back trace*/ \ + printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \ + return res; \ + } \ + } while (0) + +// Check CUDA RT calls +#define CUDACHECK(cmd) \ + do { \ + cudaError_t err = cmd; \ + if (err != cudaSuccess) { \ + printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ + exit(EXIT_FAILURE); \ + } \ + } while (false) + +// Measure current time in second. 
+static double getTime(void) +{ + struct timespec tspec; + if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { + printf("clock_gettime failed\n"); + exit(EXIT_FAILURE); + } + return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; +} + +__constant__ mscclppDevConn_t constDevConns[16]; + +__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU) +{ + // this allgather is really simple and implemented as an alltoall + + // this thread's role is a sender role + // put your data asynchronously + if ((threadIdx.x % 32) == 0) + devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + // make sure everyone is put their data before some thread randomly blocks everyone else in signal + __syncthreads(); + // push with flag and sync to make sure the data is received + if ((threadIdx.x % 32) == 0) + devConn.flush(); + + // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready + if ((threadIdx.x % 32) == 0) + devConn.wait(); +} + +__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, + uint64_t offset, uint64_t size) +{ + // this allgather algorithm works as follows: + // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode + // and waits for data from GPU rank (i-1) % nranksPerNode + // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode + // ... 
+ // This order is much better for DMA engine for NVLinks + for (int i = 1; i < nranksPerNode; i++) { + if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { + // put your data to GPU (rank+i) % nranksPerNode and signal in one call + if ((threadIdx.x % 32) == 0) + devConn.putWithSignalAndFlush(offset, size); + } + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { + if ((threadIdx.x % 32) == 0) + devConn.wait(); + } + asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); + } +} + +__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, + size_t nelemsPerGPU) +{ + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + nelemsPerGPU * sizeof(int)); +} + +__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank, + size_t nelemsPerGPU) +{ + // this allgather is a pipelined and hierarchical one and only works for two nodes + // it is implemented as follows: + // Step 1: each node does a local allgather and concurrently, + // local GPU i exchange (pipelineSize-1)/pipelineSize portion of their data with + // its cross-node neighbor (local GPU i on the other node) via IB + // Step 2: each node does a local allgather again with the data just received from its + // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with + // its cross-node neighbor + // Step 3: each node does a local allgather for the last time with the rest of the data + + int pipelineSize = 3; + + // Step 1 + // local allgather + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + nelemsPerGPU * sizeof(int)); + } + // cross-node exchange + if (remoteRank % 
nranksPerNode == rank % nranksPerNode) { + // opposite side + if ((threadIdx.x % 32) == 0) + devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + if ((threadIdx.x % 32) == 0) + devConn.wait(); + } + + __syncthreads(); + + // Step 2 + // local allgather + int otherNghr = (rank + nranksPerNode) % world_size; + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + } + + // cross-node exchange + if (remoteRank % nranksPerNode == rank % nranksPerNode) { + // opposite side + if ((threadIdx.x % 32) == 0) + devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * + sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + if ((threadIdx.x % 32) == 0) + devConn.wait(); + } + + __syncthreads(); + + // Step 3 + // local allgather + if (remoteRank / nranksPerNode == rank / nranksPerNode) { + localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, + (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + } +} + +__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel) +{ + // find the mapping between remoteRank and devConns + int warpId = threadIdx.x / 32; + int remoteRank = (warpId < rank) ? 
warpId : warpId + 1; + // Each warp is responsible for one of the remote ranks + mscclppDevConn_t devConn = constDevConns[warpId]; + + if (kernel == 0) + allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU); + else if (kernel == 1) + allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); + else if (kernel == 2) + allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU); +} + +int rankToLocalRank(int rank) +{ + return rank % nranksPerNode; +} + +int rankToNode(int rank) +{ + return rank / nranksPerNode; +} + +void print_usage(const char* prog) +{ +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + printf("usage: %s IP:PORT [rank nranks]\n", prog); +#else + printf("usage: %s IP:PORT rank nranks\n", prog); +#endif +} + +void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h, + int** data_d) +{ + CUDACHECK(cudaMalloc(data_d, dataSize)); + CUDACHECK(cudaMemset(*data_d, 0, dataSize)); + + *data_h = new int[nelemsPerGPU * world_size]; + for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { + int val = i + 1; + if (i / nelemsPerGPU == (size_t)rank) { + (*data_h)[i] = val; + } else { + (*data_h)[i] = 0; + } + } + CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice)); +} + +mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize) +{ + int thisNode = rankToNode(rank); + int cudaNum = rankToLocalRank(rank); + std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum); + + for (int r = 0; r < world_size; ++r) { + if (r == rank) + continue; + mscclppTransport_t transportType; + const char* ibDev = ibDevStr.c_str(); + if (rankToNode(r) == thisNode) { + ibDev = NULL; + transportType = mscclppTransportP2P; + } else { + transportType = mscclppTransportIB; + } + // Connect with all other ranks + MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev)); + } + + 
MSCCLPPCHECK(mscclppConnectionSetup(comm)); + + mscclppDevConn_t* devConns; + int nCons; + MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons)); + + CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons)); + + return mscclppSuccess; +} + +void printUsage(const char* prog, bool isMpi) +{ + if (isMpi) { + std::string st = "you are using MPI for this test\n"; + st += "two possilbe usages are:\n"; + st += "> " + std::string(prog) + "\n"; + st += "or\n"; + st += "> " + std::string(prog) + " -ip_port [ip:port]\n"; + printf("%s", st.c_str()); + } else { + std::string st = "you are NOT using MPI for this test\n"; + st += "the only possible usage:\n"; + st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n"; + printf("%s", st.c_str()); + } +} + +std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[], bool isMpi) +{ + std::unordered_map<std::string, std::string> options; + + for (int i = 1; i < argc; i++) { + std::string arg = argv[i]; + if (arg == "-rankspernode") { + if (isMpi) { + fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n"); + exit(-1); + } + if (i + 1 < argc) { + options["rankspernode"] = argv[++i]; + } else { + fprintf(stderr, "Error: -rankspernode option requires an argument.\n"); + ; + exit(-1); + } + } else if (arg == "-kernel") { + if (i + 1 < argc) { + options["kernel"] = argv[++i]; + } else { + fprintf(stderr, "Error: -kernel option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-ip_port") { + if (i + 1 < argc) { + options["ip_port"] = argv[++i]; + } else { + fprintf(stderr, "Error: -ip_port option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-rank") { + if (isMpi) { + fprintf(stderr, "Error: -rank should not be specified with MPI.\n"); + exit(-1); + } + if (i + 1 < argc) { + options["rank"] = argv[++i]; + } else { + fprintf(stderr, "Error: -ip_port option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-nranks") { + if 
(isMpi) { + fprintf(stderr, "Error: -nranks should not be specified with MPI.\n"); + exit(-1); + } + if (i + 1 < argc) { + options["nranks"] = argv[++i]; + } else { + fprintf(stderr, "Error: -ip_port option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-datasize") { + if (i + 1 < argc) { + options["datasize"] = argv[++i]; + } else { + fprintf(stderr, "Error: -datasize option requires an argument.\n"); + exit(-1); + } + } else if (arg == "-help" || arg == "-h") { + printUsage(argv[0], isMpi); + exit(0); + } else { + fprintf(stderr, "Error: Unknown option %s\n", argv[i]); + exit(-1); + } + } + return options; +} + +int main(int argc, const char* argv[]) +{ + bool isMpi = false; +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + isMpi = true; +#endif + + auto parsedArgs = parseArgs(argc, argv, isMpi); + + int rank; + int world_size; +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + // get the local number of nodes with MPI + MPI_Comm shmcomm; + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + int shmrank; + MPI_Comm_size(shmcomm, &shmrank); + nranksPerNode = shmrank; + MPI_Comm_free(&shmcomm); +#else + if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) { + printUsage(argv[0], isMpi); + exit(-1); + } + rank = std::stoi(parsedArgs["rank"]); + world_size = std::stoi(parsedArgs["nranks"]); + if (parsedArgs.find("rankspernode") == parsedArgs.end()) { + printUsage(argv[0], isMpi); + exit(-1); + } + nranksPerNode = std::stoi(parsedArgs["rankspernode"]); +#endif + int kernelNum = 0; + if (parsedArgs.find("kernel") != parsedArgs.end()) { + kernelNum = std::stoi(parsedArgs["kernel"]); + } + char* ip_port = NULL; + if (parsedArgs.find("ip_port") == parsedArgs.end()) { + printUsage(argv[0], isMpi); + exit(-1); + } + ip_port = (char*)parsedArgs["ip_port"].c_str(); + + int thisNode = 
rankToNode(rank); + int cudaNum = rankToLocalRank(rank); + CUDACHECK(cudaSetDevice(cudaNum)); + + if (rank == 0) + printf("Initializing MSCCL++\n"); + mscclppComm_t comm; + MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank)); + + int* data_d; + int* data_h; + size_t dataSize = 1024 * 1024 * 1024; + if (parsedArgs.find("datasize") != parsedArgs.end()) { + dataSize = std::stoul(parsedArgs["datasize"]); + } + size_t nelemsPerGPU = dataSize / sizeof(int) / world_size; + + if (rank == 0) + printf("Initializing data for allgather test\n"); + initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d); + + if (rank == 0) + printf("Setting up the connection in MSCCL++\n"); + MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize)); + + if (rank == 0) + printf("Launching MSCCL++ proxy threads\n"); + MSCCLPPCHECK(mscclppProxyLaunch(comm)); + + if (rank == 0) + printf("Testing the correctness of AllGather implementation\n"); + cudaStream_t stream; + CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); + CUDACHECK(cudaDeviceSynchronize()); + kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); + CUDACHECK(cudaDeviceSynchronize()); + CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost)); + + for (size_t i = 0; i < nelemsPerGPU * world_size; i++) { + int val = i + 1; + if (data_h[i] != val) { + printf("oh uh! 
data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val); + break; + } + } + int tmp[16]; + // A simple barrier + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + if (rank == 0) + printf("Successfully checked the correctness\n"); + + // Perf test + int iterwithoutcudagraph = 10; + if (rank == 0) + printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph); + CUDACHECK(cudaStreamSynchronize(stream)); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + for (int i = 0; i < iterwithoutcudagraph; ++i) { + kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); + } + CUDACHECK(cudaStreamSynchronize(stream)); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + + // cudaGraph Capture + int cudagraphiter = 10; + if (rank == 0) + printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter); + cudaGraph_t graph; + cudaGraphExec_t instance; + cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); + for (int i = 0; i < cudagraphiter; ++i) { + kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum); + } + cudaStreamEndCapture(stream, &graph); + cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); + + int cudagraphwarmup = 10; + if (rank == 0) + printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup, + cudagraphiter); + for (int i = 0; i < cudagraphwarmup; ++i) { + cudaGraphLaunch(instance, stream); + } + CUDACHECK(cudaStreamSynchronize(stream)); + + // measure runtime + int cudagraphlaunch = 10; + if (rank == 0) + printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch, + cudagraphiter); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + double t0, t1, ms, time_in_us; + t0 = getTime(); + for (int i = 0; i < cudagraphlaunch; ++i) { + cudaGraphLaunch(instance, stream); 
+ } + CUDACHECK(cudaStreamSynchronize(stream)); + + t1 = getTime(); + ms = (t1 - t0) * 1000.0; + time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter; + printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us, + (double)(dataSize) / 1e9 / (time_in_us / 1e6)); + MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int))); + + if (rank == 0) + printf("Stopping MSCCL++ proxy threads\n"); + MSCCLPPCHECK(mscclppProxyStop(comm)); + + if (rank == 0) + printf("Destroying MSCCL++ communicator\n"); + MSCCLPPCHECK(mscclppCommDestroy(comm)); + printf("Rank %d succeeded!\n", rank); + +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + MPI_Finalize(); +#endif + return 0; +} diff --git a/tests/common.cu b/tests/common.cu new file mode 100644 index 00000000..77c3223f --- /dev/null +++ b/tests/common.cu @@ -0,0 +1,680 @@ +/************************************************************************* + * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "common.h" +#include "cuda.h" +#include "mscclpp.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#define NUM_BLOCKS 32 + +int is_main_proc = 0; +thread_local int is_main_thread = 0; + +namespace { +class timer +{ + std::uint64_t t0; + +public: + timer(); + double elapsed() const; + double reset(); +}; + +std::uint64_t now() +{ + using clock = std::chrono::steady_clock; + return std::chrono::duration_cast<std::chrono::nanoseconds>(clock::now().time_since_epoch()).count(); +} + +// Command line parameter defaults +size_t minBytes = 32 * 1024 * 1024; +size_t maxBytes = 32 * 1024 * 1024; +size_t stepBytes = 1 * 1024 * 1024; +size_t stepFactor = 1; +int datacheck = 1; +int warmup_iters = 10; +int iters = 100; +int timeout = 0; +int report_cputime = 0; +// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX) +int average = 1; +int kernel_num = 0; +int cudaGraphLaunches = 15; + +double parsesize(const char* value) +{ + long long int units; + double size; + char size_lit; + + int count = sscanf(value, "%lf %1s", &size, &size_lit); + + switch (count) { + case 2: + switch (size_lit) { + case 'G': + case 'g': + units = 1024 * 1024 * 1024; + break; + case 'M': + case 'm': + units = 1024 * 1024; + break; + case 'K': + case 'k': + units = 1024; + break; + default: + return -1.0; + }; + break; + case 1: + units = 1; + break; + default: + return -1.0; + } + + return size * units; +} + +inline testResult_t Barrier(struct testArgs* args) +{ + int tmp[16]; + // A simple barrier + MSCCLPPCHECK(mscclppBootstrapAllGather(args->comm, tmp, sizeof(int))); + return testSuccess; +} +} // namespace + +timer::timer() +{ + t0 = now(); +} + +double timer::elapsed() const +{ + std::uint64_t t1 = now(); + return 1.e-9 * (t1 - t0); +} + +double timer::reset() +{ + std::uint64_t t1 = now(); + double ans = 1.e-9 * (t1 - t0); + t0 = t1; + return 
ans; +} + +testResult_t AllocateBuffs(void** sendbuff, size_t sendBytes, void** recvbuff, size_t recvBytes, void** expected, + size_t nbytes) +{ + CUDACHECK(cudaMalloc(sendbuff, nbytes)); + CUDACHECK(cudaMalloc(recvbuff, nbytes)); + if (datacheck) + CUDACHECK(cudaMalloc(expected, recvBytes)); + return testSuccess; +} + +testResult_t startColl(struct testArgs* args, int in_place, int iter) +{ + size_t count = args->nbytes; + + // Try to change offset for each iteration so that we avoid cache effects and catch race conditions in ptrExchange + size_t totalnbytes = max(args->sendBytes, args->expectedBytes); + size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1; + size_t shift = totalnbytes * (iter % steps); + + int rank = args->proc; + char* recvBuff = ((char*)args->recvbuff) + shift; + char* sendBuff = ((char*)args->sendbuff) + shift; + + TESTCHECK(args->collTest->runColl((void*)(in_place ? recvBuff + args->sendInplaceOffset * rank : sendBuff), + (void*)(in_place ? recvBuff + args->recvInplaceOffset * rank : recvBuff), + args->nranksPerNode, count, args->comm, args->stream, args->kernel_num)); + return testSuccess; +} + +testResult_t testStreamSynchronize(cudaStream_t stream) +{ + cudaError_t cudaErr; + timer tim; + + while (true) { + cudaErr = cudaStreamQuery(stream); + if (cudaErr == cudaSuccess) { + break; + } + + if (cudaErr != cudaErrorNotReady) + CUDACHECK(cudaErr); + + double delta = tim.elapsed(); + if (delta > timeout && timeout > 0) { + char hostname[1024]; + getHostName(hostname, 1024); + printf("%s: Test timeout (%ds) %s:%d\n", hostname, timeout, __FILE__, __LINE__); + return testTimeout; + } + + // We might want to let other threads (including MSCCLPP threads) use the CPU. + sched_yield(); + } + return testSuccess; +} + +testResult_t completeColl(struct testArgs* args) +{ + TESTCHECK(testStreamSynchronize(args->stream)); + return testSuccess; +} + +// Inter process barrier+allreduce. 
The quality of the return value +// for average=0 is just value itself. +// Inter process barrier+allreduce. The quality of the return value +// for average=0 is just value itself. +template <typename T> void Allreduce(struct testArgs* args, T* value, int average) +{ + T accumulator = *value; + +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + if (average != 0) { + static_assert(std::is_same<T, long long>::value || std::is_same<T, double>::value, + "Allreduce only for T in {long long, double}"); + MPI_Datatype ty = std::is_same<T, long long>::value ? MPI_LONG_LONG + : std::is_same<T, double>::value ? MPI_DOUBLE + : MPI_Datatype(); + MPI_Op op = average == 1 ? MPI_SUM + : average == 2 ? MPI_MIN + : average == 3 ? MPI_MAX + : average == 4 ? MPI_SUM + : MPI_Op(); + MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, ty, op, MPI_COMM_WORLD); + } +#endif + + if (average == 1) + accumulator /= args->totalProcs; + *value = accumulator; +} + +testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts) +{ + if (in_place == 0) { + return testInternalError; + } + size_t count = args->expectedBytes / sizeof(int); + + int* dataHostRecv = new int[count]; + int* dataHostExpected = new int[count]; + CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost)); + CUDACHECK(cudaMemcpy(dataHostExpected, args->expected, args->expectedBytes, cudaMemcpyDeviceToHost)); + + for (size_t i = 0; i < count; i++) { + if (dataHostRecv[i] != dataHostExpected[i]) { + *wrongElts += 1; + } + } + + if (args->reportErrors && *wrongElts) { + (args->error)++; + } + return testSuccess; +} + +testResult_t BenchTime(struct testArgs* args, int in_place) +{ + size_t count = args->nbytes; + + TESTCHECK(args->collTest->initData(args, in_place)); + // Sync + TESTCHECK(startColl(args, in_place, 0)); + TESTCHECK(completeColl(args)); + + TESTCHECK(Barrier(args)); + + // Performance Benchmark + cudaGraph_t graph; + cudaGraphExec_t graphExec; + CUDACHECK(cudaStreamBeginCapture(args->stream, cudaStreamCaptureModeGlobal)); + timer tim; + 
for (int iter = 0; iter < iters; iter++) { + TESTCHECK(startColl(args, in_place, iter)); + } + CUDACHECK(cudaStreamEndCapture(args->stream, &graph)); + CUDACHECK(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0)); + + // Launch the graph + TESTCHECK(Barrier(args)); + tim.reset(); + for (int l = 0; l < cudaGraphLaunches; ++l) { + CUDACHECK(cudaGraphLaunch(graphExec, args->stream)); + } + + double cputimeSec = tim.elapsed() / (iters); + TESTCHECK(completeColl(args)); + + double deltaSec = tim.elapsed(); + deltaSec = deltaSec / (iters) / (cudaGraphLaunches); + Allreduce(args, &deltaSec, average); + + CUDACHECK(cudaGraphExecDestroy(graphExec)); + CUDACHECK(cudaGraphDestroy(graph)); + + double algBw, busBw; + args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs); + TESTCHECK(Barrier(args)); + + int64_t wrongElts = 0; + if (datacheck) { + // Initialize sendbuffs, recvbuffs and expected + TESTCHECK(args->collTest->initData(args, in_place)); + // Begin cuda graph capture for data check + CUDACHECK(cudaStreamBeginCapture(args->stream, cudaStreamCaptureModeGlobal)); + // test validation in single iteration, should ideally be included into the multi-iteration run + TESTCHECK(startColl(args, in_place, 0)); + // End cuda graph capture + CUDACHECK(cudaStreamEndCapture(args->stream, &graph)); + // Instantiate cuda graph + CUDACHECK(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0)); + // Launch cuda graph + CUDACHECK(cudaGraphLaunch(graphExec, args->stream)); + + TESTCHECK(completeColl(args)); + + // destroy cuda graph + CUDACHECK(cudaGraphExecDestroy(graphExec)); + CUDACHECK(cudaGraphDestroy(graph)); + + TESTCHECK(CheckData(args, in_place, &wrongElts)); + + // aggregate delta from all threads and procs + long long wrongElts1 = wrongElts; + Allreduce(args, &wrongElts1, /*sum*/ 4); + wrongElts = wrongElts1; + } + + double timeUsec = (report_cputime ? 
cputimeSec : deltaSec) * 1.0E6; + char timeStr[100]; + if (timeUsec >= 10000.0) { + sprintf(timeStr, "%7.0f", timeUsec); + } else if (timeUsec >= 100.0) { + sprintf(timeStr, "%7.1f", timeUsec); + } else { + sprintf(timeStr, "%7.2f", timeUsec); + } + if (args->reportErrors) { + PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts); + } else { + PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A"); + } + + args->bw += busBw; + args->bw_count++; + return testSuccess; +} + +void setupArgs(size_t size, struct testArgs* args) +{ + int nranks = args->totalProcs; + size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset; + + // TODO: support more data types + int typeSize = sizeof(int); + count = size / typeSize; + args->collTest->getCollByteCount(&sendCount, &recvCount, &paramCount, &sendInplaceOffset, &recvInplaceOffset, + (size_t)count, (size_t)nranks); + + args->nbytes = paramCount * typeSize; + args->sendBytes = sendCount * typeSize; + args->expectedBytes = recvCount * typeSize; + args->sendInplaceOffset = sendInplaceOffset * typeSize; + args->recvInplaceOffset = recvInplaceOffset * typeSize; +} + +testResult_t TimeTest(struct testArgs* args) +{ + // Sync to avoid first-call timeout + TESTCHECK(Barrier(args)); + + // Warm-up for large size + setupArgs(args->maxbytes, args); + TESTCHECK(args->collTest->initData(args, 1)); + for (int iter = 0; iter < warmup_iters; iter++) { + TESTCHECK(startColl(args, 1, iter)); + } + TESTCHECK(completeColl(args)); + + // Warm-up for small size + setupArgs(args->minbytes, args); + for (int iter = 0; iter < warmup_iters; iter++) { + TESTCHECK(startColl(args, 1, iter)); + } + TESTCHECK(completeColl(args)); + + PRINT("#\n"); + PRINT("# %10s %12s in-place out-of-place \n", "", ""); + PRINT("# %10s %12s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "time", "algbw", "busbw", "#wrong", + "time", "algbw", "busbw", "#wrong"); + PRINT("# %10s %12s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", 
"(elements)", "(us)", "(GB/s)", "(GB/s)", "", + "(us)", "(GB/s)", "(GB/s)", ""); + // Benchmark + for (size_t size = args->minbytes; size <= args->maxbytes; + size = ((args->stepfactor > 1) ? size * args->stepfactor : size + args->stepbytes)) { + setupArgs(size, args); + PRINT("%12li %12li", max(args->sendBytes, args->expectedBytes), args->nbytes / sizeof(int)); + // Don't support out-of-place for now + // TESTCHECK(BenchTime(args, 0)); + TESTCHECK(BenchTime(args, 1)); + PRINT("\n"); + } + return testSuccess; +} + +testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode, mscclppComm_t comm, void* dataDst, + size_t dataSize) +{ + int thisNode = rank / ranksPerNode; + int localRank = rank % ranksPerNode; + std::string ibDevStr = "mlx5_ib" + std::to_string(localRank); + + for (int r = 0; r < worldSize; ++r) { + if (r == rank) + continue; + mscclppTransport_t transportType; + const char* ibDev = ibDevStr.c_str(); + if (r / ranksPerNode == thisNode) { + ibDev = NULL; + transportType = mscclppTransportP2P; + } else { + transportType = mscclppTransportIB; + } + // Connect with all other ranks + MSCCLPPCHECK(mscclppConnect(comm, r, 0, dataDst, dataSize, transportType, ibDev)); + } + + MSCCLPPCHECK(mscclppConnectionSetup(comm)); + + return testSuccess; +} + +testResult_t runTests(struct testArgs* args) +{ + PRINT("# Setting up the connection in MSCCL++\n"); + TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff, + args->maxbytes)); + PRINT("# Launching MSCCL++ proxy threads\n"); + MSCCLPPCHECK(mscclppProxyLaunch(args->comm)); + TESTCHECK(mscclppTestEngine.runTest(args)); + PRINT("Stopping MSCCL++ proxy threads\n"); + MSCCLPPCHECK(mscclppProxyStop(args->comm)); + return testSuccess; +} + +testResult_t run(); // Main function + +int main(int argc, char* argv[]) +{ + // Make sure everyline is flushed so that we see the progress of the test + setlinebuf(stdout); + + // Parse args + double parsed; 
+ int longindex; + static struct option longopts[] = {{"minbytes", required_argument, 0, 'b'}, + {"maxbytes", required_argument, 0, 'e'}, + {"stepbytes", required_argument, 0, 'i'}, + {"stepfactor", required_argument, 0, 'f'}, + {"iters", required_argument, 0, 'n'}, + {"warmup_iters", required_argument, 0, 'w'}, + {"check", required_argument, 0, 'c'}, + {"timeout", required_argument, 0, 'T'}, + {"cudagraph", required_argument, 0, 'G'}, + {"report_cputime", required_argument, 0, 'C'}, + {"average", required_argument, 0, 'a'}, + {"kernel_num", required_argument, 0, 'k'}, + {"help", no_argument, 0, 'h'}, + {}}; + + while (1) { + int c; + c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:k:h:", longopts, &longindex); + + if (c == -1) + break; + + switch (c) { + case 'b': + parsed = parsesize(optarg); + if (parsed < 0) { + fprintf(stderr, "invalid size specified for 'minbytes'\n"); + return -1; + } + minBytes = (size_t)parsed; + break; + case 'e': + parsed = parsesize(optarg); + if (parsed < 0) { + fprintf(stderr, "invalid size specified for 'maxbytes'\n"); + return -1; + } + maxBytes = (size_t)parsed; + break; + case 'i': + stepBytes = strtol(optarg, NULL, 0); + break; + case 'f': + stepFactor = strtol(optarg, NULL, 0); + break; + case 'n': + iters = (int)strtol(optarg, NULL, 0); + break; + case 'w': + warmup_iters = (int)strtol(optarg, NULL, 0); + break; + case 'c': + datacheck = (int)strtol(optarg, NULL, 0); + break; + case 'T': + timeout = strtol(optarg, NULL, 0); + break; + case 'G': + cudaGraphLaunches = strtol(optarg, NULL, 0); + if (cudaGraphLaunches <= 0) { + fprintf(stderr, "invalid number for 'cudaGraphLaunches'\n"); + return -1; + } + break; + case 'C': + report_cputime = strtol(optarg, NULL, 0); + break; + case 'a': + average = (int)strtol(optarg, NULL, 0); + break; + case 'k': + kernel_num = (int)strtol(optarg, NULL, 0); + break; + case 'h': + default: + if (c != 'h') + printf("invalid option '%c'\n", c); + printf("USAGE: %s \n\t" + "[-b,--minbytes ] 
\n\t" + "[-e,--maxbytes ] \n\t" + "[-i,--stepbytes ] \n\t" + "[-f,--stepfactor ] \n\t" + "[-n,--iters ] \n\t" + "[-w,--warmup_iters ] \n\t" + "[-c,--check <0/1>] \n\t" + "[-T,--timeout