Rewrite mscclpp-test with cpp style API (#77)

- Rewrite mscclpp-test with cpp style API
- Add SM copy
- add new sendRecv test
This commit is contained in:
Binyang2014
2023-05-19 14:14:19 +08:00
committed by GitHub
parent 4c0883bc91
commit a3cf48cc5d
15 changed files with 795 additions and 1062 deletions

View File

@@ -94,6 +94,22 @@ struct DeviceChannel {
put(dst, offset, src, offset, size);
}
// Cooperatively copy `size` bytes from `src + srcOffset` to `dst + dstOffset` using
// `numThreads` threads; each thread copies a strided subset of 8-byte words.
// NOTE(review): when `size` is not a multiple of 8 the word count is rounded UP, so
// up to 7 bytes past `size` are read and written -- callers must size buffers to a
// full 8-byte word. Both pointers must be 8-byte aligned (see comment below).
__forceinline__ __device__ void putDirect(void* dst, void* src, uint64_t dstOffset, uint64_t srcOffset, uint64_t size,
uint32_t threadId, uint32_t numThreads) {
// assume the memory is aligned to 8 bytes
uint64_t* srcAddr = (uint64_t*)((char*)src + srcOffset);
uint64_t* dstAddr = (uint64_t*)((char*)dst + dstOffset);
uint64_t ele;
// Round up to whole 64-bit words when `size` is not a multiple of 8.
size_t nElem = size % sizeof(uint64_t) ? (size + sizeof(uint64_t)) / sizeof(uint64_t) : size / sizeof(uint64_t);
for (size_t i = threadId; i < nElem; i += numThreads) {
// load to register first
ele = srcAddr[i];
dstAddr[i] = ele;
}
}
__forceinline__ __device__ void signalDirect() { epoch_.signalDirect(); }
__forceinline__ __device__ void signal() {
epochIncrement();
fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, channelId_).value);
@@ -212,6 +228,9 @@ struct SimpleDeviceChannel {
SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src) : devChan_(devChan), dst_(dst), src_(src) {}
SimpleDeviceChannel(DeviceChannel devChan, void* dstPtr, void* srcPtr)
: devChan_(devChan), srcPtr_(srcPtr), dstPtr_(dstPtr) {}
SimpleDeviceChannel(const SimpleDeviceChannel& other) = default;
SimpleDeviceChannel& operator=(SimpleDeviceChannel& other) = default;
@@ -224,8 +243,14 @@ struct SimpleDeviceChannel {
__forceinline__ __device__ void put(uint64_t offset, uint64_t size) { put(offset, offset, size); }
// Symmetric direct copy: same offset on both sides, using the raw destination and
// source pointers captured at construction time (no proxy involvement).
__forceinline__ __device__ void putDirect(uint64_t offset, uint64_t size, uint32_t threadId, uint32_t numThreads) {
devChan_.putDirect(dstPtr_, srcPtr_, offset, offset, size, threadId, numThreads);
}
__forceinline__ __device__ void signal() { devChan_.signal(); }
__forceinline__ __device__ void signalDirect() { devChan_.signalDirect(); }
__forceinline__ __device__ void putWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) {
devChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size);
}
@@ -251,6 +276,10 @@ struct SimpleDeviceChannel {
DeviceChannel devChan_;
MemoryId dst_;
MemoryId src_;
// these are used for direct copy
void* srcPtr_;
void* dstPtr_;
};
} // namespace channel

View File

@@ -0,0 +1,45 @@
#ifndef MSCCLPP_CONCURRENCY_HPP_
#define MSCCLPP_CONCURRENCY_HPP_
namespace mscclpp {
// Reusable barrier that synchronizes a fixed number of thread blocks inside a
// single kernel launch. Alternating count-up / count-down phases let the same
// object be used for consecutive sync() calls without reinitialization.
// NOTE(review): flag_/count_/isAdd_ are never initialized by the defaulted
// constructor; the object must live in zero-initialized device memory before the
// first sync() -- confirm at allocation sites.
struct DeviceSyncer {
public:
DeviceSyncer() = default;
~DeviceSyncer() = default;
#ifdef __CUDACC__
// Synchronize multiple thread blocks inside a kernel. Guarantee that all
// previous work of all threads in cooperating blocks is finished.
// `blockNum` must be the number of participating blocks and identical on every
// block; all threads of each block must call sync().
__forceinline__ __device__ void sync(int blockNum) {
int maxOldCnt = blockNum - 1;
// Ensure all threads in this block finished their prior work before signaling.
__syncthreads();
if (threadIdx.x == 0) {
// Toggle the phase: even calls count up, odd calls count down.
int tmpIsAdd = isAdd_ ^ 1;
if (tmpIsAdd) {
// Count-up phase: the last block to arrive raises the flag.
if (atomicAdd(&count_, 1) == maxOldCnt) {
flag_ = 1;
}
// Spin until every block has arrived.
while (!flag_) {
}
} else {
// Count-down phase: the last block to arrive lowers the flag.
if (atomicSub(&count_, 1) == 1) {
flag_ = 0;
}
while (flag_) {
}
}
isAdd_ = tmpIsAdd;
}
// We need sync here because only a single thread is checking whether
// the flag is flipped.
__syncthreads();
}
#endif
private:
// Barrier release flag; volatile so the spin loops observe writes from other blocks.
volatile int flag_;
// Number of blocks that have entered the barrier (up- or down-counted per phase).
int count_;
// Phase selector, toggled by thread 0 of each block on every sync().
int isAdd_;
};
} // namespace mscclpp
#endif // MSCCLPP_CONCURRENCY_HPP_

View File

@@ -17,9 +17,9 @@ class BaseEpoch {
private:
std::shared_ptr<Connection> connection_;
RegisteredMemory localEpochIdsRegMem_;
NonblockingFuture<RegisteredMemory> remoteEpochIdsRegMem_;
protected:
NonblockingFuture<RegisteredMemory> remoteEpochIdsRegMem_;
std::unique_ptr<EpochIds, Deleter<EpochIds>> epochIds_;
std::unique_ptr<uint64_t, Deleter<uint64_t>> expectedInboundEpochId_;
@@ -56,9 +56,18 @@ class DeviceEpoch : BaseEpoch<CudaDeleter> {
}
__forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(epochIds->outbound) += 1; }
// Signal the peer directly over GPU memory (bypassing the proxy): bump the local
// outbound epoch and publish it into the peer's inbound replica.
__forceinline__ __device__ void signalDirect() {
// This fence ensures that the writes from a preceding putDirect() are visible on the peer GPU before the
// incremented epoch id is visible.
__threadfence_system();
epochIncrement();
*(volatile uint64_t*)&(remoteEpochIds->inboundReplica) = epochIds->outbound;
}
#endif // __CUDACC__
EpochIds* epochIds;
EpochIds* remoteEpochIds;
uint64_t* expectedInboundEpochId;
};

View File

@@ -1,9 +1,12 @@
#ifndef MSCCLPP_UTILS_HPP_
#define MSCCLPP_UTILS_HPP_
#include <stdio.h>
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <stdexcept>
#include <string>
namespace mscclpp {
@@ -35,6 +38,18 @@ struct ScopedTimer {
~ScopedTimer() { timer.print(name); }
};
// Return this machine's hostname, truncated at the first occurrence of `delim`
// (or after `maxlen - 1` characters, whichever comes first).
//
// Throws std::runtime_error if gethostname() fails.
//
// Fixed two defects: the original executed a bare `throw;` with no exception in
// flight (which calls std::terminate()), and it returned the full maxlen+1 buffer
// padded with embedded '\0' bytes instead of a string trimmed to its content.
inline std::string getHostName(int maxlen, const char delim) {
  std::string hostname(maxlen + 1, '\0');
  if (gethostname(const_cast<char*>(hostname.data()), maxlen) != 0) {
    throw std::runtime_error("gethostname failed");
  }
  // Scan up to the delimiter or terminator; bounds check first to stay in range.
  int i = 0;
  while ((i < maxlen - 1) && (hostname[i] != delim) && (hostname[i] != '\0')) i++;
  return hostname.substr(0, i);
}
} // namespace mscclpp
#endif // MSCCLPP_UTILS_HPP_

View File

@@ -1,13 +1,13 @@
#include "connection.hpp"
#include <algorithm>
#include <mscclpp/utils.hpp>
#include "checks_internal.hpp"
#include "debug.h"
#include "infiniband/verbs.h"
#include "npkit/npkit.h"
#include "registered_memory.hpp"
#include "utils.hpp"
namespace mscclpp {

View File

@@ -13,6 +13,7 @@ MSCCLPP_API_CPP void DeviceEpoch::signal() { BaseEpoch::signal(); }
MSCCLPP_API_CPP DeviceEpoch::DeviceHandle DeviceEpoch::deviceHandle() {
DeviceEpoch::DeviceHandle device;
device.remoteEpochIds = reinterpret_cast<EpochIds*>(remoteEpochIdsRegMem_.get().data());
device.epochIds = epochIds_.get();
device.expectedInboundEpochId = expectedInboundEpochId_.get();
return device;

View File

@@ -1,11 +1,11 @@
#include <atomic>
#include <mscclpp/core.hpp>
#include <mscclpp/proxy.hpp>
#include <mscclpp/utils.hpp>
#include <thread>
#include "api.h"
#include "utils.h"
#include "utils.hpp"
namespace mscclpp {

View File

@@ -22,3 +22,6 @@ add_executable(unit_tests)
target_link_libraries(unit_tests GTest::gtest_main GTest::gmock_main mscclpp CUDA::cudart CUDA::cuda_driver)
add_subdirectory(unit) # This adds the sources to the mscclpp target
gtest_discover_tests(unit_tests DISCOVERY_MODE PRE_TEST)
# mscclpp-test
add_subdirectory(mscclpp-test)

View File

@@ -1,692 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <getopt.h>
#include <libgen.h>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <string>
#include <type_traits>
#include <vector>
#include "common.h"
#include "cuda.h"
#include "mscclpp.h"
#define NUM_BLOCKS 32
int is_main_proc = 0;
thread_local int is_main_thread = 0;
namespace {
// Minimal wall-clock stopwatch backed by std::chrono::steady_clock.
class timer
{
std::uint64_t t0; // start time, nanoseconds since the clock's epoch
public:
timer();
double elapsed() const;
double reset();
};
// Current steady-clock time in nanoseconds (monotonic; safe for intervals).
std::uint64_t now()
{
using clock = std::chrono::steady_clock;
return std::chrono::duration_cast<std::chrono::nanoseconds>(clock::now().time_since_epoch()).count();
}
// Command line parameter defaults
size_t minBytes = 32 * 1024 * 1024;
size_t maxBytes = 32 * 1024 * 1024;
size_t stepBytes = 1 * 1024 * 1024;
size_t stepFactor = 1;
int datacheck = 1;
int warmup_iters = 10;
int iters = 100;
int timeout = 0;
int report_cputime = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
int average = 1;
int kernel_num = 0;
int cudaGraphLaunches = 15;
// Parse a human-readable size such as "1024", "32K", "16m" or "1G" into bytes.
// A bare number is taken as bytes; an unrecognized suffix or unparsable input
// yields -1.0.
double parsesize(const char* value)
{
  double size;
  char suffix;
  int matched = sscanf(value, "%lf %1s", &size, &suffix);
  if (matched == 1) {
    // No unit suffix: the number is already in bytes.
    return size;
  }
  if (matched != 2) {
    // Nothing numeric could be parsed.
    return -1.0;
  }
  long long int units;
  if (suffix == 'G' || suffix == 'g') {
    units = 1024 * 1024 * 1024;
  } else if (suffix == 'M' || suffix == 'm') {
    units = 1024 * 1024;
  } else if (suffix == 'K' || suffix == 'k') {
    units = 1024;
  } else {
    return -1.0;
  }
  return size * units;
}
// Inter-process barrier built on the bootstrap all-gather: every rank must
// contribute before the all-gather completes, so returning implies all arrived.
inline testResult_t Barrier(struct testArgs* args)
{
int tmp[16];
// A simple barrier
MSCCLPPCHECK(mscclppBootstrapAllGather(args->comm, tmp, sizeof(int)));
return testSuccess;
}
} // namespace
// Start the stopwatch at construction.
timer::timer()
{
t0 = now();
}
// Seconds elapsed since construction or the last reset().
double timer::elapsed() const
{
std::uint64_t t1 = now();
return 1.e-9 * (t1 - t0);
}
// Return seconds elapsed and restart the stopwatch.
double timer::reset()
{
std::uint64_t t1 = now();
double ans = 1.e-9 * (t1 - t0);
t0 = t1;
return ans;
}
// Allocate the device buffers used by the benchmark. Send and recv buffers are
// both sized `nbytes`; `expected` (only allocated when data checking is on) is
// sized `recvBytes`.
// NOTE(review): the `sendBytes` parameter is unused -- send/recv are allocated
// at the overall `nbytes` to allow the per-iteration offset shifting in startColl.
testResult_t AllocateBuffs(void** sendbuff, size_t sendBytes, void** recvbuff, size_t recvBytes, void** expected,
size_t nbytes)
{
CUDACHECK(cudaMalloc(sendbuff, nbytes));
CUDACHECK(cudaMalloc(recvbuff, nbytes));
if (datacheck)
CUDACHECK(cudaMalloc(expected, recvBytes));
return testSuccess;
}
// Launch one iteration of the collective under test. The buffer base is shifted
// per iteration (modulo the space available up to maxbytes) to defeat caching
// and expose pointer-exchange races.
// NOTE(review): in the out-of-place case the receive argument is `recvBuff`
// without the in-place offset -- presumably intentional, but verify against the
// collective implementations.
testResult_t startColl(struct testArgs* args, int in_place, int iter)
{
size_t count = args->nbytes;
// Try to change offset for each iteration so that we avoid cache effects and catch race conditions in ptrExchange
size_t totalnbytes = max(args->sendBytes, args->expectedBytes);
size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
size_t shift = totalnbytes * (iter % steps);
int rank = args->proc;
char* recvBuff = ((char*)args->recvbuff) + shift;
char* sendBuff = ((char*)args->sendbuff) + shift;
TESTCHECK(args->collTest->runColl((void*)(in_place ? recvBuff + args->sendInplaceOffset * rank : sendBuff),
(void*)(in_place ? recvBuff + args->recvInplaceOffset * rank : recvBuff),
args->nranksPerNode, count, args->comm, args->stream, args->kernel_num));
return testSuccess;
}
// Poll `stream` until all queued work completes, yielding the CPU between polls.
// Fails with testTimeout if the global `timeout` (seconds, 0 = disabled) elapses.
testResult_t testStreamSynchronize(cudaStream_t stream)
{
cudaError_t cudaErr;
timer tim;
while (true) {
cudaErr = cudaStreamQuery(stream);
if (cudaErr == cudaSuccess) {
break;
}
// Any error other than "still running" is a real CUDA failure.
if (cudaErr != cudaErrorNotReady)
CUDACHECK(cudaErr);
double delta = tim.elapsed();
if (delta > timeout && timeout > 0) {
char hostname[1024];
getHostName(hostname, 1024);
printf("%s: Test timeout (%ds) %s:%d\n", hostname, timeout, __FILE__, __LINE__);
return testTimeout;
}
// We might want to let other threads (including MSCCLPP threads) use the CPU.
sched_yield();
}
return testSuccess;
}
// Wait (with timeout) for the collective launched by startColl() to finish.
testResult_t completeColl(struct testArgs* args)
{
TESTCHECK(testStreamSynchronize(args->stream));
return testSuccess;
}
// Inter-process barrier+allreduce. For average=0 the returned value is just the
// local value itself.
// Reduce `*value` across all processes according to `average`:
// 0 = leave the local value, 1 = mean, 2 = min, 3 = max, 4 = sum.
// Without MPI this degenerates to the local value (divided by totalProcs when
// average == 1). Only long long and double are supported.
template <typename T> void Allreduce(struct testArgs* args, T* value, int average)
{
T accumulator = *value;
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (average != 0) {
static_assert(std::is_same<T, long long>::value || std::is_same<T, double>::value,
"Allreduce<T> only for T in {long long, double}");
MPI_Datatype ty = std::is_same<T, long long>::value ? MPI_LONG_LONG
: std::is_same<T, double>::value ? MPI_DOUBLE
: MPI_Datatype();
MPI_Op op = average == 1 ? MPI_SUM
: average == 2 ? MPI_MIN
: average == 3 ? MPI_MAX
: average == 4 ? MPI_SUM
: MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, ty, op, MPI_COMM_WORLD);
}
#endif
if (average == 1)
accumulator /= args->totalProcs;
*value = accumulator;
}
// Compare the received buffer against the expected buffer element-wise (as ints),
// accumulating the mismatch count into *wrongElts. Increments args->error when
// mismatches occur and error reporting is enabled.
//
// Fixed: the original allocated both host staging buffers with `new int[count]`
// and never freed them -- leaking on every call, including the CUDACHECK early
// returns. std::vector releases the memory on all paths.
testResult_t CheckData(struct testArgs* args, int64_t* wrongElts)
{
  size_t count = args->expectedBytes / sizeof(int);
  std::vector<int> dataHostRecv(count);
  std::vector<int> dataHostExpected(count);
  CUDACHECK(cudaMemcpy(dataHostRecv.data(), args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost));
  CUDACHECK(cudaMemcpy(dataHostExpected.data(), args->expected, args->expectedBytes, cudaMemcpyDeviceToHost));
  for (size_t i = 0; i < count; i++) {
    if (dataHostRecv[i] != dataHostExpected[i]) {
      // PRINT("Error: dataHostRecv[%ld] = %d, dataHostExpected[%ld] = %d\n", i, dataHostRecv[i], i,
      //       dataHostExpected[i]);
      *wrongElts += 1;
    }
  }
  if (args->reportErrors && *wrongElts) {
    (args->error)++;
  }
  return testSuccess;
}
// Benchmark one message size: capture `iters` collective launches into a CUDA
// graph, replay it `cudaGraphLaunches` times, report time/bandwidth, and
// optionally validate the result. Prints one table cell (time, algbw, busbw,
// #wrong) for this (size, in_place) combination.
testResult_t BenchTime(struct testArgs* args, int in_place)
{
size_t count = args->nbytes;
TESTCHECK(args->collTest->initData(args, in_place));
// Sync
TESTCHECK(startColl(args, in_place, 0));
TESTCHECK(completeColl(args));
TESTCHECK(Barrier(args));
// Performance Benchmark
cudaGraph_t graph;
cudaGraphExec_t graphExec;
CUDACHECK(cudaStreamBeginCapture(args->stream, cudaStreamCaptureModeGlobal));
timer tim;
for (int iter = 0; iter < iters; iter++) {
TESTCHECK(startColl(args, in_place, iter));
}
CUDACHECK(cudaStreamEndCapture(args->stream, &graph));
CUDACHECK(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0));
// Launch the graph
TESTCHECK(Barrier(args));
tim.reset();
for (int l = 0; l < cudaGraphLaunches; ++l) {
CUDACHECK(cudaGraphLaunch(graphExec, args->stream));
}
// Measured before synchronizing: this is CPU launch time only (see report_cputime).
// NOTE(review): unlike deltaSec below, this is not divided by cudaGraphLaunches
// -- confirm whether that is intended.
double cputimeSec = tim.elapsed() / (iters);
TESTCHECK(completeColl(args));
// Wall time including GPU execution, averaged per iteration per graph launch.
double deltaSec = tim.elapsed();
deltaSec = deltaSec / (iters) / (cudaGraphLaunches);
Allreduce(args, &deltaSec, average);
CUDACHECK(cudaGraphExecDestroy(graphExec));
CUDACHECK(cudaGraphDestroy(graph));
double algBw, busBw;
args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs);
TESTCHECK(Barrier(args));
int64_t wrongElts = 0;
if (datacheck) {
// Initialize sendbuffs, recvbuffs and expected
TESTCHECK(args->collTest->initData(args, in_place));
// Begin cuda graph capture for data check
CUDACHECK(cudaStreamBeginCapture(args->stream, cudaStreamCaptureModeGlobal));
// test validation in single itertion, should ideally be included into the multi-iteration run
TESTCHECK(startColl(args, in_place, 0));
// End cuda graph capture
CUDACHECK(cudaStreamEndCapture(args->stream, &graph));
// Instantiate cuda graph
CUDACHECK(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0));
// Launch cuda graph
CUDACHECK(cudaGraphLaunch(graphExec, args->stream));
TESTCHECK(completeColl(args));
// destroy cuda graph
CUDACHECK(cudaGraphExecDestroy(graphExec));
CUDACHECK(cudaGraphDestroy(graph));
TESTCHECK(CheckData(args, &wrongElts));
// aggregate delta from all threads and procs
long long wrongElts1 = wrongElts;
Allreduce(args, &wrongElts1, /*sum*/ 4);
wrongElts = wrongElts1;
}
double timeUsec = (report_cputime ? cputimeSec : deltaSec) * 1.0E6;
char timeStr[100];
// Pick a precision that keeps the column 7 characters wide.
if (timeUsec >= 10000.0) {
sprintf(timeStr, "%7.0f", timeUsec);
} else if (timeUsec >= 100.0) {
sprintf(timeStr, "%7.1f", timeUsec);
} else {
sprintf(timeStr, "%7.2f", timeUsec);
}
if (!in_place) {
PRINT(" ");
}
if (args->reportErrors) {
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
} else {
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
}
args->bw += busBw;
args->bw_count++;
return testSuccess;
}
// Translate a raw byte size into the per-collective element counts/offsets
// (via the collective's getCollByteCount) and run its one-time initColl hook.
testResult_t setupArgsAndInit(size_t size, struct testArgs* args)
{
int nranks = args->totalProcs;
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
// TODO: support more data types
int typeSize = sizeof(int);
count = size / typeSize;
args->collTest->getCollByteCount(&sendCount, &recvCount, &paramCount, &sendInplaceOffset, &recvInplaceOffset,
(size_t)count, (size_t)nranks);
args->nbytes = paramCount * typeSize;
args->sendBytes = sendCount * typeSize;
args->expectedBytes = recvCount * typeSize;
args->sendInplaceOffset = sendInplaceOffset * typeSize;
args->recvInplaceOffset = recvInplaceOffset * typeSize;
return args->collTest->initColl();
}
// Top-level benchmark driver: warm up at the largest and smallest sizes, print
// the result-table header, then run BenchTime for every size in the sweep
// (multiplicative when stepfactor > 1, additive otherwise).
testResult_t TimeTest(struct testArgs* args)
{
// Sync to avoid first-call timeout
TESTCHECK(Barrier(args));
// Warm-up for large size
TESTCHECK(setupArgsAndInit(args->maxbytes, args));
TESTCHECK(args->collTest->initData(args, 1));
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, 1, iter));
}
TESTCHECK(completeColl(args));
// Warm-up for small size
TESTCHECK(setupArgsAndInit(args->minbytes, args));
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, 1, iter));
}
TESTCHECK(completeColl(args));
PRINT("#\n");
PRINT("# %10s %12s in-place out-of-place \n", "", "");
PRINT("# %10s %12s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "time", "algbw", "busbw", "#wrong",
"time", "algbw", "busbw", "#wrong");
PRINT("# %10s %12s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "(us)", "(GB/s)", "(GB/s)", "",
"(us)", "(GB/s)", "(GB/s)", "");
// Benchmark
for (size_t size = args->minbytes; size <= args->maxbytes;
size = ((args->stepfactor > 1) ? size * args->stepfactor : size + args->stepbytes)) {
TESTCHECK(setupArgsAndInit(size, args));
PRINT("%12li %12li", max(args->sendBytes, args->expectedBytes), args->nbytes / sizeof(int));
TESTCHECK(BenchTime(args, args->in_place));
PRINT("\n");
}
return testSuccess;
}
// Connect this rank to every other rank: peer-to-peer transport for ranks on the
// same node, InfiniBand otherwise.
// NOTE(review): the IB device name "mlx5_ib<localRank>" assumes one NIC per local
// GPU with that exact naming scheme -- confirm on the target cluster.
testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode, mscclppComm_t comm, void* dataDst,
size_t dataSize)
{
int thisNode = rank / ranksPerNode;
int localRank = rank % ranksPerNode;
std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
for (int r = 0; r < worldSize; ++r) {
if (r == rank)
continue;
mscclppTransport_t transportType;
const char* ibDev = ibDevStr.c_str();
if (r / ranksPerNode == thisNode) {
// Same node: use P2P and no IB device.
ibDev = NULL;
transportType = mscclppTransportP2P;
} else {
transportType = mscclppTransportIB;
}
// Connect with all other ranks
MSCCLPPCHECK(mscclppConnect(comm, r, 0, dataDst, dataSize, transportType, ibDev));
}
MSCCLPPCHECK(mscclppConnectionSetup(comm));
return testSuccess;
}
// Orchestrate one test run: set up connections (the engine's custom hook wins
// over the default), start the proxy threads, run the engine's test, then stop
// the proxies and tear down any custom connections.
testResult_t runTests(struct testArgs* args)
{
PRINT("# Setting up the connection in MSCCL++\n");
if (mscclppTestEngine.setupMscclppConnections != nullptr) {
TESTCHECK(mscclppTestEngine.setupMscclppConnections(args));
} else {
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
args->maxbytes));
}
PRINT("# Launching MSCCL++ proxy threads\n");
MSCCLPPCHECK(mscclppProxyLaunch(args->comm));
TESTCHECK(mscclppTestEngine.runTest(args));
PRINT("Stopping MSCCL++ proxy threads\n");
MSCCLPPCHECK(mscclppProxyStop(args->comm));
if (mscclppTestEngine.teardownMscclppConnections != nullptr) {
TESTCHECK(mscclppTestEngine.teardownMscclppConnections());
}
return testSuccess;
}
testResult_t run(); // Main function
// Entry point: parse command-line options into the file-scope benchmark
// parameters, validate them, optionally initialize MPI, and hand off to run().
int main(int argc, char* argv[])
{
// Make sure everyline is flushed so that we see the progress of the test
setlinebuf(stdout);
// Parse args
double parsed;
int longindex;
static struct option longopts[] = {{"minbytes", required_argument, 0, 'b'},
{"maxbytes", required_argument, 0, 'e'},
{"stepbytes", required_argument, 0, 'i'},
{"stepfactor", required_argument, 0, 'f'},
{"iters", required_argument, 0, 'n'},
{"warmup_iters", required_argument, 0, 'w'},
{"check", required_argument, 0, 'c'},
{"timeout", required_argument, 0, 'T'},
{"cudagraph", required_argument, 0, 'G'},
{"report_cputime", required_argument, 0, 'C'},
{"average", required_argument, 0, 'a'},
{"kernel_num", required_argument, 0, 'k'},
{"help", no_argument, 0, 'h'},
{}};
// NOTE(review): the optstring lists 'P:' with no matching case, and 'h:' makes
// bare -h return '?' (still lands in the default/usage branch) -- worth cleanup.
while (1) {
int c;
c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:k:h:", longopts, &longindex);
if (c == -1)
break;
switch (c) {
case 'b':
parsed = parsesize(optarg);
if (parsed < 0) {
fprintf(stderr, "invalid size specified for 'minbytes'\n");
return -1;
}
minBytes = (size_t)parsed;
break;
case 'e':
parsed = parsesize(optarg);
if (parsed < 0) {
fprintf(stderr, "invalid size specified for 'maxbytes'\n");
return -1;
}
maxBytes = (size_t)parsed;
break;
case 'i':
stepBytes = strtol(optarg, NULL, 0);
break;
case 'f':
stepFactor = strtol(optarg, NULL, 0);
break;
case 'n':
iters = (int)strtol(optarg, NULL, 0);
break;
case 'w':
warmup_iters = (int)strtol(optarg, NULL, 0);
break;
case 'c':
datacheck = (int)strtol(optarg, NULL, 0);
break;
case 'T':
timeout = strtol(optarg, NULL, 0);
break;
case 'G':
cudaGraphLaunches = strtol(optarg, NULL, 0);
if (cudaGraphLaunches <= 0) {
fprintf(stderr, "invalid number for 'cudaGraphLaunches'\n");
return -1;
}
break;
case 'C':
report_cputime = strtol(optarg, NULL, 0);
break;
case 'a':
average = (int)strtol(optarg, NULL, 0);
break;
case 'k':
kernel_num = (int)strtol(optarg, NULL, 0);
break;
case 'h':
default:
if (c != 'h')
printf("invalid option '%c'\n", c);
printf("USAGE: %s \n\t"
"[-b,--minbytes <min size in bytes>] \n\t"
"[-e,--maxbytes <max size in bytes>] \n\t"
"[-i,--stepbytes <increment size>] \n\t"
"[-f,--stepfactor <increment factor>] \n\t"
"[-n,--iters <iteration count>] \n\t"
"[-w,--warmup_iters <warmup iteration count>] \n\t"
"[-c,--check <0/1>] \n\t"
"[-T,--timeout <time in seconds>] \n\t"
"[-G,--cudagraph <num graph launches>] \n\t"
"[-C,--report_cputime <0/1>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
"[-k,--kernel_num <kernel number of commnication primitive>] \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
}
}
if (minBytes > maxBytes) {
fprintf(stderr, "invalid sizes for 'minbytes' and 'maxbytes': %llu > %llu\n", (unsigned long long)minBytes,
(unsigned long long)maxBytes);
return -1;
}
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Init(&argc, &argv);
#endif
TESTCHECK(run());
return 0;
}
// Main test body: discover ranks (via MPI when enabled), print the device
// banner, cap maxBytes to available GPU memory, allocate buffers, initialize
// MSCCL++, run the tests, then report errors and clean up.
testResult_t run()
{
int totalProcs = 1, proc = 0;
int nranksPerNode = 0, localRank = 0;
char hostname[1024];
getHostName(hostname, 1024);
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Comm_size(MPI_COMM_WORLD, &totalProcs);
MPI_Comm_rank(MPI_COMM_WORLD, &proc);
// Split by shared memory to count ranks on this node.
MPI_Comm shmcomm;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
MPI_Comm_size(shmcomm, &nranksPerNode);
MPI_Comm_free(&shmcomm);
localRank = proc % nranksPerNode;
#endif
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
// NOTE(review): duplicated assignment below (harmless).
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
PRINT("# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d graph: %d, "
"kernel num: %d\n",
minBytes, maxBytes, (stepFactor > 1) ? stepFactor : stepBytes, (stepFactor > 1) ? "factor" : "bytes",
warmup_iters, iters, datacheck, cudaGraphLaunches, kernel_num);
PRINT("#\n");
PRINT("# Using devices\n");
#define MAX_LINE 2048
char line[MAX_LINE];
int len = 0;
size_t maxMem = ~0;
int cudaDev = localRank;
int rank = proc;
cudaDeviceProp prop;
char busIdChar[] = "00000000:00:00.0";
CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev));
CUDACHECK(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev));
len += snprintf(line + len, MAX_LINE - len, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n", rank, getpid(),
hostname, cudaDev, busIdChar, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
// NOTE(review): '#if' here vs '#ifdef' elsewhere -- works only because the
// macro, when defined, expands to a truthy value; also MPI_LONG may not match
// size_t on all platforms.
#if MSCCLPP_USE_MPI_FOR_TESTS
char* lines = (proc == 0) ? (char*)malloc(totalProcs * MAX_LINE) : NULL;
// Gather all output in rank order to root (0)
MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
if (proc == 0) {
for (int p = 0; p < totalProcs; p++)
PRINT("%s", lines + MAX_LINE * p);
free(lines);
}
MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
#else
PRINT("%s", line);
#endif
// We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
size_t memMaxBytes = (maxMem - (1 << 30)) / (datacheck ? 3 : 2);
if (maxBytes > memMaxBytes) {
maxBytes = memMaxBytes;
if (proc == 0)
printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
}
cudaStream_t stream;
void* sendbuff;
void* recvbuff;
void* expected;
size_t sendBytes, recvBytes;
mscclppTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)totalProcs);
CUDACHECK(cudaSetDevice(cudaDev));
TESTCHECK(AllocateBuffs(&sendbuff, sendBytes, &recvbuff, recvBytes, &expected, (size_t)maxBytes));
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
PRINT("#\n");
PRINT("# Initializing MSCCL++\n");
mscclppUniqueId mscclppId;
if (proc == 0)
MSCCLPPCHECK(mscclppGetUniqueId(&mscclppId));
// NOTE(review): this MPI_Bcast is not guarded by MSCCLPP_USE_MPI_FOR_TESTS --
// the non-MPI build path presumably never compiles this file; confirm.
MPI_Bcast((void*)&mscclppId, sizeof(mscclppId), MPI_BYTE, 0, MPI_COMM_WORLD);
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRankFromId(&comm, totalProcs, mscclppId, rank));
double* delta;
CUDACHECK(cudaHostAlloc(&delta, sizeof(double) * NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped));
fflush(stdout);
// Populate the worker with all benchmark parameters and run it.
struct testWorker worker;
worker.args.minbytes = minBytes;
worker.args.maxbytes = maxBytes;
worker.args.stepbytes = stepBytes;
worker.args.stepfactor = stepFactor;
worker.args.localRank = localRank;
worker.args.nranksPerNode = nranksPerNode;
worker.args.in_place = 1;
worker.args.totalProcs = totalProcs;
worker.args.proc = proc;
worker.args.gpuNum = cudaDev;
worker.args.kernel_num = kernel_num;
worker.args.sendbuff = sendbuff;
worker.args.recvbuff = recvbuff;
worker.args.expected = expected;
worker.args.comm = comm;
worker.args.stream = stream;
worker.args.error = 0;
worker.args.bw = 0.0;
worker.args.bw_count = 0;
worker.args.reportErrors = datacheck;
worker.func = runTests;
TESTCHECK(worker.func(&worker.args));
MSCCLPPCHECK(mscclppCommDestroy(comm));
// Free off CUDA allocated memory
if (sendbuff)
CUDACHECK(cudaFree((char*)sendbuff));
if (recvbuff)
CUDACHECK(cudaFree((char*)recvbuff));
if (datacheck)
CUDACHECK(cudaFree(expected));
CUDACHECK(cudaFreeHost(delta));
int error = worker.args.error;
#if MSCCLPP_USE_MPI_FOR_TESTS
MPI_Allreduce(MPI_IN_PLACE, &error, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
#endif
PRINT("# Out of bounds values : %d %s\n", error, error ? "FAILED" : "OK");
PRINT("#\n");
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Finalize();
#endif
return testSuccess;
}

View File

@@ -1,163 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_TESTS_COMMON_H_
#define MSCCLPP_TESTS_COMMON_H_
#include "mscclpp.h"
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include <mpi.h>
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#define CUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf("%s: Test CUDA failure %s:%d '%s'\n", hostname, __FILE__, __LINE__, cudaGetErrorString(err)); \
return testCudaError; \
} \
} while (0)
// Propagate errors up
#define MSCCLPPCHECK(cmd) \
do { \
mscclppResult_t res = cmd; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf("%s: Failure at %s:%d -> %s\n", hostname, __FILE__, __LINE__, mscclppGetErrorString(res)); \
return testMcclppError; \
} \
} while (0);
// Relay errors up and trace
#define TESTCHECK(cmd) \
do { \
testResult_t r = cmd; \
if (r != testSuccess) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf(" .. %s pid %d: Test failure %s:%d\n", hostname, getpid(), __FILE__, __LINE__); \
return r; \
} \
} while (0)
// Status codes propagated by the TESTCHECK/CUDACHECK/MSCCLPPCHECK macros.
typedef enum
{
testSuccess = 0,
testInternalError = 1,
testCudaError = 2,
testMcclppError = 3,
testTimeout = 4,
testNumResults = 5
} testResult_t;
// Default no-op initColl for collectives that need no one-time setup.
inline testResult_t defaultInitColl()
{
return testSuccess;
}
// Interface each collective under test implements: sizing, init, bandwidth
// model, and the actual launch.
struct testColl
{
const char name[20];
// Translate an element count into per-buffer counts and in-place offsets.
void (*getCollByteCount)(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
size_t* recvInplaceOffset, size_t count, int nranks);
testResult_t (*initColl)();
testResult_t (*initData)(struct testArgs* args, int in_place);
// Compute algorithm and bus bandwidth from an averaged iteration time.
void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
testResult_t (*runColl)(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
cudaStream_t stream, int kernel_num);
};
// Hooks each test binary provides; optional members may be nullptr.
struct testEngine
{
void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks);
// We can add more parameters for other communication primitives
testResult_t (*runTest)(struct testArgs* args);
testResult_t (*setupMscclppConnections)(struct testArgs* args);
testResult_t (*teardownMscclppConnections)();
};
extern struct testEngine mscclppTestEngine;
// All state shared between the driver (common.cu) and the collective under test.
struct testArgs
{
size_t nbytes;
size_t minbytes;
size_t maxbytes;
size_t stepbytes;
size_t stepfactor;
int totalProcs;
int proc;
int gpuNum;
int localRank;
int nranksPerNode;
int kernel_num;
int in_place;
void* sendbuff;
size_t sendBytes;
size_t sendInplaceOffset;
void* recvbuff;
size_t recvInplaceOffset;
mscclppComm_t comm;
cudaStream_t stream;
void* expected;
size_t expectedBytes;
int error;
double bw;
int bw_count;
int reportErrors;
struct testColl* collTest;
};
// Signature of the per-worker entry function (see runTests in common.cu).
typedef testResult_t (*entryFunc_t)(struct testArgs* args);
struct testWorker
{
entryFunc_t func;
struct testArgs args;
};
// Provided by common.cu
extern testResult_t TimeTest(struct testArgs* args);
// Copy this machine's hostname into `hostname` (capacity `maxlen` bytes),
// truncated at the first '.' so only the short host name remains.
//
// Fixed: POSIX gethostname() does not guarantee NUL termination when the name is
// truncated, so the buffer is force-terminated before scanning; a failed call no
// longer leaves the buffer uninitialized (callers format it with printf).
static void getHostName(char* hostname, int maxlen)
{
  if (maxlen <= 0)
    return;
  if (gethostname(hostname, maxlen) != 0) {
    // Fall back to a fixed name so callers always receive a valid C string.
    snprintf(hostname, maxlen, "unknown");
    return;
  }
  hostname[maxlen - 1] = '\0';
  for (int i = 0; i < maxlen; i++) {
    if (hostname[i] == '.' || hostname[i] == '\0') {
      hostname[i] = '\0';
      return;
    }
  }
}
// Print command-line usage for the test binaries. With MPI, rank/nranks come
// from the launcher and are optional; without MPI they must be given explicitly.
inline void print_usage(const char* prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  printf("usage: %s IP:PORT [rank nranks]\n", prog);
#else
  printf("usage: %s IP:PORT rank nranks\n", prog);
#endif // MSCCLPP_USE_MPI_FOR_TESTS
}
#define PRINT \
if (is_main_thread) \
printf
#endif // MSCCLPP_TESTS_COMMON_H_

View File

@@ -0,0 +1,2 @@
add_executable(sendrecv_test_perf sendrecv_test.cu common.cu)
target_link_libraries(sendrecv_test_perf mscclpp MPI::MPI_CXX)

408
test/mscclpp-test/common.cu Normal file
View File

@@ -0,0 +1,408 @@
#include <cuda.h>
#include <getopt.h>
#include <libgen.h>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <mscclpp/utils.hpp>
#include <string>
#include <type_traits>
#include "common.hpp"
int is_main_proc = 0;
mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2,
mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5,
mscclpp::Transport::IB6, mscclpp::Transport::IB7};
namespace {
// Command line parameter defaults
size_t minBytes = 32 * 1024 * 1024;
size_t maxBytes = 32 * 1024 * 1024;
size_t stepBytes = 1 * 1024 * 1024;
size_t stepFactor = 1;
int datacheck = 1;
int warmup_iters = 10;
int iters = 20;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
int average = 1;
int kernel_num = 0;
int cudaGraphLaunches = 15;
// Convert a size string such as "64", "512K", "32m" or "1G" into bytes.
// A bare number means bytes; an unknown suffix or unparsable input returns -1.0.
double parseSize(const char* value) {
  double size;
  char unit;
  int matched = sscanf(value, "%lf %1s", &size, &unit);
  if (matched == 1) {
    // No suffix: the value is already in bytes.
    return size;
  }
  if (matched != 2) {
    return -1.0;
  }
  switch (unit) {
    case 'G':
    case 'g':
      return size * 1024 * 1024 * 1024;
    case 'M':
    case 'm':
      return size * 1024 * 1024;
    case 'K':
    case 'k':
      return size * 1024;
    default:
      return -1.0;
  }
}
// Reduce a timing value across all ranks and return the result:
// 0 = local value, 1 = mean, 2 = min, 3 = max, 4 = sum.
double allreduceTime(int worldSize, double value, int average)
{
double accumulator = value;
if (average != 0) {
MPI_Op op = average == 1 ? MPI_SUM
: average == 2 ? MPI_MIN
: average == 3 ? MPI_MAX
: average == 4 ? MPI_SUM
: MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, MPI_DOUBLE, op, MPI_COMM_WORLD);
}
if (average == 1)
accumulator /= worldSize;
return accumulator;
}
} // namespace
// Resolve the concrete collective under test and create the non-blocking stream
// used for all benchmark work.
BaseTestEngine::BaseTestEngine(bool inPlace) : error_(0), inPlace_(inPlace) {
this->coll_ = getTestColl();
CUDATHROW(cudaStreamCreateWithFlags(&this->stream_, cudaStreamNonBlocking));
}
// cudaStreamDestroy's return value is deliberately ignored: destructors must not throw.
BaseTestEngine::~BaseTestEngine() { cudaStreamDestroy(stream_); }
// Measure the per-iteration time of the collective: capture `iters` launches
// into one CUDA graph, replay it `cudaGraphLaunches` times after a rank barrier,
// and average the wall time across ranks. Returns seconds per iteration.
double BaseTestEngine::benchTime() {
  // Performance Benchmark
  cudaGraph_t graph;
  cudaGraphExec_t graphExec;
  CUDATHROW(cudaStreamBeginCapture(stream_, cudaStreamCaptureModeGlobal));
  mscclpp::Timer timer;
  for (int iter = 0; iter < iters; iter++) {
    coll_->runColl(args_, stream_);
  }
  CUDATHROW(cudaStreamEndCapture(stream_, &graph));
  CUDATHROW(cudaGraphInstantiate(&graphExec, graph, nullptr, nullptr, 0));
  this->barrier();
  timer.reset();
  for (int l = 0; l < cudaGraphLaunches; ++l) {
    CUDATHROW(cudaGraphLaunch(graphExec, stream_));
  }
  CUDATHROW(cudaStreamSynchronize(stream_));
  // Timer reports microseconds; convert to seconds per single iteration.
  double deltaSec = timer.elapsed() * 1.e-6;
  deltaSec = deltaSec / (iters) / (cudaGraphLaunches);
  // Fixed: the original discarded allreduceTime's return value, so the averaged
  // time was never applied and each rank reported only its local timing.
  deltaSec = allreduceTime(args_.totalRanks, deltaSec, average);
  CUDATHROW(cudaGraphExecDestroy(graphExec));
  CUDATHROW(cudaGraphDestroy(graph));
  return deltaSec;
}
// Block until every rank in the communicator reaches this point (bootstrap barrier).
void BaseTestEngine::barrier() {
this->comm_->bootstrapper()->barrier();
}
// Drive the full benchmark for one collective: warm up at the largest and the
// smallest message size, print the result-table header, then sweep sizes from
// minBytes to maxBytes (multiplying by stepFactor, or adding stepBytes),
// timing each size with benchTime() and optionally validating received data.
void BaseTestEngine::runTest()
{
  // warm-up for large size
  this->coll_->setupCollTest(args_.maxBytes, sizeof(int));
  this->barrier();
  for (int iter = 0; iter < warmup_iters; iter++) {
    this->coll_->runColl(args_, stream_);
  }
  CUDATHROW(cudaDeviceSynchronize());
  // warm-up for small size
  this->coll_->setupCollTest(args_.minBytes, sizeof(int));
  this->barrier();
  for (int iter = 0; iter < warmup_iters; iter++) {
    this->coll_->runColl(args_, stream_);
  }
  CUDATHROW(cudaDeviceSynchronize());
  // Result-table header (in-place and out-of-place column groups).
  PRINT("#\n");
  PRINT("# %10s %12s in-place out-of-place \n", "", "");
  PRINT("# %10s %12s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "time", "algbw", "busbw", "#wrong",
        "time", "algbw", "busbw", "#wrong");
  PRINT("# %10s %12s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "(us)", "(GB/s)", "(GB/s)", "",
        "(us)", "(GB/s)", "(GB/s)", "");
  // Benchmark
  for (size_t size = args_.minBytes; size <= args_.maxBytes;
       size = ((args_.stepFactor > 1) ? size * args_.stepFactor : size + args_.stepBytes)) {
    coll_->setupCollTest(size, sizeof(int));
    this->coll_->initData(this->args_, this->getSendBuff(), this->getExpectedBuff());
    PRINT("%12li %12li", max(coll_->getSendBytes(), coll_->getExpectedBytes()), coll_->getParamBytes() / sizeof(int));
    double deltaSec = benchTime();
    size_t nErrors = 0;
    if (args_.reportErrors) {
      // Re-run the collective once outside the timed graph and compare the
      // received data against the expected buffer.
      this->coll_->setupCollTest(size, sizeof(int));
      this->coll_->initData(this->args_, this->getSendBuff(), this->getExpectedBuff());
      this->barrier();
      this->coll_->runColl(args_, stream_);
      CUDATHROW(cudaDeviceSynchronize());
      nErrors = this->checkData();
      if (nErrors > 0) {
        this->error_++;  // count this size as failed on the local rank
      }
      // Sum mismatch counts across ranks for reporting.
      // NOTE(review): nErrors is size_t but reduced as MPI_LONG — widths
      // match on LP64; confirm on other ABIs.
      MPI_Allreduce(MPI_IN_PLACE, &nErrors, 1, MPI_LONG, MPI_SUM, MPI_COMM_WORLD);
    }
    // Pick a fixed-width time format appropriate for the magnitude.
    double timeUsec = deltaSec * 1e6;
    char timeStr[100];
    if (timeUsec >= 10000.0) {
      sprintf(timeStr, "%7.0f", timeUsec);
    } else if (timeUsec >= 100.0) {
      sprintf(timeStr, "%7.1f", timeUsec);
    } else {
      sprintf(timeStr, "%7.2f", timeUsec);
    }
    double algBw, busBw;
    this->coll_->getBw(deltaSec, algBw, busBw);
    if (!this->inPlace_) {
      // Out-of-place runs print in the right-hand column group.
      PRINT(" ");
    }
    if (args_.reportErrors) {
      PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)nErrors);
    } else {
      PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
    }
    PRINT("\n");
  }
  PRINT("\n");
}
// Initialize the bootstrap network, the communicator, and the device-channel
// service for this rank. Rank 0 creates the unique id; MPI broadcasts it.
void BaseTestEngine::bootstrap(const TestArgs& args) {
  args_ = args;
  auto bootstrapper = std::make_shared<mscclpp::Bootstrap>(args_.rank, args_.totalRanks);
  mscclpp::UniqueId id;
  if (bootstrapper->getRank() == 0) {
    id = bootstrapper->createUniqueId();
  }
  MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
  bootstrapper->initialize(id);
  comm_ = std::make_shared<mscclpp::Communicator>(bootstrapper);
  chanService_ = std::make_shared<mscclpp::channel::DeviceChannelService>(*comm_);
}
void BaseTestEngine::setupTest() {
this->setupConnections();
this->chanService_->startProxy();
}
// Copy the receive buffer back to the host and count int elements that differ
// from the expected buffer. Returns the number of mismatching elements.
size_t BaseTestEngine::checkData() {
  void* expectedBuff = this->getExpectedBuff();
  size_t recvBytes = this->coll_->getRecvBytes();
  std::vector<int> hostRecv(recvBytes / sizeof(int), 0);
  CUDATHROW(cudaMemcpy(hostRecv.data(), this->getRecvBuff(), recvBytes, cudaMemcpyDeviceToHost));
  const int* expected = static_cast<const int*>(expectedBuff);
  size_t nErrors = 0;
  for (size_t i = 0; i < hostRecv.size(); ++i) {
    if (hostRecv[i] != expected[i]) {
      ++nErrors;
    }
  }
  return nErrors;
}
void run(int argc, char* argv[]);
// Entry point: parse command-line options into the file-scope test
// parameters, validate them, and hand off to run().
// Returns 0 on success (or after printing usage), -1 on invalid arguments.
int main(int argc, char* argv[]) {
  // Make sure every line is flushed so that we see the progress of the test
  setlinebuf(stdout);

  // Parse args
  double parsed;
  int longindex;
  static option longopts[] = {{"minbytes", required_argument, 0, 'b'},
                              {"maxbytes", required_argument, 0, 'e'},
                              {"stepbytes", required_argument, 0, 'i'},
                              {"stepfactor", required_argument, 0, 'f'},
                              {"iters", required_argument, 0, 'n'},
                              {"warmup_iters", required_argument, 0, 'w'},
                              {"check", required_argument, 0, 'c'},
                              {"cudagraph", required_argument, 0, 'G'},
                              {"average", required_argument, 0, 'a'},
                              {"kernel_num", required_argument, 0, 'k'},
                              {"help", no_argument, 0, 'h'},
                              {}};
  while (1) {
    int c;
    // Bug fix: the optstring used to end in "h:", making getopt demand an
    // argument for -h even though --help is declared no_argument above.
    c = getopt_long(argc, argv, "b:e:i:f:n:w:c:G:a:k:h", longopts, &longindex);
    if (c == -1) break;
    switch (c) {
      case 'b':
        parsed = parseSize(optarg);
        if (parsed < 0) {
          fprintf(stderr, "invalid size specified for 'minbytes'\n");
          return -1;
        }
        minBytes = (size_t)parsed;
        break;
      case 'e':
        parsed = parseSize(optarg);
        if (parsed < 0) {
          fprintf(stderr, "invalid size specified for 'maxbytes'\n");
          return -1;
        }
        maxBytes = (size_t)parsed;
        break;
      case 'i':
        stepBytes = strtol(optarg, NULL, 0);
        break;
      case 'f':
        stepFactor = strtol(optarg, NULL, 0);
        break;
      case 'n':
        iters = (int)strtol(optarg, NULL, 0);
        break;
      case 'w':
        warmup_iters = (int)strtol(optarg, NULL, 0);
        break;
      case 'c':
        datacheck = (int)strtol(optarg, NULL, 0);
        break;
      case 'G':
        cudaGraphLaunches = strtol(optarg, NULL, 0);
        if (cudaGraphLaunches <= 0) {
          fprintf(stderr, "invalid number for 'cudaGraphLaunches'\n");
          return -1;
        }
        break;
      case 'a':
        average = (int)strtol(optarg, NULL, 0);
        break;
      case 'k':
        kernel_num = (int)strtol(optarg, NULL, 0);
        break;
      case 'h':
      default:
        if (c != 'h') printf("invalid option '%c'\n", c);
        // Usage fix: dropped -T/--timeout and -C/--report_cputime, which are
        // not implemented by this parser, and fixed a typo in the -k text.
        printf(
            "USAGE: %s \n\t"
            "[-b,--minbytes <min size in bytes>] \n\t"
            "[-e,--maxbytes <max size in bytes>] \n\t"
            "[-i,--stepbytes <increment size>] \n\t"
            "[-f,--stepfactor <increment factor>] \n\t"
            "[-n,--iters <iteration count>] \n\t"
            "[-w,--warmup_iters <warmup iteration count>] \n\t"
            "[-c,--check <0/1>] \n\t"
            "[-G,--cudagraph <num graph launches>] \n\t"
            "[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
            "[-k,--kernel_num <kernel number of communication primitive>] \n\t"
            "[-h,--help]\n",
            basename(argv[0]));
        return 0;
    }
  }
  if (minBytes > maxBytes) {
    std::cerr << "invalid sizes for 'minbytes' and 'maxbytes': " << minBytes << " > " << maxBytes << std::endl;
    return -1;
  }
  run(argc, argv);
  return 0;
}
// Per-process driver: initialize MPI, discover the local rank and GPU, print
// the device roster, clamp maxBytes to available GPU memory, then bootstrap
// the test engine and execute the benchmark.
void run(int argc, char* argv[]) {
  int totalRanks = 1, rank = 0;
  int nRanksPerNode = 0, localRank = 0;
  // NOTE(review): assumes getHostName truncates at the first '.' — confirm.
  std::string hostname = mscclpp::getHostName(1024, '.');
  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &totalRanks);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  // Count ranks on this node via a shared-memory split, then free the comm.
  MPI_Comm shmcomm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
  MPI_Comm_size(shmcomm, &nRanksPerNode);
  MPI_Comm_free(&shmcomm);
  localRank = rank % nRanksPerNode;
  is_main_proc = (rank == 0) ? 1 : 0;
  PRINT(
      "# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d graph: %d, "
      "kernel num: %d\n",
      minBytes, maxBytes, (stepFactor > 1) ? stepFactor : stepBytes, (stepFactor > 1) ? "factor" : "bytes",
      warmup_iters, iters, datacheck, cudaGraphLaunches, kernel_num);
  PRINT("#\n");
  PRINT("# Using devices\n");
  constexpr int MAX_LINE = 2048;
  char line[MAX_LINE];
  int len = 0;
  size_t maxMem = ~0;  // start at SIZE_MAX; reduced to the smallest GPU memory
  int cudaDev = localRank;  // one GPU per local rank
  cudaDeviceProp prop;
  char busIdChar[] = "00000000:00:00.0";
  CUDATHROW(cudaGetDeviceProperties(&prop, cudaDev));
  CUDATHROW(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev));
  len += snprintf(line + len, MAX_LINE - len, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n", rank, getpid(),
                  hostname.c_str(), cudaDev, busIdChar, prop.name);
  maxMem = std::min(maxMem, prop.totalGlobalMem);
  std::shared_ptr<char[]> lines(new char[totalRanks * MAX_LINE]);
  // Gather all output in rank order to root (0)
  MPI_Gather(line, MAX_LINE, MPI_BYTE, lines.get(), MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
  if (rank == 0) {
    for (int r = 0; r < totalRanks; r++) PRINT("%s", &lines[MAX_LINE * r]);
  }
  // NOTE(review): maxMem is size_t but reduced as MPI_LONG; widths match on
  // LP64 — confirm on other ABIs.
  MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
  // We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
  size_t memMaxBytes = (maxMem - (1 << 30)) / (datacheck ? 3 : 2);
  if (maxBytes > memMaxBytes) {
    maxBytes = memMaxBytes;
    PRINT("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
  }
  CUDATHROW(cudaSetDevice(cudaDev));
  TestArgs args = {minBytes, maxBytes, stepBytes, stepFactor, totalRanks, rank,
                   cudaDev, localRank, nRanksPerNode, kernel_num, datacheck};
  PRINT("#\n");
  PRINT("# Initializing MSCCL++\n");
  auto testEngine = getTestEngine();
  testEngine->bootstrap(args);
  testEngine->allocateBuffer();
  PRINT("# Setting up the connection in MSCCL++\n");
  testEngine->setupTest();
  testEngine->barrier();
  testEngine->runTest();
  fflush(stdout);
  // Aggregate per-rank error counts for the final verdict.
  int error = testEngine->getTestErrors();
  MPI_Allreduce(MPI_IN_PLACE, &error, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
  PRINT("# Out of bounds values : %d %s\n", error, error ? "FAILED" : "OK");
  PRINT("#\n");
  MPI_Finalize();
}

View File

@@ -0,0 +1,98 @@
#ifndef MSCCLPP_TESTS_COMMON_H_
#define MSCCLPP_TESTS_COMMON_H_
#include <mpi.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <mscclpp/channel.hpp>
#include <mscclpp/core.hpp>
// Check a CUDA runtime call and throw std::runtime_error (tagged with
// file:line and the CUDA error string) when it does not return cudaSuccess.
#define CUDATHROW(cmd)                                                                                                \
  do {                                                                                                                \
    cudaError_t err = cmd;                                                                                            \
    if (err != cudaSuccess) {                                                                                         \
      std::string msg = std::string("Test CUDA failure: ") + std::string(__FILE__) + ":" + std::to_string(__LINE__) + \
                        " '" + cudaGetErrorString(err) + "'";                                                         \
      throw std::runtime_error(msg);                                                                                  \
    }                                                                                                                 \
  } while (0)
// Command-line / environment configuration passed to every test component.
struct TestArgs {
  size_t minBytes;    // smallest message size to benchmark
  size_t maxBytes;    // largest message size to benchmark
  size_t stepBytes;   // additive size step (used when stepFactor <= 1)
  size_t stepFactor;  // multiplicative size step (used when > 1)
  int totalRanks;     // world size
  int rank;           // this process's global rank
  int gpuNum;         // CUDA device index used by this rank
  int localRank;      // rank within the node
  int nRanksPerNode;  // ranks sharing this node
  int kernelNum;      // which kernel variant to run
  int reportErrors;   // non-zero -> validate received data each size
};
// Interface each collective test implements: data setup, kernel launch,
// bandwidth math, and per-size element-count bookkeeping.
class BaseTestColl {
 public:
  BaseTestColl() {}
  virtual ~BaseTestColl() {}
  // Fill the device send buffer(s) and the host-side expected buffer.
  virtual void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) = 0;
  // Update the *Count_ members for a run of `size` bytes of `typeSize`-byte elements.
  virtual void setupCollTest(size_t size, size_t typeSize) = 0;
  // Enqueue the collective's kernel(s) on `stream`.
  virtual void runColl(const TestArgs& args, cudaStream_t stream) = 0;
  // Convert a measured time into algorithm and bus bandwidth.
  virtual void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) = 0;
  size_t getSendBytes() { return sendCount_ * typeSize_; }
  size_t getRecvBytes() { return recvCount_ * typeSize_; }
  size_t getExpectedBytes() { return expectedCount_ * typeSize_; }
  size_t getParamBytes() { return paramCount_ * typeSize_; }

 protected:
  size_t sendCount_;      // elements in the send buffer
  size_t recvCount_;      // elements in the receive buffer
  size_t expectedCount_;  // elements in the expected buffer
  size_t paramCount_;     // elements used for size reporting / bandwidth math
  int typeSize_;          // bytes per element
};
// Benchmark driver shared by all tests: owns the communicator, channel
// service, and stream; subclasses supply buffers and connection setup.
class BaseTestEngine {
 public:
  // inPlace selects which result column the test reports into.
  BaseTestEngine(bool inPlace);
  virtual ~BaseTestEngine();
  // Allocate device/host buffers sized for args_.maxBytes.
  virtual void allocateBuffer() = 0;
  // Number of message sizes that produced mismatches on this rank.
  int getTestErrors() {return error_;}
  void setupTest();
  void bootstrap(const TestArgs& args);
  void runTest();
  void barrier();
  // Returns the number of mismatching int elements in the receive buffer.
  size_t checkData();

 private:
  virtual void setupConnections() = 0;
  virtual std::vector<void*> getSendBuff() = 0;
  virtual void* getExpectedBuff() = 0;
  virtual void* getRecvBuff() = 0;
  double benchTime();

 protected:
  TestArgs args_;
  std::shared_ptr<BaseTestColl> coll_;
  std::shared_ptr<mscclpp::Communicator> comm_;
  std::shared_ptr<mscclpp::channel::DeviceChannelService> chanService_;
  cudaStream_t stream_;  // non-blocking stream all launches use
  int error_;            // failed-size counter (see getTestErrors)
  bool inPlace_;
};
// Factory functions implemented by each test's translation unit.
extern std::shared_ptr<BaseTestEngine> getTestEngine();
extern std::shared_ptr<BaseTestColl> getTestColl();
// IB transports indexed by local rank (defined elsewhere).
extern mscclpp::Transport IBs[];
// Print from the root process only, to keep output un-interleaved.
#define PRINT \
  if (is_main_proc) printf
#endif  // MSCCLPP_TESTS_COMMON_H_

View File

@@ -0,0 +1,181 @@
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <mscclpp/channel.hpp>
#include <mscclpp/concurrency.hpp>
#include <mscclpp/epoch.hpp>
#include <string>
#include <vector>
#include "common.hpp"
// Threads per block for the send/recv kernel.
constexpr size_t BLOCK_THREADS_NUM = 1024;
// Try to use more blocks if per-block data size exceeds this threshold
constexpr size_t THRES_BYTES_PER_BLOCK = 8192;
// Let it no more than the number of SMs on a GPU
constexpr size_t MAX_BLOCKS_NUM = 32;
// Element-count alignment applied in setupCollTest.
#define ALIGN 4
// [0] = channel to the send peer, [1] = channel to the recv peer.
__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[2];
// One block per THRES_BYTES_PER_BLOCK bytes (rounded up), capped at
// MAX_BLOCKS_NUM.
inline int getBlockNum(size_t count) {
  size_t wanted = (count + THRES_BYTES_PER_BLOCK - 1) / THRES_BYTES_PER_BLOCK;
  return std::min(wanted, MAX_BLOCKS_NUM);
}
// Pick CUDA IPC for peers on the same node, the given IB device otherwise.
inline mscclpp::Transport getTransport(int rank, int peerRank, int nRanksPerNode, mscclpp::Transport ibDevice) {
  bool sameNode = (rank / nRanksPerNode) == (peerRank / nRanksPerNode);
  return sameNode ? mscclpp::Transport::CudaIpc : ibDevice;
}
// Syncer used to order all blocks' putDirect writes before the signal.
__device__ mscclpp::DeviceSyncer deviceSyncer;
// Ring send/recv kernel: each block copies its slice of the local buffer
// into the send peer's memory via putDirect, all blocks sync, then global
// thread 0 signals the send peer and waits for the recv peer's signal.
// putDirect copies in uint64_t units, so buffers are assumed 8-byte aligned.
__global__ void kernel(int rank, size_t dataSize, size_t dataPerBlock) {
  size_t startIndex = blockIdx.x * dataPerBlock;
  size_t blockDataSize = min(dataSize - startIndex, dataPerBlock);
  int globalIndex = blockIdx.x * blockDim.x + threadIdx.x;
  mscclpp::channel::SimpleDeviceChannel sendConn = constDevChans[0];
  mscclpp::channel::SimpleDeviceChannel recvConn = constDevChans[1];
  sendConn.putDirect(startIndex, blockDataSize, threadIdx.x, blockDim.x);
  // Every block's copy must finish before completion is signaled to the peer.
  deviceSyncer.sync(gridDim.x);
  if (globalIndex == 0) {
    sendConn.signalDirect();
    recvConn.wait();
  }
}
// Send/recv collective: every rank sends its buffer to (rank + 1) and
// receives from (rank - 1) in a ring.
class SendRecvTestColl : public BaseTestColl {
 public:
  SendRecvTestColl() = default;
  ~SendRecvTestColl() override = default;
  void runColl(const TestArgs& args, cudaStream_t stream) override;
  void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
  void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override;
  void setupCollTest(size_t size, size_t typeSize) override;
};
// Launch the send/recv kernel, splitting the payload evenly across blocks.
void SendRecvTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
  const size_t sendBytes = sendCount_ * typeSize_;
  const int blockNum = getBlockNum(sendBytes);
  const size_t bytesPerBlock = (sendBytes + blockNum - 1) / blockNum;
  kernel<<<blockNum, BLOCK_THREADS_NUM, 0, stream>>>(args.rank, sendBytes, bytesPerBlock);
}
// Algorithm bandwidth = bytes moved per second; for send/recv the bus
// bandwidth equals the algorithm bandwidth (scaling factor of 1).
void SendRecvTestColl::getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) {
  const double baseBw = (double)(paramCount_ * typeSize_) / 1.0E9 / deltaSec;
  algBW = baseBw;
  busBw = algBW;
}
// Fill the send buffer with this rank's id and build the host-side expected
// buffer: data arrives from (rank - 1), so every element must equal that id.
void SendRecvTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
  assert(sendBuff.size() == 1);
  const int rank = args.rank;
  CUDATHROW(cudaMemset(sendBuff[0], 0, sendCount_ * typeSize_));
  // TODO: The type should not limited to int.
  std::vector<int> hostData(std::max(sendCount_, recvCount_), rank);
  CUDATHROW(cudaMemcpy(sendBuff[0], hostData.data(), sendCount_ * typeSize_, cudaMemcpyHostToDevice));
  const int peerRank = (rank - 1 + args.totalRanks) % args.totalRanks;
  std::fill_n(hostData.begin(), recvCount_, peerRank);
  std::memcpy(expectedBuff, hostData.data(), recvCount_ * typeSize_);
}
void SendRecvTestColl::setupCollTest(size_t size, size_t typeSize) {
size_t count = size / typeSize;
size_t base = (count / ALIGN) * ALIGN;
sendCount_ = base;
recvCount_ = base;
paramCount_ = base;
expectedCount_ = base;
typeSize_ = typeSize;
mscclpp::DeviceSyncer syncer = {};
CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer)));
}
// Engine for the send/recv test. Owns two device buffers
// (devicePtrs_[0] = send, devicePtrs_[1] = recv) and a host expected buffer.
class SendRecvTestEngine : public BaseTestEngine {
 public:
  // Send/recv only reports out-of-place results.
  SendRecvTestEngine() : BaseTestEngine(false){};
  ~SendRecvTestEngine() override = default;
  void allocateBuffer() override;
  void setupConnections() override;

 private:
  std::vector<void*> getSendBuff() override;
  void* getExpectedBuff() override;
  void* getRecvBuff() override;
  std::vector<std::shared_ptr<int>> devicePtrs_;  // [0] send, [1] recv (device memory)
  std::shared_ptr<int[]> expectedBuff_;           // host-side expected data
};
// Allocate the device send/recv buffers (index 0 = send, 1 = recv) and the
// host-side expected buffer, all sized for the largest message.
void SendRecvTestEngine::allocateBuffer() {
  const size_t numInts = args_.maxBytes / sizeof(int);
  devicePtrs_.push_back(mscclpp::makeSharedCuda<int>(numInts));
  devicePtrs_.push_back(mscclpp::makeSharedCuda<int>(numInts));
  expectedBuff_ = std::shared_ptr<int[]>(new int[numInts]);
}
void SendRecvTestEngine::setupConnections() {
auto ibDevice = IBs[args_.localRank];
int worldSize = args_.totalRanks;
int sendToRank = (args_.rank + 1) % worldSize;
int recvFromRank = (args_.rank - 1 + worldSize) % worldSize;
std::array<int, 2> ranks = {sendToRank, recvFromRank};
std::vector<mscclpp::channel::ChannelId> chanIds;
chanIds.push_back(chanService_->addChannel(
comm_->connectOnSetup(sendToRank, 0, getTransport(args_.rank, sendToRank, args_.nRanksPerNode, ibDevice))));
if (recvFromRank != sendToRank) {
chanIds.push_back(chanService_->addChannel(
comm_->connectOnSetup(recvFromRank, 0, getTransport(args_.rank, recvFromRank, args_.nRanksPerNode, ibDevice))));
} else {
// reuse the send channel if worldSize is 2
chanIds.push_back(chanIds[0]);
}
comm_->setup();
std::vector<mscclpp::RegisteredMemory> localMemories;
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> futureRemoteMemory;
for (int i : {0, 1}) {
auto regMem = comm_->registerMemory(devicePtrs_[i].get(), args_.maxBytes, mscclpp::Transport::CudaIpc | ibDevice);
comm_->sendMemoryOnSetup(regMem, ranks[i], 0);
localMemories.push_back(regMem);
futureRemoteMemory.push_back(comm_->recvMemoryOnSetup(ranks[1 - i], 0));
comm_->setup();
}
// swap to make sure devicePtrs_[0] in local rank write to devicePtrs_[1] in remote rank
std::swap(futureRemoteMemory[0], futureRemoteMemory[1]);
std::vector<mscclpp::channel::SimpleDeviceChannel> devChannels;
for (int i : {0, 1}) {
// We assume ranks in the same node
devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(
chanService_->deviceChannel(chanIds[i]), futureRemoteMemory[i].get().data(), localMemories[i].data()));
}
cudaMemcpyToSymbol(constDevChans, devChannels.data(),
sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size());
}
// devicePtrs_[0] is the send buffer, devicePtrs_[1] the receive buffer.
std::vector<void*> SendRecvTestEngine::getSendBuff() { return {devicePtrs_[0].get()}; }
void* SendRecvTestEngine::getExpectedBuff() { return expectedBuff_.get(); }
void* SendRecvTestEngine::getRecvBuff() { return devicePtrs_[1].get(); }
// Factories used by the common driver to instantiate this test.
std::shared_ptr<BaseTestEngine> getTestEngine() { return std::make_shared<SendRecvTestEngine>(); }
std::shared_ptr<BaseTestColl> getTestColl() { return std::make_shared<SendRecvTestColl>(); }

View File

@@ -1,203 +0,0 @@
#include "comm.h"
#include "common.h"
#include <algorithm>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>
// Threads per block for the legacy send/recv kernel.
constexpr size_t BLOCK_THREADS_NUM = 1024;
// Try to use more blocks if per-block data size exceeds this threshold
constexpr size_t THRES_BYTES_PER_BLOCK = 8192;
// Let it no more than the number of SMs on a GPU
constexpr size_t MAX_BLOCKS_NUM = 32;
// Element-count alignment used by the byte-count helpers.
#define ALIGN 4
// Device connections for the send and recv peers, populated from the host.
__constant__ mscclppDevConn_t sendConnConst;
__constant__ mscclppDevConn_t recvConnConst;
// State for a barrier across thread blocks (see sync_gpu below).
struct SyncGpuState
{
  volatile int flag;  // toggled by the last arriving block; spun on by the rest
  int cnt;            // number of blocks that have arrived
  int is_add;         // phase bit: alternates count-up / count-down per barrier
};
// Synchronize multiple thread blocks inside a kernel. Guarantee that all
// previous work of all threads in cooperating blocks is finished and
// visible to all threads in the device.
// Implementation: thread 0 of each block counts arrivals in state.cnt
// (counting up and down on alternating invocations, tracked by is_add);
// the last arrival flips state.flag, which the other blocks spin on.
// NOTE(review): relies on volatile accesses with no explicit __threadfence();
// confirm the visibility guarantees on the target architecture.
__forceinline__ __device__ void sync_gpu(SyncGpuState& state, int blockNum)
{
  int maxOldCnt = blockNum - 1;
  __syncthreads();
  if (threadIdx.x == 0) {
    int is_add_ = state.is_add ^ 1;  // phase for this barrier instance
    if (is_add_) {
      // Count up; the last block to arrive raises the flag.
      if (atomicAdd(&state.cnt, 1) == maxOldCnt) {
        state.flag = 1;
      }
      while (!state.flag) {
      }
    } else {
      // Count down; the last block to leave clears the flag.
      if (atomicSub(&state.cnt, 1) == 1) {
        state.flag = 0;
      }
      while (state.flag) {
      }
    }
    state.is_add = is_add_;
  }
  // We need sync here because only a single thread is checking whether
  // the flag is flipped.
  __syncthreads();
}
// Tag for the send connection to `peer`; mirrored by getRecvTag so both
// sides of a given direction agree on the same tag.
inline int getSendTag(int rank, int peer)
{
  return (rank < peer) ? 0 : 1;
}
// Tag for the recv connection from `peer`; the complement of getSendTag so
// both sides of a given direction agree on the same tag.
inline int getRecvTag(int rank, int peer)
{
  return (rank < peer) ? 1 : 0;
}
// One block per THRES_BYTES_PER_BLOCK bytes (rounded up), capped at
// MAX_BLOCKS_NUM.
inline int getBlockNum(size_t count)
{
  size_t wanted = (count + THRES_BYTES_PER_BLOCK - 1) / THRES_BYTES_PER_BLOCK;
  return std::min(wanted, MAX_BLOCKS_NUM);
}
// Barrier state shared by all blocks of the kernel below.
__device__ SyncGpuState GLOBAL_SYNC_STATE;
// Legacy send/recv kernel: every block writes its slice of the send buffer
// directly into the peer's memory, all blocks rendezvous in sync_gpu, then
// global thread 0 signals the send peer and waits on the recv peer.
__global__ void kernel(int rank, size_t dataSize, size_t dataPerBlock)
{
  mscclppDevConn_t sendConn = sendConnConst;
  mscclppDevConn_t recvConn = recvConnConst;
  size_t startIndex = blockIdx.x * dataPerBlock;
  size_t blockDataSize = min(dataSize - startIndex, dataPerBlock);
  int tid = blockIdx.x * blockDim.x + threadIdx.x;  // global thread index
  sendConn.putDirect(startIndex, blockDataSize, threadIdx.x, blockDim.x);
  // All blocks' writes must land before thread 0 signals completion.
  sync_gpu(GLOBAL_SYNC_STATE, gridDim.x);
  if (tid == 0) {
    sendConn.signalDirect();
    recvConn.waitDirect();
  }
}
// Element-count bookkeeping for send/recv: every count is `count` rounded
// down to a multiple of ALIGN; the recv in-place offset is always zero.
void SendRecvGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
                              size_t* recvInplaceOffset, size_t count, int nranks)
{
  const size_t aligned = (count / ALIGN) * ALIGN;
  *sendcount = aligned;
  *recvcount = aligned;
  *paramcount = aligned;
  *sendInplaceOffset = aligned;
  *recvInplaceOffset = 0;
}
// Reset the device-side barrier state before the test runs.
testResult_t SendRecvInitColl()
{
  SyncGpuState state = {};
  CUDACHECK(cudaMemcpyToSymbol(GLOBAL_SYNC_STATE, &state, sizeof(SyncGpuState)));
  return testSuccess;
}
// Fill the send buffer with this rank's id and the device `expected` buffer
// with the id of the rank we receive from ((rank - 1) mod totalProcs).
testResult_t SendRecvInitData(struct testArgs* args, int in_place)
{
  size_t sendCount = args->sendBytes / sizeof(int);
  size_t recvCount = args->expectedBytes / sizeof(int);
  size_t maxCount = std::max(sendCount, recvCount);
  int rank = args->proc;
  CUDACHECK(cudaMemset(args->sendbuff, 0, args->sendBytes));
  std::vector<int> dataHost(maxCount, rank);
  CUDACHECK(cudaMemcpy(args->sendbuff, dataHost.data(), sendCount * sizeof(int), cudaMemcpyHostToDevice));
  int recvPeerRank = (rank - 1 + args->totalProcs) % args->totalProcs;
  for (size_t i = 0; i < recvCount; i++) {
    dataHost[i] = recvPeerRank;
  }
  CUDACHECK(cudaMemcpy(args->expected, dataHost.data(), recvCount * sizeof(int), cudaMemcpyHostToDevice));
  // Ensure all ranks finished initializing before any peer starts writing.
  MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
  return testSuccess;
}
// algBw = bytes moved per second (GB/s); bus bandwidth is identical for
// send/recv (scaling factor of 1).
void SendRecvGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
{
  const double gbPerSec = (double)(count * typesize) / 1.0E9 / sec;
  *algBw = gbPerSec;
  *busBw = gbPerSec;
}
// Launch the legacy send/recv kernel over `count` bytes split across blocks.
testResult_t SendRecvRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
                             cudaStream_t stream, int kernel_num)
{
  const int blockNum = getBlockNum(count);
  const size_t bytesPerBlock = (count + blockNum - 1) / blockNum;
  kernel<<<blockNum, BLOCK_THREADS_NUM, 0, stream>>>(comm->rank, count, bytesPerBlock);
  return testSuccess;
}
// Function table describing this collective to the common test driver.
struct testColl sendRecvTest = {"SendRecvTest", SendRecvGetCollByteCount, SendRecvInitColl, SendRecvInitData,
                                SendRecvGetBw, SendRecvRunColl};
// Buffer-size query: delegate to SendRecvGetCollByteCount and discard the
// fields the caller does not need.
void SendRecvGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
{
  size_t unusedParam, unusedSendOff, unusedRecvOff;
  SendRecvGetCollByteCount(sendcount, recvcount, &unusedParam, &unusedSendOff, &unusedRecvOff, count, nranks);
}
// Legacy connection setup: connect to (rank + 1) for sending (via sendbuff)
// and (rank - 1) for receiving (via recvbuff), using P2P within a node and
// IB ("mlx5_ib<localRank>") across nodes.
testResult_t SendRecvSetupConnections(struct testArgs* args)
{
  int rank = args->proc;
  int worldSize = args->totalProcs;
  int ranksPerNode = args->nranksPerNode;
  int thisNode = rank / ranksPerNode;
  int localRank = rank % ranksPerNode;
  std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
  int sendToRank = (rank + 1) % worldSize;
  int recvFromRank = (rank - 1 + worldSize) % worldSize;
  std::array<int, 2> ranks = {sendToRank, recvFromRank};
  for (int i = 0; i < 2; i++) {
    int r = ranks[i];
    // Same node -> P2P (no IB device); different node -> IB.
    const char* ibDev = r / ranksPerNode == thisNode ? nullptr : ibDevStr.c_str();
    mscclppTransport_t transportType = ibDev == nullptr ? mscclppTransportP2P : mscclppTransportIB;
    void* buff = (i == 0) ? args->sendbuff : args->recvbuff;
    // Tags chosen so both endpoints of a direction match (see getSendTag/getRecvTag).
    int tag = (i == 0) ? getSendTag(rank, r) : getRecvTag(rank, r);
    MSCCLPPCHECK(mscclppConnect(args->comm, r, tag, buff, args->maxbytes, transportType, ibDev));
  }
  MSCCLPPCHECK(mscclppConnectionSetup(args->comm));
  return testSuccess;
}
// Legacy test entry: look up the device connections for both ring neighbors,
// publish them to constant memory, and run the timed test loop.
testResult_t SendRecvRunTest(struct testArgs* args)
{
  args->collTest = &sendRecvTest;
  int rank = args->proc, worldSize = args->totalProcs;
  // only support out-of-place for sendrecv test
  args->in_place = 0;
  mscclppDevConn_t* sendDevConn;
  mscclppDevConn_t* recvDevConn;
  MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, (rank + 1) % worldSize, getSendTag(rank, (rank + 1) % worldSize),
                                          &sendDevConn));
  MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, (rank - 1 + worldSize) % worldSize,
                                          getRecvTag(rank, (rank - 1 + worldSize) % worldSize), &recvDevConn));
  // Publish the connections read by the kernel's constant-memory copies.
  CUDACHECK(cudaMemcpyToSymbol(sendConnConst, sendDevConn, sizeof(mscclppDevConn_t)));
  CUDACHECK(cudaMemcpyToSymbol(recvConnConst, recvDevConn, sizeof(mscclppDevConn_t)));
  TESTCHECK(TimeTest(args));
  return testSuccess;
}
// Engine descriptor for this test; the weak alias lets the common driver
// resolve mscclppTestEngine to this definition.
struct testEngine sendRecvTestEngine = {SendRecvGetBuffSize, SendRecvRunTest, SendRecvSetupConnections, nullptr};
#pragma weak mscclppTestEngine = sendRecvTestEngine