Merge branch 'main' into binyli/mscclpp-test

This commit is contained in:
Saeed Maleki
2023-04-08 06:30:48 +00:00
5 changed files with 225 additions and 10 deletions

View File

@@ -8,6 +8,7 @@ DEBUG ?= 0
VERBOSE ?= 1
TRACE ?= 0
NPKIT ?= 0
GDRCOPY ?= 0
USE_MPI_FOR_TESTS ?= 1
######## CUDA
@@ -57,10 +58,8 @@ CXXFLAGS := -DCUDA_MAJOR=$(CUDA_MAJOR) -DCUDA_MINOR=$(CUDA_MINOR) -fPIC -fvisi
ifneq ($(TRACE), 0)
CXXFLAGS += -DENABLE_TRACE
endif
# Maxrregcount needs to be set accordingly to MSCCLPP_MAX_NTHREADS (otherwise it will cause kernel launch errors)
# 512 : 120, 640 : 96, 768 : 80, 1024 : 60
# We would not have to set this if we used __launch_bounds__, but this only works on kernels, not on functions.
NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11 --expt-extended-lambda -Xptxas -maxrregcount=96 -Xfatbin -compress-all
NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11 --expt-extended-lambda -Xfatbin -compress-all
# Use addprefix so that we can specify more than one path
NVLDFLAGS := -L$(CUDA_LIB) -lcudart -lrt
@@ -96,6 +95,15 @@ MPI_LDFLAGS :=
MPI_MACRO :=
endif
#### GDRCOPY
ifeq ($(GDRCOPY), 1)
GDRCOPY_LDFLAGS := -lgdrapi
CXXFLAGS += -DMSCCLPP_USE_GDRCOPY
NVCUFLAGS += -DMSCCLPP_USE_GDRCOPY
else
GDRCOPY_LDFLAGS :=
endif
#### MSCCL++
BUILDDIR ?= $(abspath ./build)
INCDIR := include
@@ -108,18 +116,22 @@ CXXFLAGS += -DENABLE_NPKIT
NVCUFLAGS += -DENABLE_NPKIT
endif
LDFLAGS := $(NVLDFLAGS) -libverbs -lnuma
LDFLAGS := $(NVLDFLAGS) $(GDRCOPY_LDFLAGS) -libverbs -lnuma
LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc init.cc proxy.cc ib.cc config.cc)
LIBSRCS += $(addprefix src/bootstrap/,bootstrap.cc socket.cc)
ifneq ($(NPKIT), 0)
LIBSRCS += $(addprefix src/misc/,npkit.cc)
endif
ifeq ($(GDRCOPY), 1)
LIBSRCS += $(addprefix src/,gdr.cc)
endif
LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS))
LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
HEADERS := $(wildcard src/include/*.h)
CPPSOURCES := $(shell find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "*/build/*")
CPPSOURCES := $(shell find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*")
PYTHONCPPSOURCES := $(shell find ./python/src/ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)')
INCEXPORTS := mscclpp.h mscclppfifo.h
INCTARGETS := $(INCEXPORTS:%=$(BUILDDIR)/$(INCDIR)/%)
@@ -128,6 +140,12 @@ LIBNAME := libmscclpp.so
LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR)
LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR).$(MSCCLPP_PATCH)
UTDIR := tests/unittests
UTSRCS := $(addprefix $(UTDIR)/,ib_test.cc)
UTOBJS := $(patsubst %.cc,%.o,$(UTSRCS))
UTOBJTARGETS := $(UTOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
UTBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(UTOBJS))
TESTSDIR := tests
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc allgather_test_standalone.cu)
TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS))
@@ -148,15 +166,19 @@ build: lib tests mscclpp-test
lib: $(LIBOBJTARGETS) $(INCTARGETS) $(LIBTARGET)
tests: $(TESTSBINS)
unittests: $(UTBINS)
tests: unittests $(TESTSBINS)
mscclpp-test: $(LIBTARGET) $(MSCLLPPTESTBINS)
cpplint:
clang-format-12 -style=file --verbose --Werror --dry-run $(CPPSOURCES)
clang-format-12 --dry-run $(CPPSOURCES)
cpplint-autofix:
clang-format-12 -style=file --verbose --Werror -i $(CPPSOURCES)
clang-format-12 -i $(PYTHONCPPSOURCES)
# Run cpplint on a single file, example: make cpplint-file-autofix INPUTFILE=src/bootstrap/bootstrap.cc
cpplint-file-autofix:
@@ -167,6 +189,11 @@ $(BUILDDIR)/$(OBJDIR)/%.o: %.cc $(HEADERS)
@mkdir -p $(@D)
$(CXX) -o $@ $(INCLUDE) $(CXXFLAGS) -c $<
# Compile utobjs
$(BUILDDIR)/$(OBJDIR)/$(UTDIR)/%.o: $(UTDIR)/%.cc $(HEADERS)
@mkdir -p $(@D)
$(CXX) -o $@ $(INCLUDE) $(CXXFLAGS) -c $<
$(BUILDDIR)/$(INCDIR)/%.h: src/$(INCDIR)/%.h
@mkdir -p $(@D)
cp $< $@
@@ -177,6 +204,11 @@ $(LIBTARGET): $(LIBOBJTARGETS)
ln -sf $(LIBTARGET) $(BUILDDIR)/$(LIBDIR)/$(LIBNAME)
ln -sf $(LIBTARGET) $(BUILDDIR)/$(LIBDIR)/$(LIBSONAME)
# UT bins
$(BUILDDIR)/$(BINDIR)/$(UTDIR)/%: $(BUILDDIR)/$(OBJDIR)/$(UTDIR)/%.o $(LIBOBJTARGETS)
@mkdir -p $(@D)
$(NVCC) -o $@ $+ $(MPI_LDFLAGS) $(LDFLAGS)
# Compile .cc tests
$(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cc $(INCTARGETS)
@mkdir -p $(@D)
@@ -188,7 +220,7 @@ $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cu $(INCTARGETS)
$(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) $(INCLUDE) -c $< $(MPI_MACRO)
# Test bins
$(BUILDDIR)/$(BINDIR)/%: $(BUILDDIR)/$(OBJDIR)/%.o $(LIBTARGET)
$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%: $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o $(LIBTARGET)
@mkdir -p $(@D)
$(NVCC) -o $@ $< $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp

View File

@@ -23,11 +23,19 @@ struct mscclppProxyState
// fifo cudaHostCalloc'ed that is produced by device and consumed by host
mscclppTrigger* triggerFifo;
#if defined(MSCCLPP_USE_GDRCOPY)
mscclppTrigger* triggerFifoDev;
void* triggerFifoDesc;
#endif
// allocated on the device and only accessed by the device
uint64_t* fifoHead;
// allocated on the device. Read-only by device, write-only by host
uint64_t* fifoTailDev;
#if defined(MSCCLPP_USE_GDRCOPY)
uint64_t* fifoTailDevHostPtr;
void* fifoTailDesc;
#endif
// allocated on the host. Only accessed by the host. This is a copy of the
// value pointed to by fifoTailDev and the invariant is that
// *fifoTailDev <= fifoTailHost. Meaning that host's copy of tail is

View File

@@ -1,6 +1,9 @@
#include "bootstrap.h"
#include "config.h"
#include "core.h"
#if defined(MSCCLPP_USE_GDRCOPY)
#include "gdr.h"
#endif
#include "mscclpp.h"
#include <map>
#include <sstream>
@@ -24,6 +27,24 @@ pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
static bool initialized = false;
// static size_t maxLocalSizeBytes = 0;
#if defined(MSCCLPP_USE_GDRCOPY)
gdr_t mscclppGdrCopy = NULL;
mscclppResult_t initGdrCopy()
{
if (mscclppGdrCopy == NULL) {
mscclppGdrCopy = mscclppGdrInit();
if (mscclppGdrCopy == NULL) {
WARN("GDR init failed");
return mscclppSystemError;
}
}
return mscclppSuccess;
}
#endif
static mscclppResult_t mscclppInit()
{
if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE))
@@ -66,6 +87,10 @@ mscclppResult_t mscclppBootstrapAllGather(mscclppComm_t comm, void* data, int si
MSCCLPP_API(mscclppResult_t, mscclppCommInitRank, mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank);
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, const char* ipPortPair, int rank)
{
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(initGdrCopy());
#endif
mscclppResult_t res = mscclppSuccess;
mscclppComm_t _comm = NULL;
// uint64_t hash = getHostHash();
@@ -108,6 +133,10 @@ fail:
MSCCLPP_API(mscclppResult_t, mscclppCommInitRankFromId, mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank);
mscclppResult_t mscclppCommInitRankFromId(mscclppComm_t* comm, int nranks, mscclppUniqueId id, int rank)
{
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(initGdrCopy());
#endif
mscclppResult_t res = mscclppSuccess;
mscclppComm_t _comm = NULL;
mscclppBootstrapHandle* handle = (mscclppBootstrapHandle*)&id;
@@ -160,10 +189,17 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm)
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState* proxyState = comm->proxyState[i];
if (proxyState) {
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(mscclppGdrCudaFree(proxyState->triggerFifoDesc));
#else
MSCCLPPCHECK(mscclppCudaHostFree(proxyState->triggerFifo));
#endif
MSCCLPPCHECK(mscclppCudaFree(proxyState->fifoHead));
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(mscclppGdrCudaFree(proxyState->fifoTailDesc));
#else
MSCCLPPCHECK(mscclppCudaFree(proxyState->fifoTailDev));
#endif
if (proxyState->p2pStream)
CUDACHECK(cudaStreamDestroy(proxyState->p2pStream));
CUDACHECK(cudaStreamDestroy(proxyState->fifoStream));
@@ -344,10 +380,19 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void
// If we couldn't find a matching context, create one
if (proxyState == NULL) {
MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->triggerFifo, &proxyState->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE,
&proxyState->triggerFifoDesc));
#else
MSCCLPPCHECK(mscclppCudaHostCalloc(&proxyState->triggerFifo, MSCCLPP_PROXY_FIFO_SIZE));
#endif
MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->fifoHead, 1));
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(
mscclppGdrCudaCalloc(&proxyState->fifoTailDevHostPtr, &proxyState->fifoTailDev, 1, &proxyState->fifoTailDesc));
#else
MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->fifoTailDev, 1));
#endif
proxyState->fifoTailHost = 0;
if (transportType == mscclppTransportIB) {
@@ -379,7 +424,11 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void
conn->devConn->remoteRank = remoteRank;
conn->devConn->tag = tag;
conn->devConn->fifo.connId = comm->nConns;
#if defined(MSCCLPP_USE_GDRCOPY)
conn->devConn->fifo.triggerFifo = proxyState->triggerFifoDev;
#else
conn->devConn->fifo.triggerFifo = proxyState->triggerFifo;
#endif
conn->devConn->fifo.triggerFifoHead = proxyState->fifoHead;
conn->devConn->fifo.triggerFifoTail = proxyState->fifoTailDev;

View File

@@ -106,12 +106,18 @@ void* mscclppProxyService(void* _args)
volatile mscclppProxyRunState_t* run = &args->proxyState->run;
mscclppTrigger* fifo = args->proxyState->triggerFifo;
uint64_t* fifoTail = &args->proxyState->fifoTailHost;
#if defined(MSCCLPP_USE_GDRCOPY)
volatile uint64_t* fifoTailDevPtr = args->proxyState->fifoTailDevHostPtr;
#else
uint64_t* fifoTailDevPtr = args->proxyState->fifoTailDev;
#endif
uint64_t fifoTailCached = *fifoTail;
mscclppTrigger trigger;
mscclppIbContext* ibCtx = args->proxyState->ibContext;
cudaStream_t p2pStream = args->proxyState->p2pStream;
#if !defined(MSCCLPP_USE_GDRCOPY)
cudaStream_t fifoStream = args->proxyState->fifoStream;
#endif
bool isP2pProxy = (ibCtx == nullptr);
free(_args); // allocated in mscclppProxyCreate
@@ -210,16 +216,24 @@ void* mscclppProxyService(void* _args)
// that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush
// request.
if (((fifoTailCached % MSCCLPP_PROXY_FIFO_FLUSH_COUNTER) == 0) || (trigger.fields.type & mscclppSync)) {
#if defined(MSCCLPP_USE_GDRCOPY)
*fifoTailDevPtr = fifoTailCached;
#else
PROXYCUDACHECK(
cudaMemcpyAsync(fifoTailDevPtr, &fifoTailCached, sizeof(uint64_t), cudaMemcpyHostToDevice, fifoStream));
#endif
}
}
*fifoTail = fifoTailCached;
// make sure the tail is flushed before we shut the proxy
#if defined(MSCCLPP_USE_GDRCOPY)
*fifoTailDevPtr = fifoTailCached;
#else
PROXYCUDACHECK(
cudaMemcpyAsync(fifoTailDevPtr, &fifoTailCached, sizeof(uint64_t), cudaMemcpyHostToDevice, fifoStream));
PROXYCUDACHECK(cudaStreamSynchronize(fifoStream));
#endif
if (isP2pProxy) {
PROXYCUDACHECK(cudaStreamSynchronize(p2pStream));
}

112
tests/unittests/ib_test.cc Normal file
View File

@@ -0,0 +1,112 @@
#include "alloc.h"
#include "checks.h"
#include "ib.h"
#include <set>
#include <string>
// Measure current time in second.
static double getTime(void)
{
struct timespec tspec;
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
printf("clock_gettime failed\n");
exit(EXIT_FAILURE);
}
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}
// Example usage:
// Receiver: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 0 0 0
// Sender: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 1 0 0
int main(int argc, const char* argv[])
{
if (argc != 5) {
printf("Usage: %s <ip:port> <0(recv)/1(send)> <gpu id> <ib id>\n", argv[0]);
return 1;
}
const char* ip_port = argv[1];
int is_send = atoi(argv[2]);
int cudaDevId = atoi(argv[3]);
std::string ibDevName = "mlx5_ib" + std::string(argv[4]);
CUDACHECK(cudaSetDevice(cudaDevId));
int* data;
int nelem = 1;
MSCCLPPCHECK(mscclppCudaCalloc(&data, nelem));
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRank(&comm, 2, ip_port, is_send));
struct mscclppIbContext* ctx;
struct mscclppIbQp* qp;
struct mscclppIbMr* mr;
MSCCLPPCHECK(mscclppIbContextCreate(&ctx, ibDevName.c_str()));
MSCCLPPCHECK(mscclppIbContextCreateQp(ctx, &qp));
MSCCLPPCHECK(mscclppIbContextRegisterMr(ctx, data, sizeof(int) * nelem, &mr));
struct mscclppIbQpInfo* qpInfo;
MSCCLPPCHECK(mscclppCalloc(&qpInfo, 2));
qpInfo[is_send] = qp->info;
struct mscclppIbMrInfo* mrInfo;
MSCCLPPCHECK(mscclppCalloc(&mrInfo, 2));
mrInfo[is_send] = mr->info;
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, qpInfo, sizeof(struct mscclppIbQpInfo)));
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, mrInfo, sizeof(struct mscclppIbMrInfo)));
for (int i = 0; i < 2; ++i) {
if (i == is_send)
continue;
qp->rtr(&qpInfo[i]);
qp->rts();
break;
}
printf("connection succeed\n");
// A simple barrier
int* tmp;
MSCCLPPCHECK(mscclppCalloc(&tmp, 2));
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
if (is_send) {
int maxIter = 100000;
double start = getTime();
for (int iter = 0; iter < maxIter; ++iter) {
qp->stageSend(mr, &mrInfo[0], sizeof(int) * nelem, 0, 0, 0, true);
if (qp->postSend() != 0) {
WARN("postSend failed");
return 1;
}
bool waiting = true;
while (waiting) {
int wcNum = qp->pollCq();
if (wcNum < 0) {
WARN("pollCq failed: errno %d", errno);
return 1;
}
for (int i = 0; i < wcNum; ++i) {
struct ibv_wc* wc = &qp->wcs[i];
if (wc->status != IBV_WC_SUCCESS) {
WARN("wc status %d", wc->status);
return 1;
}
waiting = false;
break;
}
}
}
// TODO(chhwang): print detailed stats such as avg, 99%p, etc.
printf("%f us/iter\n", (getTime() - start) / maxIter * 1e6);
}
// A simple barrier
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
MSCCLPPCHECK(mscclppIbContextDestroy(ctx));
MSCCLPPCHECK(mscclppCommDestroy(comm));
return 0;
}