From e303a532ff7b14830c25519309e85be9c6922dae Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 7 Apr 2023 05:42:21 +0000 Subject: [PATCH] address comments --- tests/allgather_test.cu | 16 +++--- tests/common.cu | 110 ++++++++++++++++++++-------------------- tests/common.h | 36 +++++++------ 3 files changed, 80 insertions(+), 82 deletions(-) diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu index 8354e69a..416f2e66 100644 --- a/tests/allgather_test.cu +++ b/tests/allgather_test.cu @@ -147,15 +147,15 @@ void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* par *paramcount = base; } -testResult_t AllGatherInitData(struct threadArgs* args, int in_place) +testResult_t AllGatherInitData(struct testArgs* args, int in_place) { size_t sendcount = args->sendBytes / sizeof(int); size_t recvcount = args->expectedBytes / sizeof(int); // int nranks = args->totalProcs; - CUDACHECK(cudaSetDevice(args->gpus[0])); + CUDACHECK(cudaSetDevice(args->gpuNum)); int rank = args->proc; - CUDACHECK(cudaMemset(args->recvbuffs[0], 0, args->expectedBytes)); + CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes)); // void* data = in_place ? 
((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0]; int* dataHost = new int[recvcount]; @@ -167,11 +167,11 @@ testResult_t AllGatherInitData(struct threadArgs* args, int in_place) dataHost[i] = 0; } } - CUDACHECK(cudaMemcpy(args->recvbuffs[0], dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); + CUDACHECK(cudaMemcpy(args->recvbuff, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); for (int i = 0; i < static_cast<int>(recvcount); i++) { dataHost[i] = i + 1; } - CUDACHECK(cudaMemcpy(args->expected[0], dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); + CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); delete dataHost; CUDACHECK(cudaDeviceSynchronize()); return testSuccess; @@ -187,10 +187,10 @@ void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, doubl } testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, - cudaStream_t stream) + cudaStream_t stream, int kernel_num) { int worldSize = comm->nRanks; - kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), 1); + kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), kernel_num); return testSuccess; } @@ -203,7 +203,7 @@ void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, in AllGatherGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); } -testResult_t AllGatherRunTest(struct threadArgs* args) +testResult_t AllGatherRunTest(struct testArgs* args) { args->collTest = &allGatherTest; mscclppDevConn_t* devConns; diff --git a/tests/common.cu b/tests/common.cu index 6722f57c..20baac0f 100644 --- a/tests/common.cu +++ b/tests/common.cu @@ -53,6 +53,7 @@ int report_cputime = 0; // Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX) int average = 1; std::string 
ip_port; +int kernel_num = 0; int cudaGraphLaunches = 15; double parsesize(const char* value) @@ -92,7 +93,7 @@ double parsesize(const char* value) return size * units; } -inline testResult_t Barrier(struct threadArgs* args) +inline testResult_t Barrier(struct testArgs* args) { int tmp[16]; // A simple barrier @@ -130,7 +131,7 @@ testResult_t AllocateBuffs(void** sendbuff, size_t sendBytes, void** recvbuff, s return testSuccess; } -testResult_t startColl(struct threadArgs* args, int in_place, int iter) +testResult_t startColl(struct testArgs* args, int in_place, int iter) { size_t count = args->nbytes; @@ -139,13 +140,13 @@ testResult_t startColl(struct threadArgs* args, int in_place, int iter) size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1; size_t shift = totalnbytes * (iter % steps); - int rank = ((args->proc * args->nThreads + args->thread) * args->nGpus); - char* recvBuff = ((char*)args->recvbuffs[0]) + shift; - char* sendBuff = ((char*)args->sendbuffs[0]) + shift; + int rank = args->proc; + char* recvBuff = ((char*)args->recvbuff) + shift; + char* sendBuff = ((char*)args->sendbuff) + shift; TESTCHECK(args->collTest->runColl((void*)(in_place ? recvBuff + args->sendInplaceOffset * rank : sendBuff), (void*)(in_place ? recvBuff + args->recvInplaceOffset * rank : recvBuff), - args->nranksPerNode, count, args->comm, args->stream)); + args->nranksPerNode, count, args->comm, args->stream, args->kernel_num)); return testSuccess; } @@ -177,7 +178,7 @@ testResult_t testStreamSynchronize(cudaStream_t stream) return testSuccess; } -testResult_t completeColl(struct threadArgs* args) +testResult_t completeColl(struct testArgs* args) { TESTCHECK(testStreamSynchronize(args->stream)); return testSuccess; @@ -187,7 +188,7 @@ testResult_t completeColl(struct threadArgs* args) // for average=0 is just value itself. // Inter process barrier+allreduce. The quality of the return value // for average=0 is just value itself. 
-template <typename T> void Allreduce(struct threadArgs* args, T* value, int average) +template <typename T> void Allreduce(struct testArgs* args, T* value, int average) { T accumulator = *value; @@ -212,7 +213,7 @@ template <typename T> void Allreduce(struct threadArgs* args, T* value, int aver *value = accumulator; } -testResult_t CheckData(struct threadArgs* args, int in_place, int64_t* wrongElts) +testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts) { if (in_place == 0) { return testInternalError; @@ -221,8 +222,8 @@ testResult_t CheckData(struct threadArgs* args, int in_place, int64_t* wrongElts int* dataHostRecv = new int[count]; int* dataHostExpected = new int[count]; - CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuffs[0], args->expectedBytes, cudaMemcpyDeviceToHost)); - CUDACHECK(cudaMemcpy(dataHostExpected, args->expected[0], args->expectedBytes, cudaMemcpyDeviceToHost)); + CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost)); + CUDACHECK(cudaMemcpy(dataHostExpected, args->expected, args->expectedBytes, cudaMemcpyDeviceToHost)); for (size_t i = 0; i < count; i++) { if (dataHostRecv[i] != dataHostExpected[i]) { @@ -231,12 +232,12 @@ testResult_t CheckData(struct threadArgs* args, int in_place, int64_t* wrongElts } if (args->reportErrors && *wrongElts) { - (*args->errors)++; + (args->error)++; } return testSuccess; } -testResult_t BenchTime(struct threadArgs* args, int in_place) +testResult_t BenchTime(struct testArgs* args, int in_place) { size_t count = args->nbytes; @@ -276,7 +277,7 @@ testResult_t BenchTime(struct threadArgs* args, int in_place) CUDACHECK(cudaGraphDestroy(graph)); double algBw, busBw; - args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs * args->nThreads * args->nGpus); + args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs); TESTCHECK(Barrier(args)); int64_t wrongElts = 0; @@ -323,14 +324,14 @@ testResult_t BenchTime(struct threadArgs* args, int in_place) 
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A"); } - args->bw[0] += busBw; - args->bw_count[0]++; + args->bw += busBw; + args->bw_count++; return testSuccess; } -void setupArgs(size_t size, struct threadArgs* args) +void setupArgs(size_t size, struct testArgs* args) { - int nranks = args->totalProcs * args->nGpus * args->nThreads; + int nranks = args->totalProcs; size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset; // TODO: support more data types @@ -346,7 +347,7 @@ void setupArgs(size_t size, struct threadArgs* args) args->recvInplaceOffset = recvInplaceOffset * typeSize; } -testResult_t TimeTest(struct threadArgs* args) +testResult_t TimeTest(struct testArgs* args) { // Sync to avoid first-call timeout TESTCHECK(Barrier(args)); @@ -412,10 +413,10 @@ testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode, return testSuccess; } -testResult_t threadRunTests(struct threadArgs* args) +testResult_t runTests(struct testArgs* args) { PRINT("# Setting up the connection in MSCCL++\n"); - TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuffs[0], + TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff, args->maxbytes)); PRINT("# Launching MSCCL++ proxy threads\n"); MSCCLPPCHECK(mscclppProxyLaunch(args->comm)); @@ -447,12 +448,13 @@ int main(int argc, char* argv[]) {"report_cputime", required_argument, 0, 'C'}, {"average", required_argument, 0, 'a'}, {"ip_port", required_argument, 0, 'P'}, + {"kernel_num", required_argument, 0, 'k'}, {"help", no_argument, 0, 'h'}, {}}; while (1) { int c; - c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:h:", longopts, &longindex); + c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:k:h:", longopts, &longindex); if (c == -1) break; @@ -508,6 +510,9 @@ int main(int argc, char* argv[]) case 'P': ip_port = optarg; break; + case 'k': + kernel_num = 
(int)strtol(optarg, NULL, 0); + break; case 'h': default: if (c != 'h') @@ -525,6 +530,7 @@ int main(int argc, char* argv[]) "[-C,--report_cputime <0/1>] \n\t" "[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t" "[-P,--ip_port ] \n\t" + "[-k,--kernel_num ] \n\t" "[-h,--help]\n", basename(argv[0])); return 0; @@ -565,9 +571,10 @@ testResult_t run() is_main_thread = is_main_proc = (proc == 0) ? 1 : 0; is_main_thread = is_main_proc = (proc == 0) ? 1 : 0; - PRINT("# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d ip port: %s graph: %d\n", + PRINT("# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d ip port: %s graph: %d, " + "kernel num: %d\n", minBytes, maxBytes, (stepFactor > 1) ? stepFactor : stepBytes, (stepFactor > 1) ? "factor" : "bytes", - warmup_iters, iters, datacheck, ip_port.c_str(), cudaGraphLaunches); + warmup_iters, iters, datacheck, ip_port.c_str(), cudaGraphLaunches, kernel_num); PRINT("#\n"); PRINT("# Using devices\n"); @@ -607,7 +614,6 @@ testResult_t run() printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes); } - int gpu = cudaDev; cudaStream_t stream; void* sendbuff; void* recvbuff; @@ -616,7 +622,7 @@ testResult_t run() mscclppTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)totalProcs); - CUDACHECK(cudaSetDevice(gpu)); + CUDACHECK(cudaSetDevice(cudaDev)); TESTCHECK(AllocateBuffs(&sendbuff, sendBytes, &recvbuff, recvBytes, &expected, (size_t)maxBytes)); CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); PRINT("#\n"); @@ -625,43 +631,38 @@ testResult_t run() mscclppComm_t comm; MSCCLPPCHECK(mscclppCommInitRank(&comm, totalProcs, ip_port.c_str(), rank)); - int error = 0; - double bw = 0.0; double* delta; CUDACHECK(cudaHostAlloc(&delta, sizeof(double) * NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped)); - int bw_count = 0; fflush(stdout); - struct testThread thread; + struct 
testWorker worker; - thread.args.minbytes = minBytes; - thread.args.maxbytes = maxBytes; - thread.args.stepbytes = stepBytes; - thread.args.stepfactor = stepFactor; - thread.args.localRank = localRank; - thread.args.nranksPerNode = nranksPerNode; + worker.args.minbytes = minBytes; + worker.args.maxbytes = maxBytes; + worker.args.stepbytes = stepBytes; + worker.args.stepfactor = stepFactor; + worker.args.localRank = localRank; + worker.args.nranksPerNode = nranksPerNode; - thread.args.totalProcs = totalProcs; - thread.args.proc = proc; - thread.args.nThreads = 1; - thread.args.thread = 0; - thread.args.nGpus = 1; - thread.args.gpus = &gpu; - thread.args.sendbuffs = &sendbuff; - thread.args.recvbuffs = &recvbuff; - thread.args.expected = &expected; - thread.args.comm = comm; - thread.args.stream = stream; + worker.args.totalProcs = totalProcs; + worker.args.proc = proc; + worker.args.gpuNum = cudaDev; + worker.args.kernel_num = kernel_num; + worker.args.sendbuff = sendbuff; + worker.args.recvbuff = recvbuff; + worker.args.expected = expected; + worker.args.comm = comm; + worker.args.stream = stream; - thread.args.errors = &error; - thread.args.bw = &bw; - thread.args.bw_count = &bw_count; + worker.args.error = 0; + worker.args.bw = 0.0; + worker.args.bw_count = 0; - thread.args.reportErrors = datacheck; + worker.args.reportErrors = datacheck; - thread.func = threadRunTests; - TESTCHECK(thread.func(&thread.args)); + worker.func = runTests; + TESTCHECK(worker.func(&worker.args)); MSCCLPPCHECK(mscclppCommDestroy(comm)); @@ -674,8 +675,7 @@ testResult_t run() CUDACHECK(cudaFree(expected)); CUDACHECK(cudaFreeHost(delta)); - bw /= bw_count; - + int error = worker.args.error; PRINT("# Out of bounds values : %d %s\n", error, error ? 
"FAILED" : "OK"); PRINT("#\n"); diff --git a/tests/common.h b/tests/common.h index 7d0e9267..39766de0 100644 --- a/tests/common.h +++ b/tests/common.h @@ -69,22 +69,22 @@ struct testColl const char name[20]; void (*getCollByteCount)(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, size_t* recvInplaceOffset, size_t count, int nranks); - testResult_t (*initData)(struct threadArgs* args, int in_place); + testResult_t (*initData)(struct testArgs* args, int in_place); void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks); testResult_t (*runColl)(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, - cudaStream_t stream); + cudaStream_t stream, int kernel_num); }; struct testEngine { void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks); // We can add more parameters for other communication primitives - testResult_t (*runTest)(struct threadArgs* args); + testResult_t (*runTest)(struct testArgs* args); }; extern struct testEngine mscclppTestEngine; -struct threadArgs +struct testArgs { size_t nbytes; size_t minbytes; @@ -94,40 +94,38 @@ struct threadArgs int totalProcs; int proc; - int nThreads; - int thread; - int nGpus; - int* gpus; + int gpuNum; int localRank; int nranksPerNode; - void** sendbuffs; + int kernel_num; + void* sendbuff; size_t sendBytes; size_t sendInplaceOffset; - void** recvbuffs; + void* recvbuff; size_t recvInplaceOffset; mscclppComm_t comm; cudaStream_t stream; - void** expected; + void* expected; size_t expectedBytes; - int* errors; - double* bw; - int* bw_count; + int error; + double bw; + int bw_count; int reportErrors; struct testColl* collTest; }; -typedef testResult_t (*threadFunc_t)(struct threadArgs* args); -struct testThread +typedef testResult_t (*entryFunc_t)(struct testArgs* args); +struct testWorker { - threadFunc_t func; - struct threadArgs args; + entryFunc_t func; + struct testArgs args; }; // 
Provided by common.cu -extern testResult_t TimeTest(struct threadArgs* args); +extern testResult_t TimeTest(struct testArgs* args); static void getHostName(char* hostname, int maxlen) {