address comments

This commit is contained in:
Binyang Li
2023-04-07 05:42:21 +00:00
parent bf472ff864
commit e303a532ff
3 changed files with 80 additions and 82 deletions

View File

@@ -147,15 +147,15 @@ void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* par
*paramcount = base;
}
testResult_t AllGatherInitData(struct threadArgs* args, int in_place)
testResult_t AllGatherInitData(struct testArgs* args, int in_place)
{
size_t sendcount = args->sendBytes / sizeof(int);
size_t recvcount = args->expectedBytes / sizeof(int);
// int nranks = args->totalProcs;
CUDACHECK(cudaSetDevice(args->gpus[0]));
CUDACHECK(cudaSetDevice(args->gpuNum));
int rank = args->proc;
CUDACHECK(cudaMemset(args->recvbuffs[0], 0, args->expectedBytes));
CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes));
// void* data = in_place ? ((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0];
int* dataHost = new int[recvcount];
@@ -167,11 +167,11 @@ testResult_t AllGatherInitData(struct threadArgs* args, int in_place)
dataHost[i] = 0;
}
}
CUDACHECK(cudaMemcpy(args->recvbuffs[0], dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
CUDACHECK(cudaMemcpy(args->recvbuff, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
for (int i = 0; i < static_cast<int>(recvcount); i++) {
dataHost[i] = i + 1;
}
CUDACHECK(cudaMemcpy(args->expected[0], dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
delete dataHost;
CUDACHECK(cudaDeviceSynchronize());
return testSuccess;
@@ -187,10 +187,10 @@ void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, doubl
}
testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
cudaStream_t stream)
cudaStream_t stream, int kernel_num)
{
int worldSize = comm->nRanks;
kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), 1);
kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), kernel_num);
return testSuccess;
}
@@ -203,7 +203,7 @@ void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, in
AllGatherGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AllGatherRunTest(struct threadArgs* args)
testResult_t AllGatherRunTest(struct testArgs* args)
{
args->collTest = &allGatherTest;
mscclppDevConn_t* devConns;

View File

@@ -53,6 +53,7 @@ int report_cputime = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
int average = 1;
std::string ip_port;
int kernel_num = 0;
int cudaGraphLaunches = 15;
double parsesize(const char* value)
@@ -92,7 +93,7 @@ double parsesize(const char* value)
return size * units;
}
inline testResult_t Barrier(struct threadArgs* args)
inline testResult_t Barrier(struct testArgs* args)
{
int tmp[16];
// A simple barrier
@@ -130,7 +131,7 @@ testResult_t AllocateBuffs(void** sendbuff, size_t sendBytes, void** recvbuff, s
return testSuccess;
}
testResult_t startColl(struct threadArgs* args, int in_place, int iter)
testResult_t startColl(struct testArgs* args, int in_place, int iter)
{
size_t count = args->nbytes;
@@ -139,13 +140,13 @@ testResult_t startColl(struct threadArgs* args, int in_place, int iter)
size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
size_t shift = totalnbytes * (iter % steps);
int rank = ((args->proc * args->nThreads + args->thread) * args->nGpus);
char* recvBuff = ((char*)args->recvbuffs[0]) + shift;
char* sendBuff = ((char*)args->sendbuffs[0]) + shift;
int rank = args->proc;
char* recvBuff = ((char*)args->recvbuff) + shift;
char* sendBuff = ((char*)args->sendbuff) + shift;
TESTCHECK(args->collTest->runColl((void*)(in_place ? recvBuff + args->sendInplaceOffset * rank : sendBuff),
(void*)(in_place ? recvBuff + args->recvInplaceOffset * rank : recvBuff),
args->nranksPerNode, count, args->comm, args->stream));
args->nranksPerNode, count, args->comm, args->stream, args->kernel_num));
return testSuccess;
}
@@ -177,7 +178,7 @@ testResult_t testStreamSynchronize(cudaStream_t stream)
return testSuccess;
}
testResult_t completeColl(struct threadArgs* args)
testResult_t completeColl(struct testArgs* args)
{
TESTCHECK(testStreamSynchronize(args->stream));
return testSuccess;
@@ -187,7 +188,7 @@ testResult_t completeColl(struct threadArgs* args)
// for average=0 is just value itself.
// Inter process barrier+allreduce. The quality of the return value
// for average=0 is just value itself.
template <typename T> void Allreduce(struct threadArgs* args, T* value, int average)
template <typename T> void Allreduce(struct testArgs* args, T* value, int average)
{
T accumulator = *value;
@@ -212,7 +213,7 @@ template <typename T> void Allreduce(struct threadArgs* args, T* value, int aver
*value = accumulator;
}
testResult_t CheckData(struct threadArgs* args, int in_place, int64_t* wrongElts)
testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts)
{
if (in_place == 0) {
return testInternalError;
@@ -221,8 +222,8 @@ testResult_t CheckData(struct threadArgs* args, int in_place, int64_t* wrongElts
int* dataHostRecv = new int[count];
int* dataHostExpected = new int[count];
CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuffs[0], args->expectedBytes, cudaMemcpyDeviceToHost));
CUDACHECK(cudaMemcpy(dataHostExpected, args->expected[0], args->expectedBytes, cudaMemcpyDeviceToHost));
CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost));
CUDACHECK(cudaMemcpy(dataHostExpected, args->expected, args->expectedBytes, cudaMemcpyDeviceToHost));
for (size_t i = 0; i < count; i++) {
if (dataHostRecv[i] != dataHostExpected[i]) {
@@ -231,12 +232,12 @@ testResult_t CheckData(struct threadArgs* args, int in_place, int64_t* wrongElts
}
if (args->reportErrors && *wrongElts) {
(*args->errors)++;
(args->error)++;
}
return testSuccess;
}
testResult_t BenchTime(struct threadArgs* args, int in_place)
testResult_t BenchTime(struct testArgs* args, int in_place)
{
size_t count = args->nbytes;
@@ -276,7 +277,7 @@ testResult_t BenchTime(struct threadArgs* args, int in_place)
CUDACHECK(cudaGraphDestroy(graph));
double algBw, busBw;
args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs * args->nThreads * args->nGpus);
args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs);
TESTCHECK(Barrier(args));
int64_t wrongElts = 0;
@@ -323,14 +324,14 @@ testResult_t BenchTime(struct threadArgs* args, int in_place)
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
}
args->bw[0] += busBw;
args->bw_count[0]++;
args->bw += busBw;
args->bw_count++;
return testSuccess;
}
void setupArgs(size_t size, struct threadArgs* args)
void setupArgs(size_t size, struct testArgs* args)
{
int nranks = args->totalProcs * args->nGpus * args->nThreads;
int nranks = args->totalProcs;
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
// TODO: support more data types
@@ -346,7 +347,7 @@ void setupArgs(size_t size, struct threadArgs* args)
args->recvInplaceOffset = recvInplaceOffset * typeSize;
}
testResult_t TimeTest(struct threadArgs* args)
testResult_t TimeTest(struct testArgs* args)
{
// Sync to avoid first-call timeout
TESTCHECK(Barrier(args));
@@ -412,10 +413,10 @@ testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode,
return testSuccess;
}
testResult_t threadRunTests(struct threadArgs* args)
testResult_t runTests(struct testArgs* args)
{
PRINT("# Setting up the connection in MSCCL++\n");
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuffs[0],
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
args->maxbytes));
PRINT("# Launching MSCCL++ proxy threads\n");
MSCCLPPCHECK(mscclppProxyLaunch(args->comm));
@@ -447,12 +448,13 @@ int main(int argc, char* argv[])
{"report_cputime", required_argument, 0, 'C'},
{"average", required_argument, 0, 'a'},
{"ip_port", required_argument, 0, 'P'},
{"kernel_num", required_argument, 0, 'k'},
{"help", no_argument, 0, 'h'},
{}};
while (1) {
int c;
c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:h:", longopts, &longindex);
c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:k:h:", longopts, &longindex);
if (c == -1)
break;
@@ -508,6 +510,9 @@ int main(int argc, char* argv[])
case 'P':
ip_port = optarg;
break;
case 'k':
kernel_num = (int)strtol(optarg, NULL, 0);
break;
case 'h':
default:
if (c != 'h')
@@ -525,6 +530,7 @@ int main(int argc, char* argv[])
"[-C,--report_cputime <0/1>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
"[-P,--ip_port <ip port for bootstrap>] \n\t"
"[-k,--kernel_num <kernel number of communication primitive>] \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
@@ -565,9 +571,10 @@ testResult_t run()
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
PRINT("# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d ip port: %s graph: %d\n",
PRINT("# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d ip port: %s graph: %d, "
"kernel num: %d\n",
minBytes, maxBytes, (stepFactor > 1) ? stepFactor : stepBytes, (stepFactor > 1) ? "factor" : "bytes",
warmup_iters, iters, datacheck, ip_port.c_str(), cudaGraphLaunches);
warmup_iters, iters, datacheck, ip_port.c_str(), cudaGraphLaunches, kernel_num);
PRINT("#\n");
PRINT("# Using devices\n");
@@ -607,7 +614,6 @@ testResult_t run()
printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
}
int gpu = cudaDev;
cudaStream_t stream;
void* sendbuff;
void* recvbuff;
@@ -616,7 +622,7 @@ testResult_t run()
mscclppTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)totalProcs);
CUDACHECK(cudaSetDevice(gpu));
CUDACHECK(cudaSetDevice(cudaDev));
TESTCHECK(AllocateBuffs(&sendbuff, sendBytes, &recvbuff, recvBytes, &expected, (size_t)maxBytes));
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
PRINT("#\n");
@@ -625,43 +631,38 @@ testResult_t run()
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRank(&comm, totalProcs, ip_port.c_str(), rank));
int error = 0;
double bw = 0.0;
double* delta;
CUDACHECK(cudaHostAlloc(&delta, sizeof(double) * NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped));
int bw_count = 0;
fflush(stdout);
struct testThread thread;
struct testWorker worker;
thread.args.minbytes = minBytes;
thread.args.maxbytes = maxBytes;
thread.args.stepbytes = stepBytes;
thread.args.stepfactor = stepFactor;
thread.args.localRank = localRank;
thread.args.nranksPerNode = nranksPerNode;
worker.args.minbytes = minBytes;
worker.args.maxbytes = maxBytes;
worker.args.stepbytes = stepBytes;
worker.args.stepfactor = stepFactor;
worker.args.localRank = localRank;
worker.args.nranksPerNode = nranksPerNode;
thread.args.totalProcs = totalProcs;
thread.args.proc = proc;
thread.args.nThreads = 1;
thread.args.thread = 0;
thread.args.nGpus = 1;
thread.args.gpus = &gpu;
thread.args.sendbuffs = &sendbuff;
thread.args.recvbuffs = &recvbuff;
thread.args.expected = &expected;
thread.args.comm = comm;
thread.args.stream = stream;
worker.args.totalProcs = totalProcs;
worker.args.proc = proc;
worker.args.gpuNum = cudaDev;
worker.args.kernel_num = kernel_num;
worker.args.sendbuff = sendbuff;
worker.args.recvbuff = recvbuff;
worker.args.expected = expected;
worker.args.comm = comm;
worker.args.stream = stream;
thread.args.errors = &error;
thread.args.bw = &bw;
thread.args.bw_count = &bw_count;
worker.args.error = 0;
worker.args.bw = 0.0;
worker.args.bw_count = 0;
thread.args.reportErrors = datacheck;
worker.args.reportErrors = datacheck;
thread.func = threadRunTests;
TESTCHECK(thread.func(&thread.args));
worker.func = runTests;
TESTCHECK(worker.func(&worker.args));
MSCCLPPCHECK(mscclppCommDestroy(comm));
@@ -674,8 +675,7 @@ testResult_t run()
CUDACHECK(cudaFree(expected));
CUDACHECK(cudaFreeHost(delta));
bw /= bw_count;
int error = worker.args.error;
PRINT("# Out of bounds values : %d %s\n", error, error ? "FAILED" : "OK");
PRINT("#\n");

View File

@@ -69,22 +69,22 @@ struct testColl
const char name[20];
void (*getCollByteCount)(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
size_t* recvInplaceOffset, size_t count, int nranks);
testResult_t (*initData)(struct threadArgs* args, int in_place);
testResult_t (*initData)(struct testArgs* args, int in_place);
void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
testResult_t (*runColl)(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
cudaStream_t stream);
cudaStream_t stream, int kernel_num);
};
struct testEngine
{
void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks);
// We can add more parameters for other communication primitives
testResult_t (*runTest)(struct threadArgs* args);
testResult_t (*runTest)(struct testArgs* args);
};
extern struct testEngine mscclppTestEngine;
struct threadArgs
struct testArgs
{
size_t nbytes;
size_t minbytes;
@@ -94,40 +94,38 @@ struct threadArgs
int totalProcs;
int proc;
int nThreads;
int thread;
int nGpus;
int* gpus;
int gpuNum;
int localRank;
int nranksPerNode;
void** sendbuffs;
int kernel_num;
void* sendbuff;
size_t sendBytes;
size_t sendInplaceOffset;
void** recvbuffs;
void* recvbuff;
size_t recvInplaceOffset;
mscclppComm_t comm;
cudaStream_t stream;
void** expected;
void* expected;
size_t expectedBytes;
int* errors;
double* bw;
int* bw_count;
int error;
double bw;
int bw_count;
int reportErrors;
struct testColl* collTest;
};
typedef testResult_t (*threadFunc_t)(struct threadArgs* args);
struct testThread
typedef testResult_t (*entryFunc_t)(struct testArgs* args);
struct testWorker
{
threadFunc_t func;
struct threadArgs args;
entryFunc_t func;
struct testArgs args;
};
// Provided by common.cu
extern testResult_t TimeTest(struct threadArgs* args);
extern testResult_t TimeTest(struct testArgs* args);
static void getHostName(char* hostname, int maxlen)
{