diff --git a/Makefile b/Makefile
index cd5c4ddc..e1c4e595 100644
--- a/Makefile
+++ b/Makefile
@@ -101,9 +101,9 @@ LIBDIR := lib
 OBJDIR := obj
 BINDIR := bin
 
-LDFLAGS := $(NVLDFLAGS) -libverbs
+LDFLAGS := $(NVLDFLAGS) -libverbs -lgdrapi
 
-LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc)
+LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc gdr.cc)
 LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc ib.cc)
 LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS))
 LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
diff --git a/src/bootstrap/init.cc b/src/bootstrap/init.cc
index 0a649c1c..88cdf221 100644
--- a/src/bootstrap/init.cc
+++ b/src/bootstrap/init.cc
@@ -1,6 +1,7 @@
 #include "mscclpp.h"
 #include "bootstrap.h"
 #include "core.h"
+#include "gdr.h"
 #include
 #include
 
@@ -19,6 +20,17 @@ pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
 static bool initialized = false;
 // static size_t maxLocalSizeBytes = 0;
 
+gdr_t mscclppGdrCopy = NULL;
+
+mscclppResult_t initGdrCopy() {
+  mscclppGdrCopy = mscclppGdrInit();
+  if (mscclppGdrCopy == NULL) {
+    WARN("GDR init failed");
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
+
 static mscclppResult_t mscclppInit() {
   if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE)) return mscclppSuccess;
   pthread_mutex_lock(&initLock);
@@ -61,7 +73,11 @@ mscclppResult_t mscclppBootStrapAllGather(mscclppComm_t comm, void* data, int si
 }
 
 MSCCLPP_API(mscclppResult_t, mscclppCommInitRank, mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair);
-mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair){
+mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair) {
+  if (mscclppGdrCopy == NULL) {
+    MSCCLPPCHECK(initGdrCopy());
+  }
+
   mscclppResult_t res = mscclppSuccess;
   mscclppComm_t _comm = NULL;
   // uint64_t hash = getHostHash();
@@ -162,7 +178,7 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
   conn->devConn->localBuff = localBuff;
   conn->devConn->localFlag = localFlag;
   conn->devConn->tag = tag;
-  MSCCLPPCHECK(mscclppCudaHostCalloc(&conn->devConn->trigger, 1));
+  MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuTrigger, &conn->devConn->trigger, 1, &conn->cpuTriggerGdrDesc));
 
   conn->ibCtx = NULL;
   conn->ibQp = NULL;
@@ -233,7 +249,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
   }
   struct mscclppDevConn *devConn = conn->devConn;
   devConn->remoteBuff = NULL;
-  MSCCLPPCHECK(mscclppCudaCalloc(&devConn->remoteFlag, 1));
+  MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuRemoteFlag, &devConn->remoteFlag, 1, &conn->cpuRemoteFlagGdrDesc));
 
   struct mscclppIbContext *ibCtx = conn->ibCtx;
   if (conn->ibQp == NULL) {
diff --git a/src/bootstrap/proxy.cc b/src/bootstrap/proxy.cc
index 56e9467d..85d7c990 100644
--- a/src/bootstrap/proxy.cc
+++ b/src/bootstrap/proxy.cc
@@ -8,7 +8,7 @@
 #include
 #include
 
-#define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 1
+#define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 0
 
 struct proxyArgs {
   struct mscclppComm* comm;
@@ -35,7 +35,7 @@ void* mscclppProxyService(void* _args) {
     struct mscclppConn *conn = &comm->conns[i];
     if (conn->transport != mscclppTransportIB) continue;
     if (conn->ibCtx != ibCtx) continue;
-    volatile uint64_t *tmp = (volatile uint64_t *)conn->devConn->trigger;
+    volatile uint64_t *tmp = (volatile uint64_t *)conn->cpuTrigger;
     trigToSendStateAndConn[tmp].first = SEND_STATE_INIT;
     trigToSendStateAndConn[tmp].second = conn;
     qpNumToConn[conn->ibQp->qp->qp_num] = conn;
@@ -83,18 +83,18 @@ void* mscclppProxyService(void* _args) {
         }
         struct mscclppConn *conn = qpNumToConn[wc->qp_num];
         if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
+#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA != 1)
+          // TODO(chhwang): cpu flush
+          *((volatile int *)conn->cpuRemoteFlag) = 1;
+#endif
           // recv completion
           if (qpNumToConn[wc->qp_num]->ibQp->postRecv(wc->wr_id) != 0) {
             WARN("postRecv failed: errno %d", errno);
           }
-#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA != 1)
-          // TODO(chhwang): gdc & cpu flush
-          // *((volatile int *)conn->devConn->remoteFlag) = 1;
-#endif
           // WARN("rank %d recv completion", rank);
         } else if (wc->opcode == IBV_WC_RDMA_WRITE) {
           // send completion
-          volatile uint64_t *tmp = (volatile uint64_t *)conn->devConn->trigger;
+          volatile uint64_t *tmp = (volatile uint64_t *)conn->cpuTrigger;
           *tmp = 0;
           trigToSendStateAndConn[tmp].first = SEND_STATE_INIT;
           // WARN("rank %d send completion", rank);
diff --git a/src/gdr.cc b/src/gdr.cc
new file mode 100644
index 00000000..b1033331
--- /dev/null
+++ b/src/gdr.cc
@@ -0,0 +1,74 @@
+#include "gdr.h"
+
+// Used to make the GDR library calls thread safe
+pthread_mutex_t gdrLock = PTHREAD_MUTEX_INITIALIZER;
+
+// Pass-through to gdr_open(); the caller checks the returned handle against NULL.
+gdr_t wrap_gdr_open(void) {
+  return gdr_open();
+}
+
+// Calls gdr_close(); a non-zero status is logged and returned as mscclppSystemError.
+mscclppResult_t wrap_gdr_close(gdr_t g) {
+  int ret = gdr_close(g);
+  if (ret != 0) {
+    WARN("gdr_close() failed: %d", ret);
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
+
+// Calls gdr_pin_buffer() under gdrLock; a non-zero status is logged and returned as mscclppSystemError.
+mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle) {
+  int ret;
+  GDRLOCKCALL(gdr_pin_buffer(g, addr, size, p2p_token, va_space, handle), ret);
+  if (ret != 0) {
+    WARN("gdr_pin_buffer(addr %lx, size %zu) failed: %d", addr, size, ret);
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
+
+// Calls gdr_unpin_buffer() under gdrLock; a non-zero status is logged and returned as mscclppSystemError.
+mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle) {
+  int ret;
+  GDRLOCKCALL(gdr_unpin_buffer(g, handle), ret);
+  if (ret != 0) {
+    WARN("gdr_unpin_buffer(handle %lx) failed: %d", handle.h, ret);
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
+
+// Calls gdr_get_info() under gdrLock; a non-zero status is logged and returned as mscclppSystemError.
+mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info) {
+  int ret;
+  GDRLOCKCALL(gdr_get_info(g, handle, info), ret);
+  if (ret != 0) {
+    WARN("gdr_get_info(handle %lx) failed: %d", handle.h, ret);
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
+
+// Calls gdr_map() under gdrLock; a non-zero status is logged and returned as mscclppSystemError.
+mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size) {
+  int ret;
+  GDRLOCKCALL(gdr_map(g, handle, va, size), ret);
+  if (ret != 0) {
+    WARN("gdr_map(handle %lx, size %zu) failed: %d", handle.h, size, ret);
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
+
+// Calls gdr_unmap() under gdrLock; a non-zero status is logged and returned as mscclppSystemError.
+mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size) {
+  int ret;
+  GDRLOCKCALL(gdr_unmap(g, handle, va, size), ret);
+  if (ret != 0) {
+    WARN("gdr_unmap(handle %lx, va %p, size %zu) failed: %d", handle.h, va, size, ret);
+    return mscclppSystemError;
+  }
+  return mscclppSuccess;
+}
diff --git a/src/include/comm.h b/src/include/comm.h
index af9622c8..50596038 100644
--- a/src/include/comm.h
+++ b/src/include/comm.h
@@ -162,6 +162,10 @@ struct mscclppConn {
   mscclppTransport_t transport;
   int remoteRank;
   int buffSize;
+  mscclppTrigger *cpuTrigger;
+  int *cpuRemoteFlag;
+  void *cpuTriggerGdrDesc;
+  void *cpuRemoteFlagGdrDesc;
   struct mscclppDevConn *devConn;
   struct mscclppIbContext *ibCtx;
   struct mscclppIbQp *ibQp;
diff --git a/src/include/gdr.h b/src/include/gdr.h
new file mode 100644
index 00000000..31db1a5d
--- /dev/null
+++ b/src/include/gdr.h
@@ -0,0 +1,149 @@
+#ifndef MSCCLPP_GDR_H_
+#define MSCCLPP_GDR_H_
+
+#include "gdrapi.h"
+#include "debug.h"
+#include "checks.h"
+#include "align.h"
+#include "alloc.h"
+
+// These can be used if the GDR library isn't thread safe
+#include <pthread.h>
+extern pthread_mutex_t gdrLock;
+#define GDRLOCK() pthread_mutex_lock(&gdrLock)
+#define GDRUNLOCK() pthread_mutex_unlock(&gdrLock)
+#define GDRLOCKCALL(cmd, ret) do { \
+    GDRLOCK(); \
+    ret = cmd; \
+    GDRUNLOCK(); \
+} while(false)
+
+#define GDRCHECK(cmd) do { \
+    int e; \
+    /* GDRLOCKCALL(cmd, e); */ \
+    e = cmd; \
+    if( e != 0 ) { \
+      WARN("GDRCOPY failure %d", e); \
+      return mscclppSystemError; \
+    } \
+} while(false)
+
+gdr_t wrap_gdr_open(void);
+mscclppResult_t wrap_gdr_close(gdr_t g);
+mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle);
+mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle);
+mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info);
+mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size);
+mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size);
+
+// Global GDR driver handle
+extern gdr_t mscclppGdrCopy;
+
+// Bookkeeping for one GDR-mapped allocation; consumed by mscclppGdrCudaFree().
+typedef struct gdr_mem_desc {
+  void *gdrDevMem;    // base pointer returned by mscclppCudaCalloc (later passed to cudaFree)
+  void *gdrMap;       // CPU mapping returned by wrap_gdr_map
+  size_t gdrOffset;   // offset of the usable region from gdrDevMem
+  size_t gdrMapSize;  // size in bytes of the pinned and mapped region
+  gdr_mh_t gdrMh;     // memory handle of the pinned region
+} gdr_mem_desc_t;
+
+// Opens the gdrcopy driver and returns its handle; callers compare it against NULL.
+// TODO: reject libgdrapi/gdrdrv older than 2.1 (a version check was sketched here
+// but never enabled).
+static gdr_t mscclppGdrInit() {
+  return wrap_gdr_open();
+}
+
+// Allocates zeroed device memory for nelem elements of T, pins it with gdrcopy and
+// maps it into the CPU address space.
+//   ptr       [out] CPU-side mapping of the buffer
+//   devPtr    [out] device-side address of the same buffer (may be NULL)
+//   nelem     [in]  number of elements of T to allocate
+//   gdrDesc   [out] descriptor to hand to mscclppGdrCudaFree()
+//   filefunc, line  call site, used only in the allocation log line
+// On failure every partially acquired resource is released, all outputs stay NULL
+// and the failing call's result is returned.
+template <typename T>
+mscclppResult_t mscclppGdrCudaCallocDebug(T** ptr, T** devPtr, size_t nelem, void** gdrDesc, const char *filefunc, int line) {
+  mscclppResult_t result = mscclppSuccess;
+  cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
+  gdr_info_t info;
+  gdr_mh_t mh;
+  gdr_mem_desc_t *md = nullptr;
+  char *devMem = nullptr;
+  void *gdrMap = nullptr;
+  bool pinned = false;
+  size_t mapSize = sizeof(T)*nelem;
+  uint64_t alignedAddr = 0;
+  size_t align = 0;
+  ssize_t off = 0;
+
+  *ptr = nullptr;
+  if (devPtr) *devPtr = nullptr;
+  *gdrDesc = nullptr;
+  CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
+
+  // GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE
+  ALIGN_SIZE(mapSize, GPU_PAGE_SIZE);
+  // GDRCOPY Pinned buffer has to be GPU_PAGE_SIZE aligned too
+  MSCCLPPCHECKGOTO(mscclppCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1), result, finish);
+  alignedAddr = (((uint64_t) devMem) + GPU_PAGE_OFFSET) & GPU_PAGE_MASK;
+  align = alignedAddr - (uint64_t)devMem;
+  INFO(MSCCLPP_ALLOC, "GDR: mscclppGdrCopy %p alignedAddr %p, mapSize %zu", mscclppGdrCopy, (void*)alignedAddr, mapSize);
+  MSCCLPPCHECKGOTO(wrap_gdr_pin_buffer(mscclppGdrCopy, alignedAddr, mapSize, 0, 0, &mh), result, finish);
+  pinned = true;
+
+  MSCCLPPCHECKGOTO(wrap_gdr_map(mscclppGdrCopy, mh, &gdrMap, mapSize), result, finish);
+
+  MSCCLPPCHECKGOTO(wrap_gdr_get_info(mscclppGdrCopy, mh, &info), result, finish);
+
+  // Will offset ever be non zero ?
+  off = info.va - alignedAddr;
+
+  MSCCLPPCHECKGOTO(mscclppCalloc(&md, 1), result, finish);
+  md->gdrDevMem = devMem;
+  md->gdrMap = gdrMap;
+  md->gdrMapSize = mapSize;
+  md->gdrOffset = off+align;
+  md->gdrMh = mh;
+  *gdrDesc = md;
+
+  *ptr = (T *)((char *)gdrMap+off);
+  if (devPtr) *devPtr = (T *)(devMem+off+align);
+
+  TRACE(MSCCLPP_INIT, "GDRCOPY : allocated devMem %p gdrMap %p offset %zx mh %lx mapSize %zu at %p",
+        md->gdrDevMem, md->gdrMap, md->gdrOffset, md->gdrMh.h, md->gdrMapSize, *ptr);
+
+finish:
+  if (result != mscclppSuccess) {
+    // Unwind in reverse order of acquisition; cleanup failures are already logged by
+    // the wrappers and must not mask the original error.
+    if (gdrMap != nullptr) (void)wrap_gdr_unmap(mscclppGdrCopy, mh, gdrMap, mapSize);
+    if (pinned) (void)wrap_gdr_unpin_buffer(mscclppGdrCopy, mh);
+    if (devMem != nullptr) (void)cudaFree(devMem);
+    WARN("Failed to CUDA calloc %zu bytes", nelem*sizeof(T));
+  }
+  // Restore the caller's stream-capture mode on success and failure alike.
+  CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
+  INFO(MSCCLPP_ALLOC, "%s:%d Cuda Alloc Size %zu pointer %p", filefunc, line, nelem*sizeof(T), *ptr);
+  return result;
+}
+#define mscclppGdrCudaCalloc(...) mscclppGdrCudaCallocDebug(__VA_ARGS__, __FILE__, __LINE__)
+
+
+// Unmaps, unpins and frees an allocation described by gdrDesc (as produced by
+// mscclppGdrCudaCalloc); a NULL descriptor is a no-op.
+static mscclppResult_t mscclppGdrCudaFree(void* gdrDesc) {
+  gdr_mem_desc_t *md = (gdr_mem_desc_t*)gdrDesc;
+  if (md == nullptr) return mscclppSuccess;
+  MSCCLPPCHECK(wrap_gdr_unmap(mscclppGdrCopy, md->gdrMh, md->gdrMap, md->gdrMapSize));
+  MSCCLPPCHECK(wrap_gdr_unpin_buffer(mscclppGdrCopy, md->gdrMh));
+  CUDACHECK(cudaFree(md->gdrDevMem));
+  free(md);
+
+  return mscclppSuccess;
+}
+
+
+#endif