mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-13 09:46:00 +00:00
Merge branch 'ziyyang/npkit-fix-numa' of https://github.com/microsoft/mscclpp into ziyyang/npkit-fix-numa
This commit is contained in:
19
Makefile
19
Makefile
@@ -147,18 +147,22 @@ UTOBJTARGETS := $(UTOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
|
||||
UTBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(UTOBJS))
|
||||
|
||||
TESTSDIR := tests
|
||||
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test.cu)
|
||||
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test_standalone.cu)
|
||||
TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS))
|
||||
TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
|
||||
TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS))
|
||||
|
||||
MSCLLPPTESTSOBJSDIR:= $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)
|
||||
MSCLLPPTESTBINFILESLIST := allgather_test
|
||||
MSCLLPPTESTBINS := $(MSCLLPPTESTBINFILESLIST:%=$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf)
|
||||
|
||||
INCLUDE := -Isrc -Isrc/include
|
||||
|
||||
.PHONY: all build lib tests clean
|
||||
.PHONY: all build lib tests mscclpp_test clean
|
||||
|
||||
all: build
|
||||
|
||||
build: lib tests
|
||||
build: lib tests mscclpp-test
|
||||
|
||||
lib: $(LIBOBJTARGETS) $(INCTARGETS) $(LIBTARGET)
|
||||
|
||||
@@ -166,6 +170,8 @@ unittests: $(UTBINS)
|
||||
|
||||
tests: unittests $(TESTSBINS)
|
||||
|
||||
mscclpp-test: $(LIBTARGET) $(MSCLLPPTESTBINS)
|
||||
|
||||
cpplint:
|
||||
clang-format-12 -style=file --verbose --Werror --dry-run $(CPPSOURCES)
|
||||
clang-format-12 --dry-run $(CPPSOURCES)
|
||||
@@ -211,12 +217,17 @@ $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cc $(INCTARGETS)
|
||||
# Compile .cu tests
|
||||
$(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cu $(INCTARGETS)
|
||||
@mkdir -p $(@D)
|
||||
$(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) -c $< $(MPI_MACRO)
|
||||
$(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) $(INCLUDE) -c $< $(MPI_MACRO)
|
||||
|
||||
# Test bins
|
||||
$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%: $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o $(LIBTARGET)
|
||||
@mkdir -p $(@D)
|
||||
$(NVCC) -o $@ $< $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp
|
||||
|
||||
# Compile mscclpp_test
|
||||
$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf: $(MSCLLPPTESTSOBJSDIR)/%.o $(MSCLLPPTESTSOBJSDIR)/common.o
|
||||
@mkdir -p $(@D)
|
||||
$(NVCC) -o $@ $^ $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp
|
||||
|
||||
clean:
|
||||
rm -rf $(BUILDDIR)
|
||||
|
||||
@@ -452,9 +452,12 @@ mscclppResult_t mscclppSocketGetAddr(struct mscclppSocket* sock, union mscclppSo
|
||||
|
||||
static mscclppResult_t socketTryAccept(struct mscclppSocket* sock)
|
||||
{
|
||||
static time_t initTime = -1;
|
||||
if (initTime == -1)
|
||||
initTime = clockNano() / 1e9;
|
||||
static bool timeInitialized = false;
|
||||
static mscclppTime_t initTime;
|
||||
if (!timeInitialized) {
|
||||
timeInitialized = true;
|
||||
initTime = getClock();
|
||||
}
|
||||
|
||||
mscclppConfig* config = mscclppConfig::getInstance();
|
||||
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
|
||||
@@ -462,14 +465,14 @@ static mscclppResult_t socketTryAccept(struct mscclppSocket* sock)
|
||||
sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen);
|
||||
if (sock->fd != -1) {
|
||||
sock->state = mscclppSocketStateAccepted;
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
||||
WARN("socketTryAccept: get errno %d that is not EAGAIN or EWOULDBLOCK", errno);
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
return mscclppSystemError;
|
||||
} else if ((clockNano() / 1e9) - initTime > acceptTimeout) {
|
||||
} else if (elapsedClock(getClock(), initTime) > acceptTimeout) {
|
||||
WARN("socketTryAccept: exceeded timeout (%ld) sec", acceptTimeout);
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
return mscclppRemoteError;
|
||||
} else {
|
||||
usleep(SLEEP_INT);
|
||||
@@ -513,10 +516,13 @@ static mscclppResult_t socketFinalizeAccept(struct mscclppSocket* sock)
|
||||
|
||||
static mscclppResult_t socketStartConnect(struct mscclppSocket* sock)
|
||||
{
|
||||
static time_t initTime = -1;
|
||||
if (initTime == -1) {
|
||||
initTime = clockNano() / 1e9;
|
||||
static bool timeInitialized = false;
|
||||
static mscclppTime_t initTime;
|
||||
if (!timeInitialized) {
|
||||
timeInitialized = true;
|
||||
initTime = getClock();
|
||||
}
|
||||
|
||||
mscclppConfig* config = mscclppConfig::getInstance();
|
||||
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
|
||||
|
||||
@@ -524,16 +530,16 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock)
|
||||
int ret = connect(sock->fd, &sock->addr.sa, sock->salen);
|
||||
if (ret == 0) {
|
||||
sock->state = mscclppSocketStateConnected;
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
return mscclppSuccess;
|
||||
} else if (errno == EINPROGRESS) {
|
||||
sock->state = mscclppSocketStateConnectPolling;
|
||||
return mscclppSuccess;
|
||||
} else if (errno == ECONNREFUSED || errno == ETIMEDOUT) {
|
||||
if ((clockNano() / 1e9) - initTime > acceptTimeout) {
|
||||
if (elapsedClock(getClock(), initTime) > acceptTimeout) {
|
||||
WARN("socketStartConnect: exceeded timeout (%ld) sec", acceptTimeout);
|
||||
sock->state = mscclppSocketStateError;
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
return mscclppRemoteError;
|
||||
}
|
||||
usleep(SLEEP_INT);
|
||||
@@ -544,17 +550,20 @@ static mscclppResult_t socketStartConnect(struct mscclppSocket* sock)
|
||||
char line[SOCKET_NAME_MAXLEN + 1];
|
||||
sock->state = mscclppSocketStateError;
|
||||
WARN("socketStartConnect: Connect to %s failed : %s", mscclppSocketToString(&sock->addr, line), strerror(errno));
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
return mscclppSystemError;
|
||||
}
|
||||
}
|
||||
|
||||
static mscclppResult_t socketPollConnect(struct mscclppSocket* sock)
|
||||
{
|
||||
static time_t initTime = -1;
|
||||
if (initTime == -1) {
|
||||
initTime = clockNano() / 1e9;
|
||||
static bool timeInitialized = false;
|
||||
static mscclppTime_t initTime;
|
||||
if (!timeInitialized) {
|
||||
timeInitialized = true;
|
||||
initTime = getClock();
|
||||
}
|
||||
|
||||
mscclppConfig* config = mscclppConfig::getInstance();
|
||||
time_t acceptTimeout = config->getBootstrapConnectionTimeoutConfig();
|
||||
|
||||
@@ -567,7 +576,7 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock)
|
||||
pfd.events = POLLOUT;
|
||||
SYSCHECK(ret = poll(&pfd, 1, timeout), "poll");
|
||||
if (ret == 0) {
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
return mscclppSuccess;
|
||||
}
|
||||
|
||||
@@ -576,10 +585,10 @@ static mscclppResult_t socketPollConnect(struct mscclppSocket* sock)
|
||||
SYSCHECK(getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, (void*)&ret, &rlen), "getsockopt");
|
||||
|
||||
if (ret == 0) {
|
||||
initTime = -1;
|
||||
timeInitialized = false;
|
||||
sock->state = mscclppSocketStateConnected;
|
||||
} else if (ret == ECONNREFUSED || ret == ETIMEDOUT) {
|
||||
if ((clockNano() / 1e9) - initTime > acceptTimeout) {
|
||||
if (elapsedClock(getClock(), initTime) > acceptTimeout) {
|
||||
WARN("socketPollConnect: exceeded timeout (%ld) sec", acceptTimeout);
|
||||
sock->state = mscclppSocketStateError;
|
||||
return mscclppRemoteError;
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
#include <sys/mman.h>
|
||||
#include <unistd.h>
|
||||
|
||||
uint64_t clockNano(); // from utils.h with which we have a circular dependency
|
||||
|
||||
template <typename T> mscclppResult_t mscclppCudaHostCallocDebug(T** ptr, size_t nelem, const char* filefunc, int line)
|
||||
{
|
||||
mscclppResult_t result = mscclppSuccess;
|
||||
|
||||
@@ -48,12 +48,9 @@ static long log2i(long n)
|
||||
return l;
|
||||
}
|
||||
|
||||
inline uint64_t clockNano()
|
||||
{
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
return uint64_t(ts.tv_sec) * 1000 * 1000 * 1000 + ts.tv_nsec;
|
||||
}
|
||||
typedef std::chrono::steady_clock::time_point mscclppTime_t;
|
||||
mscclppTime_t getClock();
|
||||
int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end);
|
||||
|
||||
/* get any bytes of random data from /dev/urandom, return 0 if it succeeds; else
|
||||
* return -1 */
|
||||
|
||||
10
src/utils.cc
10
src/utils.cc
@@ -270,3 +270,13 @@ mscclppResult_t setNumaState(mscclppNumaState state)
|
||||
numa_bind(state);
|
||||
return mscclppSuccess;
|
||||
}
|
||||
|
||||
mscclppTime_t getClock()
|
||||
{
|
||||
return std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
int64_t elapsedClock(mscclppTime_t start, mscclppTime_t end)
|
||||
{
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
|
||||
}
|
||||
|
||||
@@ -1,50 +1,10 @@
|
||||
#include "mscclpp.h"
|
||||
#include "comm.h"
|
||||
#include "common.h"
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include "mpi.h"
|
||||
#endif // MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include <iostream>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <cuda_runtime.h>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <unordered_map>
|
||||
|
||||
static int nranksPerNode = 8;
|
||||
|
||||
// Propagate errors up
|
||||
|
||||
#define MSCCLPPCHECK(call) \
|
||||
do { \
|
||||
mscclppResult_t res = call; \
|
||||
if (res != mscclppSuccess && res != mscclppInProgress) { \
|
||||
/* Print the back trace*/ \
|
||||
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
|
||||
return res; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
// Check CUDA RT calls
|
||||
#define CUDACHECK(cmd) \
|
||||
do { \
|
||||
cudaError_t err = cmd; \
|
||||
if (err != cudaSuccess) { \
|
||||
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
|
||||
exit(EXIT_FAILURE); \
|
||||
} \
|
||||
} while (false)
|
||||
|
||||
// Measure current time in second.
|
||||
static double getTime(void)
|
||||
{
|
||||
struct timespec tspec;
|
||||
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
|
||||
printf("clock_gettime failed\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
|
||||
}
|
||||
|
||||
#define ALIGN 4
|
||||
__constant__ mscclppDevConn_t constDevConns[16];
|
||||
|
||||
__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU)
|
||||
@@ -53,16 +13,16 @@ __device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, i
|
||||
|
||||
// this thread's role is a sender role
|
||||
// put your data asynchronously
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
if (threadIdx.x % 32 == 0)
|
||||
devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
|
||||
// make sure everyone is put their data before some thread randomly blocks everyone else in signal
|
||||
__syncthreads();
|
||||
// push with flag and sync to make sure the data is received
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
if (threadIdx.x % 32 == 0)
|
||||
devConn.flush();
|
||||
|
||||
// this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
if (threadIdx.x % 32 == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
@@ -176,326 +136,85 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem
|
||||
allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
}
|
||||
|
||||
int rankToLocalRank(int rank)
|
||||
void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
|
||||
size_t* recvInplaceOffset, size_t count, int nranks)
|
||||
{
|
||||
return rank % nranksPerNode;
|
||||
size_t base = (count / (ALIGN * nranks)) * ALIGN;
|
||||
*sendcount = base;
|
||||
*recvcount = base * nranks;
|
||||
*sendInplaceOffset = base;
|
||||
*recvInplaceOffset = 0;
|
||||
*paramcount = base;
|
||||
}
|
||||
|
||||
int rankToNode(int rank)
|
||||
testResult_t AllGatherInitData(struct testArgs* args, int in_place)
|
||||
{
|
||||
return rank / nranksPerNode;
|
||||
}
|
||||
size_t sendcount = args->sendBytes / sizeof(int);
|
||||
size_t recvcount = args->expectedBytes / sizeof(int);
|
||||
// int nranks = args->totalProcs;
|
||||
|
||||
void print_usage(const char* prog)
|
||||
{
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
printf("usage: %s IP:PORT [rank nranks]\n", prog);
|
||||
#else
|
||||
printf("usage: %s IP:PORT rank nranks\n", prog);
|
||||
#endif
|
||||
}
|
||||
CUDACHECK(cudaSetDevice(args->gpuNum));
|
||||
int rank = args->proc;
|
||||
CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes));
|
||||
// void* data = in_place ? ((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0];
|
||||
|
||||
void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
|
||||
int** data_d)
|
||||
{
|
||||
CUDACHECK(cudaMalloc(data_d, dataSize));
|
||||
CUDACHECK(cudaMemset(*data_d, 0, dataSize));
|
||||
|
||||
*data_h = new int[nelemsPerGPU * world_size];
|
||||
for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
|
||||
int* dataHost = new int[recvcount];
|
||||
for (size_t i = 0; i < recvcount; i++) {
|
||||
int val = i + 1;
|
||||
if (i / nelemsPerGPU == (size_t)rank) {
|
||||
(*data_h)[i] = val;
|
||||
if (i / sendcount == (size_t)rank) {
|
||||
dataHost[i] = val;
|
||||
} else {
|
||||
(*data_h)[i] = 0;
|
||||
dataHost[i] = 0;
|
||||
}
|
||||
}
|
||||
CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
|
||||
CUDACHECK(cudaMemcpy(args->recvbuff, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
|
||||
for (int i = 0; i < static_cast<int>(recvcount); i++) {
|
||||
dataHost[i] = i + 1;
|
||||
}
|
||||
CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
|
||||
delete dataHost;
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize)
|
||||
void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
|
||||
{
|
||||
int thisNode = rankToNode(rank);
|
||||
int cudaNum = rankToLocalRank(rank);
|
||||
std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum);
|
||||
double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec;
|
||||
|
||||
for (int r = 0; r < world_size; ++r) {
|
||||
if (r == rank)
|
||||
continue;
|
||||
mscclppTransport_t transportType;
|
||||
const char* ibDev = ibDevStr.c_str();
|
||||
if (rankToNode(r) == thisNode) {
|
||||
ibDev = NULL;
|
||||
transportType = mscclppTransportP2P;
|
||||
} else {
|
||||
transportType = mscclppTransportIB;
|
||||
}
|
||||
// Connect with all other ranks
|
||||
MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev));
|
||||
}
|
||||
*algBw = baseBw;
|
||||
double factor = ((double)(nranks - 1)) / ((double)nranks);
|
||||
*busBw = baseBw * factor;
|
||||
}
|
||||
|
||||
MSCCLPPCHECK(mscclppConnectionSetup(comm));
|
||||
testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
|
||||
cudaStream_t stream, int kernel_num)
|
||||
{
|
||||
int worldSize = comm->nRanks;
|
||||
kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), kernel_num);
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, AllGatherInitData, AllGatherGetBw,
|
||||
AllGatherRunColl};
|
||||
|
||||
void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
|
||||
{
|
||||
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
|
||||
AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
|
||||
}
|
||||
|
||||
testResult_t AllGatherRunTest(struct testArgs* args)
|
||||
{
|
||||
args->collTest = &allGatherTest;
|
||||
mscclppDevConn_t* devConns;
|
||||
int nCons;
|
||||
MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons));
|
||||
|
||||
MSCCLPPCHECK(mscclppGetAllDeviceConnections(args->comm, &devConns, &nCons));
|
||||
CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons));
|
||||
|
||||
return mscclppSuccess;
|
||||
TESTCHECK(TimeTest(args));
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
void printUsage(const char* prog, bool isMpi)
|
||||
{
|
||||
if (isMpi) {
|
||||
std::string st = "you are using MPI for this test\n";
|
||||
st += "two possilbe usages are:\n";
|
||||
st += "> " + std::string(prog) + "\n";
|
||||
st += "or\n";
|
||||
st += "> " + std::string(prog) + " -ip_port [ip:port]\n";
|
||||
printf("%s", st.c_str());
|
||||
} else {
|
||||
std::string st = "you are NOT using MPI for this test\n";
|
||||
st += "the only possible usage:\n";
|
||||
st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n";
|
||||
printf("%s", st.c_str());
|
||||
}
|
||||
}
|
||||
struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest};
|
||||
|
||||
std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[], bool isMpi)
|
||||
{
|
||||
std::unordered_map<std::string, std::string> options;
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
std::string arg = argv[i];
|
||||
if (arg == "-rankspernode") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["rankspernode"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -rankspernode option requires an argument.\n");
|
||||
;
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-kernel") {
|
||||
if (i + 1 < argc) {
|
||||
options["kernel"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -kernel option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-ip_port") {
|
||||
if (i + 1 < argc) {
|
||||
options["ip_port"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-rank") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -rank should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["rank"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-nranks") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -nranks should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["nranks"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-datasize") {
|
||||
if (i + 1 < argc) {
|
||||
options["datasize"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -datasize option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-help" || arg == "-h") {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(0);
|
||||
} else {
|
||||
fprintf(stderr, "Error: Unknown option %s\n", argv[i]);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
return options;
|
||||
}
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
bool isMpi = false;
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
isMpi = true;
|
||||
#endif
|
||||
|
||||
auto parsedArgs = parseArgs(argc, argv, isMpi);
|
||||
|
||||
int rank;
|
||||
int world_size;
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Init(NULL, NULL);
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
||||
// get the local number of nodes with MPI
|
||||
MPI_Comm shmcomm;
|
||||
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
|
||||
int shmrank;
|
||||
MPI_Comm_size(shmcomm, &shmrank);
|
||||
nranksPerNode = shmrank;
|
||||
MPI_Comm_free(&shmcomm);
|
||||
#else
|
||||
if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(-1);
|
||||
}
|
||||
rank = std::stoi(parsedArgs["rank"]);
|
||||
world_size = std::stoi(parsedArgs["nranks"]);
|
||||
if (parsedArgs.find("rankspernode") == parsedArgs.end()) {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(-1);
|
||||
}
|
||||
nranksPerNode = std::stoi(parsedArgs["rankspernode"]);
|
||||
#endif
|
||||
int kernelNum = 0;
|
||||
if (parsedArgs.find("kernel") != parsedArgs.end()) {
|
||||
kernelNum = std::stoi(parsedArgs["kernel"]);
|
||||
}
|
||||
char* ip_port = NULL;
|
||||
if (parsedArgs.find("ip_port") == parsedArgs.end()) {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(-1);
|
||||
}
|
||||
ip_port = (char*)parsedArgs["ip_port"].c_str();
|
||||
|
||||
int thisNode = rankToNode(rank);
|
||||
int cudaNum = rankToLocalRank(rank);
|
||||
CUDACHECK(cudaSetDevice(cudaNum));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Initializing MSCCL++\n");
|
||||
mscclppComm_t comm;
|
||||
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank));
|
||||
|
||||
int* data_d;
|
||||
int* data_h;
|
||||
size_t dataSize = 1024 * 1024 * 1024;
|
||||
if (parsedArgs.find("datasize") != parsedArgs.end()) {
|
||||
dataSize = std::stoul(parsedArgs["datasize"]);
|
||||
}
|
||||
size_t nelemsPerGPU = dataSize / sizeof(int) / world_size;
|
||||
|
||||
if (rank == 0)
|
||||
printf("Initializing data for allgather test\n");
|
||||
initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d);
|
||||
|
||||
if (rank == 0)
|
||||
printf("Setting up the connection in MSCCL++\n");
|
||||
MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Launching MSCCL++ proxy threads\n");
|
||||
MSCCLPPCHECK(mscclppProxyLaunch(comm));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Testing the correctness of AllGather implementation\n");
|
||||
cudaStream_t stream;
|
||||
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));
|
||||
|
||||
for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
|
||||
int val = i + 1;
|
||||
if (data_h[i] != val) {
|
||||
printf("oh uh! data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
int tmp[16];
|
||||
// A simple barrier
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
if (rank == 0)
|
||||
printf("Successfully checked the correctness\n");
|
||||
|
||||
// Perf test
|
||||
int iterwithoutcudagraph = 10;
|
||||
if (rank == 0)
|
||||
printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
for (int i = 0; i < iterwithoutcudagraph; ++i) {
|
||||
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
|
||||
}
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
|
||||
// cudaGraph Capture
|
||||
int cudagraphiter = 10;
|
||||
if (rank == 0)
|
||||
printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
|
||||
cudaGraph_t graph;
|
||||
cudaGraphExec_t instance;
|
||||
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
|
||||
for (int i = 0; i < cudagraphiter; ++i) {
|
||||
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
|
||||
}
|
||||
cudaStreamEndCapture(stream, &graph);
|
||||
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
|
||||
|
||||
int cudagraphwarmup = 10;
|
||||
if (rank == 0)
|
||||
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
|
||||
cudagraphiter);
|
||||
for (int i = 0; i < cudagraphwarmup; ++i) {
|
||||
cudaGraphLaunch(instance, stream);
|
||||
}
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
|
||||
// measure runtime
|
||||
int cudagraphlaunch = 10;
|
||||
if (rank == 0)
|
||||
printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch,
|
||||
cudagraphiter);
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
double t0, t1, ms, time_in_us;
|
||||
t0 = getTime();
|
||||
for (int i = 0; i < cudagraphlaunch; ++i) {
|
||||
cudaGraphLaunch(instance, stream);
|
||||
}
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
|
||||
t1 = getTime();
|
||||
ms = (t1 - t0) * 1000.0;
|
||||
time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter;
|
||||
printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us,
|
||||
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Stopping MSCCL++ proxy threads\n");
|
||||
MSCCLPPCHECK(mscclppProxyStop(comm));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Destroying MSCCL++ communicator\n");
|
||||
MSCCLPPCHECK(mscclppCommDestroy(comm));
|
||||
printf("Rank %d succeeded!\n", rank);
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Finalize();
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
#pragma weak mscclppTestEngine = allGatherEngine
|
||||
|
||||
@@ -1,295 +0,0 @@
|
||||
#include "mscclpp.h"
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include "mpi.h"
|
||||
#endif // MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
|
||||
#define RANKS_PER_NODE 8
|
||||
|
||||
// Check CUDA RT calls
|
||||
#define CUDACHECK(cmd) \
|
||||
do { \
|
||||
cudaError_t err = cmd; \
|
||||
if (err != cudaSuccess) { \
|
||||
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
|
||||
exit(EXIT_FAILURE); \
|
||||
} \
|
||||
} while (false)
|
||||
|
||||
// Measure current time in second.
|
||||
static double getTime(void)
|
||||
{
|
||||
struct timespec tspec;
|
||||
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
|
||||
printf("clock_gettime failed\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
|
||||
}
|
||||
|
||||
__constant__ mscclppDevConn_t constDevConns[16];
|
||||
|
||||
__global__ void kernel(int rank, int world_size, size_t nelemsPerGPU)
|
||||
{
|
||||
if (threadIdx.x % 32 != 0)
|
||||
return;
|
||||
|
||||
int warpId = threadIdx.x / 32;
|
||||
bool isIB = false;
|
||||
if (warpId >= world_size - 1)
|
||||
isIB = true;
|
||||
if (isIB)
|
||||
warpId = warpId - (world_size - 1);
|
||||
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
|
||||
mscclppDevConn_t devConn = constDevConns[remoteRank];
|
||||
if (isIB)
|
||||
devConn = constDevConns[remoteRank + world_size];
|
||||
|
||||
// Each warp receives data from different ranks
|
||||
#if 1
|
||||
|
||||
// Trigger sending data, flag and synchronize after
|
||||
devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
|
||||
|
||||
devConn.wait();
|
||||
|
||||
#else
|
||||
for (int i = 1; i < world_size; i++) {
|
||||
__syncthreads();
|
||||
if (remoteRank != ((rank + i) % world_size))
|
||||
continue;
|
||||
|
||||
// Trigger sending data, flag and synchronize after
|
||||
size_t ibPortion = nelemsPerGPU / 12; // nelemsPerGPU/12;
|
||||
if (isIB)
|
||||
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync,
|
||||
rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion) * sizeof(int),
|
||||
rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion) * sizeof(int),
|
||||
ibPortion * sizeof(int));
|
||||
else
|
||||
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int),
|
||||
rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU - ibPortion) * sizeof(int));
|
||||
// Wait on the request to make sure it is safe to reuse buffer and flag
|
||||
auto req = devConn.fifo.putWithSignal(dataOffset, dataSize);
|
||||
devConn.fifo.sync(req);
|
||||
}
|
||||
// Wait for receiving data from remote rank
|
||||
while (*proxyFlag == baseFlag)
|
||||
;
|
||||
#endif
|
||||
}
|
||||
|
||||
int rankToLocalRank(int rank)
|
||||
{
|
||||
return rank % RANKS_PER_NODE;
|
||||
}
|
||||
|
||||
int rankToNode(int rank)
|
||||
{
|
||||
return rank / RANKS_PER_NODE;
|
||||
}
|
||||
|
||||
int cudaNumToIbNum(int cudaNum)
|
||||
{
|
||||
int ibNum;
|
||||
if (cudaNum == 0) {
|
||||
ibNum = 0;
|
||||
} else if (cudaNum == 1) {
|
||||
ibNum = 4;
|
||||
} else if (cudaNum == 2) {
|
||||
ibNum = 1;
|
||||
} else if (cudaNum == 3) {
|
||||
ibNum = 5;
|
||||
} else if (cudaNum == 4) {
|
||||
ibNum = 2;
|
||||
} else if (cudaNum == 5) {
|
||||
ibNum = 6;
|
||||
} else if (cudaNum == 6) {
|
||||
ibNum = 3;
|
||||
} else if (cudaNum == 7) {
|
||||
ibNum = 7;
|
||||
} else {
|
||||
printf("Invalid cudaNum: %d\n", cudaNum);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
return ibNum;
|
||||
}
|
||||
|
||||
void print_usage(const char* prog)
|
||||
{
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
printf("usage: %s IP:PORT [rank nranks]\n", prog);
|
||||
#else
|
||||
printf("usage: %s IP:PORT rank nranks\n", prog);
|
||||
#endif
|
||||
}
|
||||
|
||||
// Legacy standalone all-gather test: sets up BOTH a P2P and an IB connection
// to every peer (2 * world_size device connections), checks correctness once,
// then times a captured CUDA graph of the kernel.
int main(int argc, const char* argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  if (argc != 2 && argc != 4) {
    print_usage(argv[0]);
    return -1;
  }
  const char* ip_port = argv[1];
  int rank;
  int world_size;
  if (argc == 4) {
    // Explicit rank/nranks on the command line overrides MPI bootstrap.
    rank = atoi(argv[2]);
    world_size = atoi(argv[3]);
  } else {
    MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  }
#else
  if (argc != 4) {
    print_usage(argv[0]);
    return -1;
  }
  const char* ip_port = argv[1];
  int rank = atoi(argv[2]);
  int world_size = atoi(argv[3]);
#endif
  int localRank = rankToLocalRank(rank);
  int thisNode = rankToNode(rank);
  int cudaNum = localRank;
  int ibNum = cudaNumToIbNum(cudaNum);

  CUDACHECK(cudaSetDevice(cudaNum));
  std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);

  mscclppComm_t comm;
  MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));

  int* data_d;
  uint64_t* flag_d;
  size_t data_size = 1536 * 1024 * 1024;
  size_t nelemsPerGPU = data_size / sizeof(int) / world_size;
  CUDACHECK(cudaMalloc(&data_d, data_size));
  CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
  CUDACHECK(cudaMemset(data_d, 0, data_size));
  CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t)));

  // Reference pattern: element i holds i+1 inside this rank's slice, else 0.
  int* data_h = new int[nelemsPerGPU * world_size];
  // BUGFIX: the index must be size_t (element count can exceed INT_MAX) and
  // the stored value is an int, not a size_t.
  for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
    int val = (int)(i + 1);
    if (i / nelemsPerGPU == (size_t)rank) {
      data_h[i] = val;
    } else {
      data_h[i] = 0;
    }
  }
  CUDACHECK(cudaMemcpy(data_d, data_h, data_size, cudaMemcpyHostToDevice));

  // NOTE(review): indices reach 2 * world_size - 1; the fixed capacity of 16
  // (matching constDevConns) assumes world_size <= 8 -- confirm.
  mscclppDevConn_t devConns[16];
  // First pass: P2P connection to every peer, stored at devConns[r].
  for (int r = 0; r < world_size; ++r) {
    if (r == rank)
      continue;
    mscclppTransport_t transportType;
    const char* ibDev = NULL;
    transportType = mscclppTransportP2P;
    // Connect with all other ranks
    MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, 0, data_d, data_size, flag_d, transportType, ibDev));
  }
  // Second pass: IB connection to every peer, stored at devConns[r + world_size].
  for (int r = 0; r < world_size; ++r) {
    if (r == rank)
      continue;
    mscclppTransport_t transportType;
    const char* ibDev = ibDevStr.c_str();
    transportType = mscclppTransportIB;
    // Connect with all other ranks
    MSCCLPPCHECK(
      mscclppConnect(comm, &devConns[r + world_size], r, 0, data_d, data_size, flag_d, transportType, ibDev));
  }

  MSCCLPPCHECK(mscclppConnectionSetup(comm));

  MSCCLPPCHECK(mscclppProxyLaunch(comm));

  CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * 2 * world_size));

  cudaStream_t stream;
  CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));

  // Correctness check: one kernel launch (two warps per peer: P2P + IB).
  CUDACHECK(cudaDeviceSynchronize());
  kernel<<<1, 32 * 2 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
  CUDACHECK(cudaDeviceSynchronize());
  CUDACHECK(cudaMemcpy(data_h, data_d, data_size, cudaMemcpyDeviceToHost));
  CUDACHECK(cudaDeviceSynchronize());

  // BUGFIX: index and format specifier -- was `int i` vs size_t bound, and
  // printf'd a size_t with %d (undefined behavior).
  for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
    int val = (int)(i + 1);
    if (data_h[i] != val) {
      printf("oh uh things went wrong! data_h[%zu] (%d) != val (%d)\n", i, data_h[i], val);
      break;
    }
  }
  int tmp[16];
  // A simple barrier across all ranks before timing.
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));

  // cudaGraph Capture: record cudagraphiter kernel launches into one graph.
  cudaGraph_t graph;
  cudaGraphExec_t instance;
  // BUGFIX: graph-capture and launch calls were previously unchecked.
  CUDACHECK(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
  int cudagraphiter = 10;
  for (int i = 0; i < cudagraphiter; ++i) {
    kernel<<<1, 32 * 2 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
  }
  CUDACHECK(cudaStreamEndCapture(stream, &graph));
  CUDACHECK(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

  int cudagraphwarmup = 10;
  for (int i = 0; i < cudagraphwarmup; ++i) {
    CUDACHECK(cudaGraphLaunch(instance, stream));
  }
  CUDACHECK(cudaStreamSynchronize(stream));

  // measure runtime: wall-clock over cudagraphlaunch graph launches.
  double t0 = getTime();
  int cudagraphlaunch = 10;
  for (int i = 0; i < cudagraphlaunch; ++i) {
    CUDACHECK(cudaGraphLaunch(instance, stream));
  }
  CUDACHECK(cudaStreamSynchronize(stream));

  double t1 = getTime();
  float ms = (t1 - t0) * 1000.0;
  double time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter;
  printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us,
         (double)(data_size) / 1024. / 1024. / 1024. / (time_in_us / 1e6));

  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
  MSCCLPPCHECK(mscclppProxyStop(comm));

  // BUGFIX: release the graph, stream, and buffers (previously leaked).
  CUDACHECK(cudaGraphExecDestroy(instance));
  CUDACHECK(cudaGraphDestroy(graph));
  CUDACHECK(cudaStreamDestroy(stream));
  CUDACHECK(cudaFree(data_d));
  CUDACHECK(cudaFree(flag_d));
  delete[] data_h;

  MSCCLPPCHECK(mscclppCommDestroy(comm));

#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  if (argc == 2) {
    MPI_Finalize();
  }
#endif
  printf("Succeeded! %d\n", rank);
  return 0;
}
|
||||
501
tests/allgather_test_standalone.cu
Normal file
501
tests/allgather_test_standalone.cu
Normal file
@@ -0,0 +1,501 @@
|
||||
#include "mscclpp.h"
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include "mpi.h"
|
||||
#endif // MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include <iostream>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <unordered_map>
|
||||
|
||||
// Ranks per node; defaults to 8 and is overwritten in main() (via the MPI
// shared-memory split, or the -rankspernode option without MPI).
static int nranksPerNode = 8;
|
||||
|
||||
// Propagate errors up: evaluate `call` and, unless it reports success or
// in-progress, log the failing location and `return` the error code from the
// *enclosing* function (so it is only usable where returning a
// mscclppResult_t-compatible value is valid, e.g. main or helpers).
#define MSCCLPPCHECK(call)                                                                                             \
  do {                                                                                                                 \
    mscclppResult_t res = call;                                                                                        \
    if (res != mscclppSuccess && res != mscclppInProgress) {                                                           \
      /* Print the back trace*/                                                                                        \
      printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res));                              \
      return res;                                                                                                      \
    }                                                                                                                  \
  } while (0)
|
||||
|
||||
// Check CUDA RT calls: on any failure, log the location and the CUDA error
// string, then terminate the process.
#define CUDACHECK(cmd)                                                                                                 \
  do {                                                                                                                 \
    cudaError_t err = cmd;                                                                                             \
    if (err != cudaSuccess) {                                                                                          \
      printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err));                                \
      exit(EXIT_FAILURE);                                                                                              \
    }                                                                                                                  \
  } while (false)
|
||||
|
||||
// Measure current time in second.
|
||||
static double getTime(void)
|
||||
{
|
||||
struct timespec tspec;
|
||||
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
|
||||
printf("clock_gettime failed\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
|
||||
}
|
||||
|
||||
// Per-peer device connections in constant memory, indexed by warp id in the
// kernels; fixed capacity of 16 bounds the number of connections.
__constant__ mscclppDevConn_t constDevConns[16];
|
||||
|
||||
// Simplest all-gather, implemented as an all-to-all: the calling warp pushes
// this rank's slice over its peer connection, flushes, then waits for the
// peer's slice to arrive. Only lane 0 of each warp drives the connection.
__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU)
{
  // this allgather is really simple and implemented as an alltoall

  // this thread's role is a sender role
  // put your data asynchronously
  if ((threadIdx.x % 32) == 0)
    devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
  // make sure everyone is put their data before some thread randomly blocks everyone else in signal
  __syncthreads();
  // push with flag and sync to make sure the data is received
  if ((threadIdx.x % 32) == 0)
    devConn.flush();

  // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
  if ((threadIdx.x % 32) == 0)
    devConn.wait();
}
|
||||
|
||||
// Intra-node ring-ordered all-gather: in step i, rank r sends `size` bytes at
// `offset` to rank (r+i) % nranksPerNode and waits for the slice from
// (r-i) % nranksPerNode. Each warp owns one peer connection; a named barrier
// (id 11) across the (nranksPerNode - 1) participating warps separates steps.
__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
                               uint64_t offset, uint64_t size)
{
  // this allgather algorithm works as follows:
  // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
  // and waits for data from GPU rank (i-1) % nranksPerNode
  // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode
  // ...
  // This order is much better for DMA engine for NVLinks
  for (int i = 1; i < nranksPerNode; i++) {
    if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) {
      // put your data to GPU (rank+i) % nranksPerNode and signal in one call
      if ((threadIdx.x % 32) == 0)
        devConn.putWithSignalAndFlush(offset, size);
    }
    // wait for the data from GPU (rank-i) % nranksPerNode to arrive
    if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
      if ((threadIdx.x % 32) == 0)
        devConn.wait();
    }
    // Named barrier over the warps driving intra-node peers so that step i
    // finishes everywhere before step i+1 begins.
    asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory");
  }
}
|
||||
|
||||
// Single-node all-gather: each rank contributes its own contiguous slice via
// the ring-ordered localAllGather.
__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
                           size_t nelemsPerGPU)
{
  localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
                 nelemsPerGPU * sizeof(int));
}
|
||||
|
||||
// Two-node pipelined, hierarchical all-gather (only valid when world_size is
// exactly two nodes of nranksPerNode ranks). Local NVLink all-gathers are
// overlapped with cross-node IB exchanges; each slice is split into
// pipelineSize chunks ((pipelineSize-1)/pipelineSize first, the rest later).
__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
                           size_t nelemsPerGPU)
{
  // this allgather is a pipelined and hierarchical one and only works for two nodes
  // it is implemented as follows:
  // Step 1: each node does a local allgather and concurrently,
  // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with
  // its cross-node neighbor (local GPU i on the other node) via IB
  // Step 2: each node does a local allgather again with the data just received from its
  // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with
  // its cross-node neighbor
  // Step 3: each node does a local allgather for the last time with the rest of the data

  int pipelineSize = 3;

  // Step 1
  // local allgather
  if (remoteRank / nranksPerNode == rank / nranksPerNode) {
    localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
                   nelemsPerGPU * sizeof(int));
  }
  // cross-node exchange
  if (remoteRank % nranksPerNode == rank % nranksPerNode) {
    // opposite side
    if ((threadIdx.x % 32) == 0)
      devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int),
                                    (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
    if ((threadIdx.x % 32) == 0)
      devConn.wait();
  }

  __syncthreads();

  // Step 2
  // local allgather of the chunk just received from the cross-node neighbor
  int otherNghr = (rank + nranksPerNode) % world_size;
  if (remoteRank / nranksPerNode == rank / nranksPerNode) {
    localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
                   (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
  }

  // cross-node exchange of the remaining 1/pipelineSize chunk
  if (remoteRank % nranksPerNode == rank % nranksPerNode) {
    // opposite side
    if ((threadIdx.x % 32) == 0)
      devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) *
                                      sizeof(int),
                                    nelemsPerGPU / pipelineSize * sizeof(int));
    if ((threadIdx.x % 32) == 0)
      devConn.wait();
  }

  __syncthreads();

  // Step 3
  // local allgather of the final chunk
  if (remoteRank / nranksPerNode == rank / nranksPerNode) {
    localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank,
                   (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
                   nelemsPerGPU / pipelineSize * sizeof(int));
  }
}
|
||||
|
||||
// Dispatch kernel: one warp per remote rank. `kernel` selects the algorithm
// (0: all-to-all, 1: single-node ring, 2: two-node pipelined).
// NOTE: the parameter `kernel` shadows the function's own name.
__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel)
{
  // find the mapping between remoteRank and devConns
  int warpId = threadIdx.x / 32;
  int remoteRank = (warpId < rank) ? warpId : warpId + 1;
  // Each warp is responsible for one of the remote ranks
  mscclppDevConn_t devConn = constDevConns[warpId];

  if (kernel == 0)
    allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU);
  else if (kernel == 1)
    allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
  else if (kernel == 2)
    allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
}
|
||||
|
||||
int rankToLocalRank(int rank)
|
||||
{
|
||||
return rank % nranksPerNode;
|
||||
}
|
||||
|
||||
int rankToNode(int rank)
|
||||
{
|
||||
return rank / nranksPerNode;
|
||||
}
|
||||
|
||||
// Print the command-line usage string; rank/nranks are optional when MPI
// bootstrap is compiled in.
void print_usage(const char* prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  const char* fmt = "usage: %s IP:PORT [rank nranks]\n";
#else
  const char* fmt = "usage: %s IP:PORT rank nranks\n";
#endif
  printf(fmt, prog);
}
|
||||
|
||||
// Allocate the device buffer, build the host-side reference pattern, and
// upload it: element i holds i+1 inside this rank's slice and 0 elsewhere.
// Ownership of *data_h (new[]) and *data_d (cudaMalloc) passes to the caller.
void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
                                        int** data_d)
{
  const size_t totalElems = nelemsPerGPU * world_size;

  CUDACHECK(cudaMalloc(data_d, dataSize));
  CUDACHECK(cudaMemset(*data_d, 0, dataSize));

  int* host = new int[totalElems];
  for (size_t idx = 0; idx < totalElems; idx++) {
    const bool ownSlice = (idx / nelemsPerGPU) == (size_t)rank;
    host[idx] = ownSlice ? (int)(idx + 1) : 0;
  }
  *data_h = host;

  CUDACHECK(cudaMemcpy(*data_d, host, dataSize, cudaMemcpyHostToDevice));
}
|
||||
|
||||
// Establish one connection per remote rank -- P2P within this node, IB across
// nodes -- run the connection-setup handshake, then copy the resulting device
// connection array into constant memory for the kernels.
mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize)
{
  int thisNode = rankToNode(rank);
  int cudaNum = rankToLocalRank(rank);
  // IB device name follows the local GPU index, e.g. local GPU 3 -> "mlx5_ib3".
  std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum);

  for (int r = 0; r < world_size; ++r) {
    if (r == rank)
      continue;
    mscclppTransport_t transportType;
    const char* ibDev = ibDevStr.c_str();
    if (rankToNode(r) == thisNode) {
      // Same node: P2P transport, no IB device needed.
      ibDev = NULL;
      transportType = mscclppTransportP2P;
    } else {
      transportType = mscclppTransportIB;
    }
    // Connect with all other ranks
    MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev));
  }

  MSCCLPPCHECK(mscclppConnectionSetup(comm));

  mscclppDevConn_t* devConns;
  int nCons;
  MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons));

  // NOTE(review): constDevConns has a fixed capacity of 16 -- assumes
  // nCons <= 16; confirm for larger world sizes.
  CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons));

  return mscclppSuccess;
}
|
||||
|
||||
// Print usage help, with different option sets for MPI and non-MPI builds.
void printUsage(const char* prog, bool isMpi)
{
  std::string msg;
  if (isMpi) {
    msg = "you are using MPI for this test\n";
    msg += "two possilbe usages are:\n";
    msg += "> " + std::string(prog) + "\n";
    msg += "or\n";
    msg += "> " + std::string(prog) + " -ip_port [ip:port]\n";
  } else {
    msg = "you are NOT using MPI for this test\n";
    msg += "the only possible usage:\n";
    msg += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n";
  }
  printf("%s", msg.c_str());
}
|
||||
|
||||
std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[], bool isMpi)
|
||||
{
|
||||
std::unordered_map<std::string, std::string> options;
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
std::string arg = argv[i];
|
||||
if (arg == "-rankspernode") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["rankspernode"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -rankspernode option requires an argument.\n");
|
||||
;
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-kernel") {
|
||||
if (i + 1 < argc) {
|
||||
options["kernel"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -kernel option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-ip_port") {
|
||||
if (i + 1 < argc) {
|
||||
options["ip_port"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-rank") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -rank should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["rank"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-nranks") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -nranks should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["nranks"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-datasize") {
|
||||
if (i + 1 < argc) {
|
||||
options["datasize"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -datasize option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-help" || arg == "-h") {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(0);
|
||||
} else {
|
||||
fprintf(stderr, "Error: Unknown option %s\n", argv[i]);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
return options;
|
||||
}
|
||||
|
||||
// Test driver: parse options, bootstrap rank/world (MPI or explicit flags),
// set up MSCCL++ connections, verify all-gather correctness, then benchmark
// the kernel both directly and via a captured CUDA graph.
int main(int argc, const char* argv[])
{
  bool isMpi = false;
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  isMpi = true;
#endif

  auto parsedArgs = parseArgs(argc, argv, isMpi);

  int rank;
  int world_size;
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  MPI_Init(NULL, NULL);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  // Derive ranks-per-node from the size of the shared-memory communicator.
  MPI_Comm shmcomm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
  int shmrank;
  MPI_Comm_size(shmcomm, &shmrank);
  nranksPerNode = shmrank;
  MPI_Comm_free(&shmcomm);
#else
  if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) {
    printUsage(argv[0], isMpi);
    exit(-1);
  }
  rank = std::stoi(parsedArgs["rank"]);
  world_size = std::stoi(parsedArgs["nranks"]);
  if (parsedArgs.find("rankspernode") == parsedArgs.end()) {
    printUsage(argv[0], isMpi);
    exit(-1);
  }
  nranksPerNode = std::stoi(parsedArgs["rankspernode"]);
#endif
  int kernelNum = 0;
  if (parsedArgs.find("kernel") != parsedArgs.end()) {
    kernelNum = std::stoi(parsedArgs["kernel"]);
  }
  char* ip_port = NULL;
  if (parsedArgs.find("ip_port") == parsedArgs.end()) {
    printUsage(argv[0], isMpi);
    exit(-1);
  }
  ip_port = (char*)parsedArgs["ip_port"].c_str();

  // One GPU per local rank. (Removed unused `thisNode` local.)
  int cudaNum = rankToLocalRank(rank);
  CUDACHECK(cudaSetDevice(cudaNum));

  if (rank == 0)
    printf("Initializing MSCCL++\n");
  mscclppComm_t comm;
  MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank));

  int* data_d;
  int* data_h;
  size_t dataSize = 1024 * 1024 * 1024;
  if (parsedArgs.find("datasize") != parsedArgs.end()) {
    dataSize = std::stoul(parsedArgs["datasize"]);
  }
  size_t nelemsPerGPU = dataSize / sizeof(int) / world_size;

  if (rank == 0)
    printf("Initializing data for allgather test\n");
  initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d);

  if (rank == 0)
    printf("Setting up the connection in MSCCL++\n");
  MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize));

  if (rank == 0)
    printf("Launching MSCCL++ proxy threads\n");
  MSCCLPPCHECK(mscclppProxyLaunch(comm));

  // Correctness check: one launch, one warp per remote rank.
  if (rank == 0)
    printf("Testing the correctness of AllGather implementation\n");
  cudaStream_t stream;
  CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  CUDACHECK(cudaDeviceSynchronize());
  kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
  CUDACHECK(cudaDeviceSynchronize());
  CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));

  for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
    int val = (int)(i + 1);
    if (data_h[i] != val) {
      // BUGFIX: %zu for the size_t index (was %ld).
      printf("oh uh! data_h[%zu] (%d) != val (%d)\n", i, data_h[i], val);
      break;
    }
  }
  int tmp[16];
  // A simple barrier
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
  if (rank == 0)
    printf("Successfully checked the correctness\n");

  // Perf test without CUDA graphs first.
  int iterwithoutcudagraph = 10;
  if (rank == 0)
    printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
  CUDACHECK(cudaStreamSynchronize(stream));
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
  for (int i = 0; i < iterwithoutcudagraph; ++i) {
    kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
  }
  CUDACHECK(cudaStreamSynchronize(stream));
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));

  // cudaGraph Capture
  int cudagraphiter = 10;
  if (rank == 0)
    printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
  cudaGraph_t graph;
  cudaGraphExec_t instance;
  // BUGFIX: graph capture/instantiate/launch calls were previously unchecked.
  CUDACHECK(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
  for (int i = 0; i < cudagraphiter; ++i) {
    kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
  }
  CUDACHECK(cudaStreamEndCapture(stream, &graph));
  CUDACHECK(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

  int cudagraphwarmup = 10;
  if (rank == 0)
    printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
           cudagraphiter);
  for (int i = 0; i < cudagraphwarmup; ++i) {
    CUDACHECK(cudaGraphLaunch(instance, stream));
  }
  CUDACHECK(cudaStreamSynchronize(stream));

  // measure runtime
  int cudagraphlaunch = 10;
  if (rank == 0)
    printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch,
           cudagraphiter);
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
  double t0, t1, ms, time_in_us;
  t0 = getTime();
  for (int i = 0; i < cudagraphlaunch; ++i) {
    CUDACHECK(cudaGraphLaunch(instance, stream));
  }
  CUDACHECK(cudaStreamSynchronize(stream));

  t1 = getTime();
  ms = (t1 - t0) * 1000.0;
  time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter;
  printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us,
         (double)(dataSize) / 1e9 / (time_in_us / 1e6));
  MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));

  if (rank == 0)
    printf("Stopping MSCCL++ proxy threads\n");
  MSCCLPPCHECK(mscclppProxyStop(comm));

  // BUGFIX: release the graph, stream, and data buffers (previously leaked).
  CUDACHECK(cudaGraphExecDestroy(instance));
  CUDACHECK(cudaGraphDestroy(graph));
  CUDACHECK(cudaStreamDestroy(stream));
  CUDACHECK(cudaFree(data_d));
  delete[] data_h;

  if (rank == 0)
    printf("Destroying MSCCL++ communicator\n");
  MSCCLPPCHECK(mscclppCommDestroy(comm));
  printf("Rank %d succeeded!\n", rank);

#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  MPI_Finalize();
#endif
  return 0;
}
|
||||
680
tests/common.cu
Normal file
680
tests/common.cu
Normal file
@@ -0,0 +1,680 @@
|
||||
/*************************************************************************
|
||||
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
|
||||
*
|
||||
* See LICENSE.txt for license information
|
||||
************************************************************************/
|
||||
|
||||
#include "common.h"
|
||||
#include "cuda.h"
|
||||
#include "mscclpp.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
|
||||
#include <getopt.h>
|
||||
#include <libgen.h>
|
||||
|
||||
#define NUM_BLOCKS 32
|
||||
|
||||
// NOTE(review): presumably gate rank-0-only console output (used via macros
// in common.h, not visible here) -- confirm against common.h.
int is_main_proc = 0;
thread_local int is_main_thread = 0;
|
||||
|
||||
namespace {
|
||||
// Minimal monotonic stopwatch based on now() (std::chrono::steady_clock).
class timer
{
  std::uint64_t t0; // start time in nanoseconds since the clock epoch

public:
  timer();
  // Seconds elapsed since construction or the last reset().
  double elapsed() const;
  // Seconds elapsed, then restart the stopwatch.
  double reset();
};
|
||||
|
||||
// Current steady-clock (monotonic) time in nanoseconds.
std::uint64_t now()
{
  using namespace std::chrono;
  const auto sinceEpoch = steady_clock::now().time_since_epoch();
  return duration_cast<nanoseconds>(sinceEpoch).count();
}
|
||||
|
||||
// Command line parameter defaults
size_t minBytes = 32 * 1024 * 1024;  // smallest message size in the sweep
size_t maxBytes = 32 * 1024 * 1024;  // largest message size in the sweep
size_t stepBytes = 1 * 1024 * 1024;  // additive step between sizes
size_t stepFactor = 1;               // multiplicative step (use not visible in this file)
int datacheck = 1;                   // nonzero: verify results after timing (see BenchTime)
int warmup_iters = 10;
int iters = 100;                     // kernel iterations captured per graph
int timeout = 0;                     // watchdog seconds; 0 disables (see testStreamSynchronize)
int report_cputime = 0;              // nonzero: report CPU launch time instead of GPU time
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
int average = 1;
int kernel_num = 0;
int cudaGraphLaunches = 15;          // graph launches per timed run
|
||||
|
||||
// Parse a human-readable size such as "512", "64K", "16M", or "1G" into a
// byte count. Returns -1.0 on malformed input or an unknown unit letter.
double parsesize(const char* value)
{
  long long int units;
  double size;
  // BUGFIX: %1s writes the matched character *plus* a terminating NUL, so the
  // destination must hold two bytes; a single `char` here was a buffer
  // overflow (C standard fscanf: %s appends '\0' after the field).
  char size_lit[2];

  int count = sscanf(value, "%lf %1s", &size, size_lit);

  switch (count) {
  case 2:
    switch (size_lit[0]) {
    case 'G':
    case 'g':
      units = 1024 * 1024 * 1024;
      break;
    case 'M':
    case 'm':
      units = 1024 * 1024;
      break;
    case 'K':
    case 'k':
      units = 1024;
      break;
    default:
      return -1.0;
    }
    break;
  case 1:
    // Bare number: interpret as bytes.
    units = 1;
    break;
  default:
    return -1.0;
  }

  return size * units;
}
|
||||
|
||||
// Inter-process barrier implemented as a tiny bootstrap all-gather.
inline testResult_t Barrier(struct testArgs* args)
{
  int tmp[16];
  // A simple barrier
  MSCCLPPCHECK(mscclppBootstrapAllGather(args->comm, tmp, sizeof(int)));
  return testSuccess;
}
|
||||
} // namespace
|
||||
|
||||
// Start the stopwatch at construction.
timer::timer() { t0 = now(); }
|
||||
|
||||
double timer::elapsed() const
|
||||
{
|
||||
std::uint64_t t1 = now();
|
||||
return 1.e-9 * (t1 - t0);
|
||||
}
|
||||
|
||||
double timer::reset()
|
||||
{
|
||||
std::uint64_t t1 = now();
|
||||
double ans = 1.e-9 * (t1 - t0);
|
||||
t0 = t1;
|
||||
return ans;
|
||||
}
|
||||
|
||||
// Allocate device send/recv buffers plus (when data checking is enabled) a
// device buffer holding the expected result.
// NOTE(review): sendBytes is unused and both send/recv buffers are sized by
// `nbytes`, not sendBytes/recvBytes -- presumably max-size buffers reused
// across the size sweep; confirm.
testResult_t AllocateBuffs(void** sendbuff, size_t sendBytes, void** recvbuff, size_t recvBytes, void** expected,
                           size_t nbytes)
{
  CUDACHECK(cudaMalloc(sendbuff, nbytes));
  CUDACHECK(cudaMalloc(recvbuff, nbytes));
  if (datacheck)
    CUDACHECK(cudaMalloc(expected, recvBytes));
  return testSuccess;
}
|
||||
|
||||
// Launch one iteration of the collective under test. The buffer base is
// shifted on every iteration (modulo the available space) to defeat caching
// and expose races in pointer exchange.
testResult_t startColl(struct testArgs* args, int in_place, int iter)
{
  size_t count = args->nbytes;

  // Try to change offset for each iteration so that we avoid cache effects and catch race conditions in ptrExchange
  size_t totalnbytes = max(args->sendBytes, args->expectedBytes);
  size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
  size_t shift = totalnbytes * (iter % steps);

  int rank = args->proc;
  char* recvBuff = ((char*)args->recvbuff) + shift;
  char* sendBuff = ((char*)args->sendbuff) + shift;

  // In-place runs read and write the recv buffer at rank-dependent offsets.
  TESTCHECK(args->collTest->runColl((void*)(in_place ? recvBuff + args->sendInplaceOffset * rank : sendBuff),
                                    (void*)(in_place ? recvBuff + args->recvInplaceOffset * rank : recvBuff),
                                    args->nranksPerNode, count, args->comm, args->stream, args->kernel_num));
  return testSuccess;
}
|
||||
|
||||
// Poll until the stream drains, yielding the CPU between polls; fails with
// testTimeout once `timeout` seconds have passed (timeout == 0 disables the
// watchdog). Any CUDA error other than "not ready" aborts via CUDACHECK.
testResult_t testStreamSynchronize(cudaStream_t stream)
{
  cudaError_t cudaErr;
  timer tim;

  while (true) {
    cudaErr = cudaStreamQuery(stream);
    if (cudaErr == cudaSuccess) {
      break;
    }

    // Anything other than "still running" is a real failure.
    if (cudaErr != cudaErrorNotReady)
      CUDACHECK(cudaErr);

    double delta = tim.elapsed();
    if (delta > timeout && timeout > 0) {
      char hostname[1024];
      getHostName(hostname, 1024);
      printf("%s: Test timeout (%ds) %s:%d\n", hostname, timeout, __FILE__, __LINE__);
      return testTimeout;
    }

    // We might want to let other threads (including MSCCLPP threads) use the CPU.
    sched_yield();
  }
  return testSuccess;
}
|
||||
|
||||
// Block until the pending collective on args->stream has finished.
testResult_t completeColl(struct testArgs* args)
{
  TESTCHECK(testStreamSynchronize(args->stream));
  return testSuccess;
}
|
||||
|
||||
// Inter-process barrier + allreduce. For average=0 no reduction is applied;
// the "result" is simply the caller's local value.
|
||||
// Reduce a single value across all processes. `average` selects the mode:
// 0 = keep the local value, 1 = mean, 2 = min, 3 = max, 4 = sum.
// Without MPI this is a no-op apart from the average==1 division.
template <typename T> void Allreduce(struct testArgs* args, T* value, int average)
{
  T accumulator = *value;

#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  if (average != 0) {
    static_assert(std::is_same<T, long long>::value || std::is_same<T, double>::value,
                  "Allreduce<T> only for T in {long long, double}");
    MPI_Datatype ty = std::is_same<T, long long>::value ? MPI_LONG_LONG
                      : std::is_same<T, double>::value  ? MPI_DOUBLE
                                                        : MPI_Datatype();
    MPI_Op op = average == 1   ? MPI_SUM
                : average == 2 ? MPI_MIN
                : average == 3 ? MPI_MAX
                : average == 4 ? MPI_SUM
                               : MPI_Op();
    MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, ty, op, MPI_COMM_WORLD);
  }
#endif

  // average==1 turns the MPI_SUM above into an arithmetic mean.
  if (average == 1)
    accumulator /= args->totalProcs;
  *value = accumulator;
}
|
||||
|
||||
// Compare the received buffer against the expected buffer element-by-element,
// accumulating the mismatch count into *wrongElts. Only the in-place variant
// is supported; increments args->error if reporting is enabled and any
// element differs.
testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts)
{
  if (in_place == 0) {
    return testInternalError;
  }
  size_t count = args->expectedBytes / sizeof(int);

  int* dataHostRecv = new int[count];
  int* dataHostExpected = new int[count];
  CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost));
  CUDACHECK(cudaMemcpy(dataHostExpected, args->expected, args->expectedBytes, cudaMemcpyDeviceToHost));

  for (size_t i = 0; i < count; i++) {
    if (dataHostRecv[i] != dataHostExpected[i]) {
      *wrongElts += 1;
    }
  }

  // BUGFIX: both host staging buffers were leaked on every call.
  delete[] dataHostRecv;
  delete[] dataHostExpected;

  if (args->reportErrors && *wrongElts) {
    (args->error)++;
  }
  return testSuccess;
}
|
||||
|
||||
// Benchmark one (size, in_place) configuration: capture `iters` collective
// launches into a CUDA graph, launch the graph `cudaGraphLaunches` times,
// report per-iteration time and bandwidth, and optionally validate results.
testResult_t BenchTime(struct testArgs* args, int in_place)
{
  size_t count = args->nbytes;

  TESTCHECK(args->collTest->initData(args, in_place));
  // Sync
  TESTCHECK(startColl(args, in_place, 0));
  TESTCHECK(completeColl(args));

  TESTCHECK(Barrier(args));

  // Performance Benchmark
  cudaGraph_t graph;
  cudaGraphExec_t graphExec;
  CUDACHECK(cudaStreamBeginCapture(args->stream, cudaStreamCaptureModeGlobal));
  timer tim;
  for (int iter = 0; iter < iters; iter++) {
    TESTCHECK(startColl(args, in_place, iter));
  }
  CUDACHECK(cudaStreamEndCapture(args->stream, &graph));
  CUDACHECK(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0));

  // Launch the graph
  TESTCHECK(Barrier(args));
  tim.reset();
  for (int l = 0; l < cudaGraphLaunches; ++l) {
    CUDACHECK(cudaGraphLaunch(graphExec, args->stream));
  }

  // CPU-side launch time, taken before the stream is drained.
  // NOTE(review): divides by iters only (not cudaGraphLaunches) -- presumably
  // intentional (launch overhead per captured iteration); confirm.
  double cputimeSec = tim.elapsed() / (iters);
  TESTCHECK(completeColl(args));

  // GPU wall time per kernel iteration, averaged across ranks per `average`.
  double deltaSec = tim.elapsed();
  deltaSec = deltaSec / (iters) / (cudaGraphLaunches);
  Allreduce(args, &deltaSec, average);

  CUDACHECK(cudaGraphExecDestroy(graphExec));
  CUDACHECK(cudaGraphDestroy(graph));

  double algBw, busBw;
  args->collTest->getBw(count, 1, deltaSec, &algBw, &busBw, args->totalProcs);
  TESTCHECK(Barrier(args));

  int64_t wrongElts = 0;
  if (datacheck) {
    // Initialize sendbuffs, recvbuffs and expected
    TESTCHECK(args->collTest->initData(args, in_place));
    // Begin cuda graph capture for data check
    CUDACHECK(cudaStreamBeginCapture(args->stream, cudaStreamCaptureModeGlobal));
    // test validation in single itertion, should ideally be included into the multi-iteration run
    TESTCHECK(startColl(args, in_place, 0));
    // End cuda graph capture
    CUDACHECK(cudaStreamEndCapture(args->stream, &graph));
    // Instantiate cuda graph
    CUDACHECK(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0));
    // Launch cuda graph
    CUDACHECK(cudaGraphLaunch(graphExec, args->stream));

    TESTCHECK(completeColl(args));

    // destroy cuda graph
    CUDACHECK(cudaGraphExecDestroy(graphExec));
    CUDACHECK(cudaGraphDestroy(graph));

    TESTCHECK(CheckData(args, in_place, &wrongElts));

    // aggregate delta from all threads and procs
    long long wrongElts1 = wrongElts;
    Allreduce(args, &wrongElts1, /*sum*/ 4);
    wrongElts = wrongElts1;
  }

  // Format the chosen time (CPU launch time or GPU time) with a precision
  // that keeps 7 columns aligned in the report table.
  double timeUsec = (report_cputime ? cputimeSec : deltaSec) * 1.0E6;
  char timeStr[100];
  if (timeUsec >= 10000.0) {
    sprintf(timeStr, "%7.0f", timeUsec);
  } else if (timeUsec >= 100.0) {
    sprintf(timeStr, "%7.1f", timeUsec);
  } else {
    sprintf(timeStr, "%7.2f", timeUsec);
  }
  if (args->reportErrors) {
    PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
  } else {
    PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
  }

  args->bw += busBw;
  args->bw_count++;
  return testSuccess;
}
|
||||
|
||||
// Derive the per-rank byte counts and in-place offsets for the collective
// under test from the requested message size, and store them in *args.
void setupArgs(size_t size, struct testArgs* args)
{
  // TODO: support more data types
  const int typeSize = sizeof(int);
  const size_t elemCount = size / typeSize;

  size_t sendCount = 0, recvCount = 0, paramCount = 0;
  size_t sendInplaceOffset = 0, recvInplaceOffset = 0;
  args->collTest->getCollByteCount(&sendCount, &recvCount, &paramCount, &sendInplaceOffset, &recvInplaceOffset,
                                   elemCount, (size_t)args->totalProcs);

  args->nbytes = paramCount * typeSize;
  args->sendBytes = sendCount * typeSize;
  args->expectedBytes = recvCount * typeSize;
  args->sendInplaceOffset = sendInplaceOffset * typeSize;
  args->recvInplaceOffset = recvInplaceOffset * typeSize;
}
|
||||
|
||||
// Top-level benchmark driver: warms up at the largest and the smallest message
// size, prints the result-table header, then times the collective across the
// configured size sweep (additive or multiplicative stepping).
testResult_t TimeTest(struct testArgs* args)
{
  // Sync to avoid first-call timeout
  TESTCHECK(Barrier(args));

  // Warm-up for large size
  setupArgs(args->maxbytes, args);
  TESTCHECK(args->collTest->initData(args, 1));
  for (int w = 0; w < warmup_iters; ++w) {
    TESTCHECK(startColl(args, 1, w));
  }
  TESTCHECK(completeColl(args));

  // Warm-up for small size
  setupArgs(args->minbytes, args);
  for (int w = 0; w < warmup_iters; ++w) {
    TESTCHECK(startColl(args, 1, w));
  }
  TESTCHECK(completeColl(args));

  PRINT("#\n");
  PRINT("# %10s %12s in-place out-of-place \n", "", "");
  PRINT("# %10s %12s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "time", "algbw", "busbw", "#wrong",
        "time", "algbw", "busbw", "#wrong");
  PRINT("# %10s %12s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "(us)", "(GB/s)", "(GB/s)", "",
        "(us)", "(GB/s)", "(GB/s)", "");

  // Benchmark every size in [minbytes, maxbytes].
  size_t size = args->minbytes;
  while (size <= args->maxbytes) {
    setupArgs(size, args);
    PRINT("%12li %12li", max(args->sendBytes, args->expectedBytes), args->nbytes / sizeof(int));
    // Don't support out-of-place for now
    // TESTCHECK(BenchTime(args, 0));
    TESTCHECK(BenchTime(args, 1));
    PRINT("\n");
    // stepfactor > 1 means a multiplicative sweep; otherwise step linearly.
    size = (args->stepfactor > 1) ? size * args->stepfactor : size + args->stepbytes;
  }
  return testSuccess;
}
|
||||
|
||||
// Establish an MSCCL++ connection from this rank to every other rank:
// P2P transport for ranks on the same node, IB (one "mlx5_ib<localRank>"
// device per local rank) for ranks on other nodes.
testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode, mscclppComm_t comm, void* dataDst,
                                     size_t dataSize)
{
  const int thisNode = rank / ranksPerNode;
  std::string ibDevStr = "mlx5_ib" + std::to_string(rank % ranksPerNode);

  for (int peer = 0; peer < worldSize; ++peer) {
    if (peer == rank) {
      continue;
    }
    const bool sameNode = (peer / ranksPerNode == thisNode);
    mscclppTransport_t transportType = sameNode ? mscclppTransportP2P : mscclppTransportIB;
    const char* ibDev = sameNode ? NULL : ibDevStr.c_str();
    // Connect with all other ranks
    MSCCLPPCHECK(mscclppConnect(comm, peer, 0, dataDst, dataSize, transportType, ibDev));
  }

  MSCCLPPCHECK(mscclppConnectionSetup(comm));

  return testSuccess;
}
|
||||
|
||||
// Wire up the MSCCL++ connections, run the selected test with the proxy
// threads active, then stop the proxies again.
testResult_t runTests(struct testArgs* args)
{
  PRINT("# Setting up the connection in MSCCL++\n");
  TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
                                    args->maxbytes));

  PRINT("# Launching MSCCL++ proxy threads\n");
  MSCCLPPCHECK(mscclppProxyLaunch(args->comm));

  // The actual benchmark (runs while the proxies are serving requests).
  TESTCHECK(mscclppTestEngine.runTest(args));

  PRINT("Stopping MSCCL++ proxy threads\n");
  MSCCLPPCHECK(mscclppProxyStop(args->comm));

  return testSuccess;
}
|
||||
|
||||
testResult_t run(); // Main function
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
// Make sure everyline is flushed so that we see the progress of the test
|
||||
setlinebuf(stdout);
|
||||
|
||||
// Parse args
|
||||
double parsed;
|
||||
int longindex;
|
||||
static struct option longopts[] = {{"minbytes", required_argument, 0, 'b'},
|
||||
{"maxbytes", required_argument, 0, 'e'},
|
||||
{"stepbytes", required_argument, 0, 'i'},
|
||||
{"stepfactor", required_argument, 0, 'f'},
|
||||
{"iters", required_argument, 0, 'n'},
|
||||
{"warmup_iters", required_argument, 0, 'w'},
|
||||
{"check", required_argument, 0, 'c'},
|
||||
{"timeout", required_argument, 0, 'T'},
|
||||
{"cudagraph", required_argument, 0, 'G'},
|
||||
{"report_cputime", required_argument, 0, 'C'},
|
||||
{"average", required_argument, 0, 'a'},
|
||||
{"kernel_num", required_argument, 0, 'k'},
|
||||
{"help", no_argument, 0, 'h'},
|
||||
{}};
|
||||
|
||||
while (1) {
|
||||
int c;
|
||||
c = getopt_long(argc, argv, "b:e:i:f:n:w:c:T:G:C:a:P:k:h:", longopts, &longindex);
|
||||
|
||||
if (c == -1)
|
||||
break;
|
||||
|
||||
switch (c) {
|
||||
case 'b':
|
||||
parsed = parsesize(optarg);
|
||||
if (parsed < 0) {
|
||||
fprintf(stderr, "invalid size specified for 'minbytes'\n");
|
||||
return -1;
|
||||
}
|
||||
minBytes = (size_t)parsed;
|
||||
break;
|
||||
case 'e':
|
||||
parsed = parsesize(optarg);
|
||||
if (parsed < 0) {
|
||||
fprintf(stderr, "invalid size specified for 'maxbytes'\n");
|
||||
return -1;
|
||||
}
|
||||
maxBytes = (size_t)parsed;
|
||||
break;
|
||||
case 'i':
|
||||
stepBytes = strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'f':
|
||||
stepFactor = strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'n':
|
||||
iters = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'w':
|
||||
warmup_iters = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'c':
|
||||
datacheck = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'T':
|
||||
timeout = strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'G':
|
||||
cudaGraphLaunches = strtol(optarg, NULL, 0);
|
||||
if (cudaGraphLaunches <= 0) {
|
||||
fprintf(stderr, "invalid number for 'cudaGraphLaunches'\n");
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'C':
|
||||
report_cputime = strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'a':
|
||||
average = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'k':
|
||||
kernel_num = (int)strtol(optarg, NULL, 0);
|
||||
break;
|
||||
case 'h':
|
||||
default:
|
||||
if (c != 'h')
|
||||
printf("invalid option '%c'\n", c);
|
||||
printf("USAGE: %s \n\t"
|
||||
"[-b,--minbytes <min size in bytes>] \n\t"
|
||||
"[-e,--maxbytes <max size in bytes>] \n\t"
|
||||
"[-i,--stepbytes <increment size>] \n\t"
|
||||
"[-f,--stepfactor <increment factor>] \n\t"
|
||||
"[-n,--iters <iteration count>] \n\t"
|
||||
"[-w,--warmup_iters <warmup iteration count>] \n\t"
|
||||
"[-c,--check <0/1>] \n\t"
|
||||
"[-T,--timeout <time in seconds>] \n\t"
|
||||
"[-G,--cudagraph <num graph launches>] \n\t"
|
||||
"[-C,--report_cputime <0/1>] \n\t"
|
||||
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
|
||||
"[-k,--kernel_num <kernel number of commnication primitive>] \n\t"
|
||||
"[-h,--help]\n",
|
||||
basename(argv[0]));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (minBytes > maxBytes) {
|
||||
fprintf(stderr, "invalid sizes for 'minbytes' and 'maxbytes': %llu > %llu\n", (unsigned long long)minBytes,
|
||||
(unsigned long long)maxBytes);
|
||||
return -1;
|
||||
}
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Init(&argc, &argv);
|
||||
#endif
|
||||
TESTCHECK(run());
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Main test routine: discover rank/topology (via MPI when enabled), report the
// devices in use, size and allocate the GPU buffers, initialize MSCCL++, run
// the benchmark worker, and tear everything down again.
testResult_t run()
{
  int totalProcs = 1, proc = 0;
  int nranksPerNode = 0, localRank = 0;
  char hostname[1024];
  getHostName(hostname, 1024);

#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  MPI_Comm_size(MPI_COMM_WORLD, &totalProcs);
  MPI_Comm_rank(MPI_COMM_WORLD, &proc);
  MPI_Comm shmcomm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
  MPI_Comm_size(shmcomm, &nranksPerNode);
  MPI_Comm_free(&shmcomm);
  localRank = proc % nranksPerNode;
#endif
  // FIX: this assignment appeared twice in a row (merge artifact); once is enough.
  is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;

  PRINT("# minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d graph: %d, "
        "kernel num: %d\n",
        minBytes, maxBytes, (stepFactor > 1) ? stepFactor : stepBytes, (stepFactor > 1) ? "factor" : "bytes",
        warmup_iters, iters, datacheck, cudaGraphLaunches, kernel_num);
  PRINT("#\n");
  PRINT("# Using devices\n");

#define MAX_LINE 2048
  char line[MAX_LINE];
  int len = 0;
  size_t maxMem = ~0;

  int cudaDev = localRank;
  int rank = proc;
  cudaDeviceProp prop;
  char busIdChar[] = "00000000:00:00.0";
  CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev));
  CUDACHECK(cudaDeviceGetPCIBusId(busIdChar, sizeof(busIdChar), cudaDev));
  len += snprintf(line + len, MAX_LINE - len, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n", rank, getpid(),
                  hostname, cudaDev, busIdChar, prop.name);
  maxMem = std::min(maxMem, prop.totalGlobalMem);

// FIX: was "#if MSCCLPP_USE_MPI_FOR_TESTS" — use #ifdef like every other guard
// in this file, so a macro defined to an empty value does not break the build.
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  char* lines = (proc == 0) ? (char*)malloc(totalProcs * MAX_LINE) : NULL;
  // Gather all output in rank order to root (0)
  MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
  if (proc == 0) {
    for (int p = 0; p < totalProcs; p++)
      PRINT("%s", lines + MAX_LINE * p);
    free(lines);
  }
  MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
#else
  PRINT("%s", line);
#endif
  // We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
  size_t memMaxBytes = (maxMem - (1 << 30)) / (datacheck ? 3 : 2);
  if (maxBytes > memMaxBytes) {
    maxBytes = memMaxBytes;
    if (proc == 0)
      printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
  }

  cudaStream_t stream;
  void* sendbuff;
  void* recvbuff;
  void* expected;
  size_t sendBytes, recvBytes;

  mscclppTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)totalProcs);

  CUDACHECK(cudaSetDevice(cudaDev));
  TESTCHECK(AllocateBuffs(&sendbuff, sendBytes, &recvbuff, recvBytes, &expected, (size_t)maxBytes));
  CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
  PRINT("#\n");
  PRINT("# Initializing MSCCL++\n");

  mscclppUniqueId mscclppId;
  if (proc == 0)
    MSCCLPPCHECK(mscclppGetUniqueId(&mscclppId));
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  // FIX: the broadcast was unguarded, which does not compile without MPI.
  // Share rank 0's unique id with all ranks; in a non-MPI build there is a
  // single process, so no exchange is needed.
  MPI_Bcast((void*)&mscclppId, sizeof(mscclppId), MPI_BYTE, 0, MPI_COMM_WORLD);
#endif
  mscclppComm_t comm;
  MSCCLPPCHECK(mscclppCommInitRankFromId(&comm, totalProcs, mscclppId, rank));

  double* delta;
  CUDACHECK(cudaHostAlloc(&delta, sizeof(double) * NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped));

  fflush(stdout);

  struct testWorker worker;

  worker.args.minbytes = minBytes;
  worker.args.maxbytes = maxBytes;
  worker.args.stepbytes = stepBytes;
  worker.args.stepfactor = stepFactor;
  worker.args.localRank = localRank;
  worker.args.nranksPerNode = nranksPerNode;

  worker.args.totalProcs = totalProcs;
  worker.args.proc = proc;
  worker.args.gpuNum = cudaDev;
  worker.args.kernel_num = kernel_num;
  worker.args.sendbuff = sendbuff;
  worker.args.recvbuff = recvbuff;
  worker.args.expected = expected;
  worker.args.comm = comm;
  worker.args.stream = stream;

  worker.args.error = 0;
  worker.args.bw = 0.0;
  worker.args.bw_count = 0;

  worker.args.reportErrors = datacheck;

  worker.func = runTests;
  TESTCHECK(worker.func(&worker.args));

  MSCCLPPCHECK(mscclppCommDestroy(comm));

  // Free off CUDA allocated memory
  if (sendbuff)
    CUDACHECK(cudaFree((char*)sendbuff));
  if (recvbuff)
    CUDACHECK(cudaFree((char*)recvbuff));
  if (datacheck)
    CUDACHECK(cudaFree(expected));
  CUDACHECK(cudaFreeHost(delta));

  int error = worker.args.error;
  PRINT("# Out of bounds values : %d %s\n", error, error ? "FAILED" : "OK");
  PRINT("#\n");

#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  MPI_Finalize();
#endif
  return testSuccess;
}
|
||||
166
tests/common.h
166
tests/common.h
@@ -1,14 +1,144 @@
|
||||
/*************************************************************************
|
||||
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
|
||||
*
|
||||
* See LICENSE.txt for license information
|
||||
************************************************************************/
|
||||
|
||||
#ifndef MSCCLPP_TESTS_COMMON_H_
|
||||
#define MSCCLPP_TESTS_COMMON_H_
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include "mscclpp.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include "mpi.h"
|
||||
#include <mpi.h>
|
||||
#endif // MSCCLPP_USE_MPI_FOR_TESTS
|
||||
|
||||
void print_usage(const char* prog)
|
||||
// Check a CUDA runtime call: on failure, print the host name, source location,
// and CUDA error string, then return testCudaError from the enclosing
// function (so the enclosing function must return testResult_t).
#define CUDACHECK(cmd)                                                                                                 \
  do {                                                                                                                 \
    cudaError_t err = cmd;                                                                                             \
    if (err != cudaSuccess) {                                                                                          \
      char hostname[1024];                                                                                             \
      getHostName(hostname, 1024);                                                                                     \
      printf("%s: Test CUDA failure %s:%d '%s'\n", hostname, __FILE__, __LINE__, cudaGetErrorString(err));             \
      return testCudaError;                                                                                            \
    }                                                                                                                  \
  } while (0)
|
||||
|
||||
// Propagate errors up
// Check an MSCCL++ call: any result other than mscclppSuccess or
// mscclppInProgress prints the host name, source location, and error string,
// then returns testMcclppError from the enclosing function.
// FIX: dropped the stray semicolon after "while (0)" — the do/while(0) idiom
// requires the caller to supply it, otherwise `if (...) MSCCLPPCHECK(x); else`
// is ill-formed. This also matches CUDACHECK and TESTCHECK.
#define MSCCLPPCHECK(cmd)                                                                                              \
  do {                                                                                                                 \
    mscclppResult_t res = cmd;                                                                                         \
    if (res != mscclppSuccess && res != mscclppInProgress) {                                                           \
      char hostname[1024];                                                                                             \
      getHostName(hostname, 1024);                                                                                     \
      printf("%s: Failure at %s:%d -> %s\n", hostname, __FILE__, __LINE__, mscclppGetErrorString(res));                \
      return testMcclppError;                                                                                          \
    }                                                                                                                  \
  } while (0)
|
||||
|
||||
// Relay errors up and trace
// Check a testResult_t-returning call: on any result other than testSuccess,
// print the host, pid, and source location for the failure trace, then return
// the same result from the enclosing function (building a call-chain trace).
#define TESTCHECK(cmd)                                                                                                 \
  do {                                                                                                                 \
    testResult_t r = cmd;                                                                                              \
    if (r != testSuccess) {                                                                                            \
      char hostname[1024];                                                                                             \
      getHostName(hostname, 1024);                                                                                     \
      printf(" .. %s pid %d: Test failure %s:%d\n", hostname, getpid(), __FILE__, __LINE__);                           \
      return r;                                                                                                        \
    }                                                                                                                  \
  } while (0)
|
||||
|
||||
// Status codes returned by every test-harness function; testSuccess is zero
// so a result can be tested directly in a boolean context.
typedef enum
{
  testSuccess = 0,
  testInternalError = 1,
  testCudaError = 2,
  testMcclppError = 3, // NOTE: historical spelling, kept for source compatibility
  testTimeout = 4,
  testNumResults = 5 // count of values above, not a real status
} testResult_t;
|
||||
|
||||
// Description of one collective test: its name plus the callbacks the harness
// uses to size buffers, initialize data, compute bandwidth, and launch it.
struct testColl
{
  const char name[20];
  // Translate an element count into per-rank send/recv/param element counts
  // and the in-place buffer offsets for this collective.
  void (*getCollByteCount)(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
                           size_t* recvInplaceOffset, size_t count, int nranks);
  // Initialize send buffers and the expected-result buffer; in_place selects
  // the buffer layout (see BenchTime in common.cu).
  testResult_t (*initData)(struct testArgs* args, int in_place);
  // Convert a measured time (seconds) into algorithm and bus bandwidth.
  void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
  // Launch the kernel variant selected by kernel_num on the given stream.
  testResult_t (*runColl)(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
                          cudaStream_t stream, int kernel_num);
};
|
||||
|
||||
// Entry points a test binary provides to the shared harness: buffer sizing
// plus the test body itself (invoked by runTests via mscclppTestEngine).
struct testEngine
{
  void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks);
  // We can add more parameters for other communication primitives
  testResult_t (*runTest)(struct testArgs* args);
};
|
||||
|
||||
extern struct testEngine mscclppTestEngine;
|
||||
|
||||
// Per-run configuration and state shared between the harness and the
// collective implementation under test.
struct testArgs
{
  size_t nbytes;     // parameter byte count for the current message size (set by setupArgs)
  size_t minbytes;   // smallest message size in the sweep
  size_t maxbytes;   // largest message size in the sweep
  size_t stepbytes;  // additive size step (used when stepfactor <= 1)
  size_t stepfactor; // multiplicative size step (used when > 1)

  int totalProcs;    // world size
  int proc;          // this process's rank
  int gpuNum;        // CUDA device index used by this rank
  int localRank;     // rank within its node
  int nranksPerNode; // number of ranks per node
  int kernel_num;    // which kernel variant runColl should launch
  void* sendbuff;
  size_t sendBytes;          // bytes in sendbuff for the current size
  size_t sendInplaceOffset;  // send-side offset for in-place operation
  void* recvbuff;
  size_t recvInplaceOffset;  // recv-side offset for in-place operation
  mscclppComm_t comm;
  cudaStream_t stream;

  void* expected;      // expected-result buffer used when data checking is on
  size_t expectedBytes;
  int error;           // accumulated count of wrong values (reported at exit)
  double bw;           // accumulated bus bandwidth (GB/s) across sizes
  int bw_count;        // number of measurements folded into bw

  int reportErrors; // nonzero when data checking (-c) is enabled

  struct testColl* collTest; // callbacks for the collective under test
};
|
||||
|
||||
// Signature of a worker entry point (e.g. runTests).
typedef testResult_t (*entryFunc_t)(struct testArgs* args);
// A worker pairs an entry point with the argument block it runs on.
struct testWorker
{
  entryFunc_t func;
  struct testArgs args;
};
|
||||
|
||||
// Provided by common.cu
|
||||
extern testResult_t TimeTest(struct testArgs* args);
|
||||
|
||||
// Copy this machine's host name into `hostname` (at most `maxlen` bytes) and
// truncate it at the first '.' so only the short name remains.
// FIX: POSIX allows gethostname() to leave the buffer unterminated when the
// name is truncated, and its failure was ignored; the buffer is now always
// NUL-terminated (empty string on failure or when maxlen <= 0 leaves it alone).
static void getHostName(char* hostname, int maxlen)
{
  if (maxlen <= 0)
    return;
  if (gethostname(hostname, maxlen) != 0) {
    // Fall back to an empty name if the host name cannot be queried.
    hostname[0] = '\0';
    return;
  }
  // Guarantee termination even if gethostname() truncated the name.
  hostname[maxlen - 1] = '\0';
  for (int i = 0; i < maxlen; i++) {
    if (hostname[i] == '.') {
      hostname[i] = '\0';
      return;
    }
  }
}
|
||||
|
||||
inline void print_usage(const char* prog)
|
||||
{
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
printf("usage: %s IP:PORT [rank nranks]\n", prog);
|
||||
@@ -17,30 +147,8 @@ void print_usage(const char* prog)
|
||||
#endif
|
||||
}
|
||||
|
||||
// Parse "IP:PORT [rank nranks]" from the command line. Without MPI the rank
// and world size are mandatory; with MPI they default to the values of
// MPI_COMM_WORLD when omitted. Prints usage and exits on malformed input.
void parse_arguments(int argc, const char* argv[], const char** ip_port, int* rank, int* world_size)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  const int haveExplicitRank = (argc == 4);
  if (argc != 2 && !haveExplicitRank) {
    print_usage(argv[0]);
    exit(-1);
  }
  *ip_port = argv[1];
  if (haveExplicitRank) {
    *rank = atoi(argv[2]);
    *world_size = atoi(argv[3]);
  } else {
    MPI_Comm_rank(MPI_COMM_WORLD, rank);
    MPI_Comm_size(MPI_COMM_WORLD, world_size);
  }
#else
  if (argc != 4) {
    print_usage(argv[0]);
    exit(-1);
  }
  *ip_port = argv[1];
  *rank = atoi(argv[2]);
  *world_size = atoi(argv[3]);
#endif
}
|
||||
// Print only on the main thread/process so multi-rank runs do not emit
// duplicated output. NOTE: expands to a bare `if` — braces around callers'
// surrounding `if`/`else` bodies are required as usual.
#define PRINT \
  if (is_main_thread) \
    printf
|
||||
|
||||
#endif // MSCCLPP_TESTS_COMMON_H_
|
||||
Reference in New Issue
Block a user