GDRCopy support

This commit is contained in:
Changho Hwang
2023-02-22 09:19:30 +00:00
parent 7459a08699
commit 4cfb9b6727
6 changed files with 248 additions and 12 deletions

View File

@@ -101,9 +101,9 @@ LIBDIR := lib
OBJDIR := obj
BINDIR := bin
LDFLAGS := $(NVLDFLAGS) -libverbs
LDFLAGS := $(NVLDFLAGS) -libverbs -lgdrapi
LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc)
LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc gdr.cc)
LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc ib.cc)
LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS))
LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)

View File

@@ -1,6 +1,7 @@
#include "mscclpp.h"
#include "bootstrap.h"
#include "core.h"
#include "gdr.h"
#include <map>
#include <sstream>
@@ -19,6 +20,17 @@ pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
static bool initialized = false;
// static size_t maxLocalSizeBytes = 0;
gdr_t mscclppGdrCopy = NULL;
mscclppResult_t initGdrCopy() {
mscclppGdrCopy = mscclppGdrInit();
if (mscclppGdrCopy == NULL) {
WARN("GDR init failed");
return mscclppSystemError;
}
return mscclppSuccess;
}
static mscclppResult_t mscclppInit() {
if (__atomic_load_n(&initialized, __ATOMIC_ACQUIRE)) return mscclppSuccess;
pthread_mutex_lock(&initLock);
@@ -61,7 +73,11 @@ mscclppResult_t mscclppBootStrapAllGather(mscclppComm_t comm, void* data, int si
}
MSCCLPP_API(mscclppResult_t, mscclppCommInitRank, mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair);
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair){
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair) {
if (mscclppGdrCopy == NULL) {
MSCCLPPCHECK(initGdrCopy());
}
mscclppResult_t res = mscclppSuccess;
mscclppComm_t _comm = NULL;
// uint64_t hash = getHostHash();
@@ -162,7 +178,7 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
conn->devConn->localBuff = localBuff;
conn->devConn->localFlag = localFlag;
conn->devConn->tag = tag;
MSCCLPPCHECK(mscclppCudaHostCalloc(&conn->devConn->trigger, 1));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuTrigger, &conn->devConn->trigger, 1, &conn->cpuTriggerGdrDesc));
conn->ibCtx = NULL;
conn->ibQp = NULL;
@@ -233,7 +249,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
}
struct mscclppDevConn *devConn = conn->devConn;
devConn->remoteBuff = NULL;
MSCCLPPCHECK(mscclppCudaCalloc(&devConn->remoteFlag, 1));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuRemoteFlag, &devConn->remoteFlag, 1, &conn->cpuRemoteFlagGdrDesc));
struct mscclppIbContext *ibCtx = conn->ibCtx;
if (conn->ibQp == NULL) {

View File

@@ -8,7 +8,7 @@
#include <sys/syscall.h>
#include <map>
#define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 1
#define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 0
struct proxyArgs {
struct mscclppComm* comm;
@@ -35,7 +35,7 @@ void* mscclppProxyService(void* _args) {
struct mscclppConn *conn = &comm->conns[i];
if (conn->transport != mscclppTransportIB) continue;
if (conn->ibCtx != ibCtx) continue;
volatile uint64_t *tmp = (volatile uint64_t *)conn->devConn->trigger;
volatile uint64_t *tmp = (volatile uint64_t *)conn->cpuTrigger;
trigToSendStateAndConn[tmp].first = SEND_STATE_INIT;
trigToSendStateAndConn[tmp].second = conn;
qpNumToConn[conn->ibQp->qp->qp_num] = conn;
@@ -83,18 +83,18 @@ void* mscclppProxyService(void* _args) {
}
struct mscclppConn *conn = qpNumToConn[wc->qp_num];
if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA != 1)
// TODO(chhwang): cpu flush
*((volatile int *)conn->cpuRemoteFlag) = 1;
#endif
// recv completion
if (qpNumToConn[wc->qp_num]->ibQp->postRecv(wc->wr_id) != 0) {
WARN("postRecv failed: errno %d", errno);
}
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA != 1)
// TODO(chhwang): gdc & cpu flush
// *((volatile int *)conn->devConn->remoteFlag) = 1;
#endif
// WARN("rank %d recv completion", rank);
} else if (wc->opcode == IBV_WC_RDMA_WRITE) {
// send completion
volatile uint64_t *tmp = (volatile uint64_t *)conn->devConn->trigger;
volatile uint64_t *tmp = (volatile uint64_t *)conn->cpuTrigger;
*tmp = 0;
trigToSendStateAndConn[tmp].first = SEND_STATE_INIT;
// WARN("rank %d send completion", rank);

67
src/gdr.cc Normal file
View File

@@ -0,0 +1,67 @@
#include "gdr.h"
// Used to make the GDR library calls thread safe
pthread_mutex_t gdrLock = PTHREAD_MUTEX_INITIALIZER;
gdr_t wrap_gdr_open(void) {
return gdr_open();
}
mscclppResult_t wrap_gdr_close(gdr_t g) {
int ret = gdr_close(g);
if (ret != 0) {
WARN("gdr_close() failed: %d", ret);
return mscclppSystemError;
}
return mscclppSuccess;
}
mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle) {
int ret;
GDRLOCKCALL(gdr_pin_buffer(g, addr, size, p2p_token, va_space, handle), ret);
if (ret != 0) {
WARN("gdr_pin_buffer(addr %lx, size %zi) failed: %d", addr, size, ret);
return mscclppSystemError;
}
return mscclppSuccess;
}
mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle) {
int ret;
GDRLOCKCALL(gdr_unpin_buffer(g, handle), ret);
if (ret != 0) {
WARN("gdr_unpin_buffer(handle %lx) failed: %d", handle.h, ret);
return mscclppSystemError;
}
return mscclppSuccess;
}
mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info) {
int ret;
GDRLOCKCALL(gdr_get_info(g, handle, info), ret);
if (ret != 0) {
WARN("gdr_get_info(handle %lx) failed: %d", handle.h, ret);
return mscclppSystemError;
}
return mscclppSuccess;
}
mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size) {
int ret;
GDRLOCKCALL(gdr_map(g, handle, va, size), ret);
if (ret != 0) {
WARN("gdr_map(handle %lx, size %zi) failed: %d", handle.h, size, ret);
return mscclppSystemError;
}
return mscclppSuccess;
}
mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size) {
int ret;
GDRLOCKCALL(gdr_unmap(g, handle, va, size), ret);
if (ret != 0) {
WARN("gdr_unmap(handle %lx, va %p, size %zi) failed: %d", handle.h, va, size, ret);
return mscclppSystemError;
}
return mscclppSuccess;
}

View File

@@ -162,6 +162,10 @@ struct mscclppConn {
mscclppTransport_t transport;
int remoteRank;
int buffSize;
mscclppTrigger *cpuTrigger;
int *cpuRemoteFlag;
void *cpuTriggerGdrDesc;
void *cpuRemoteFlagGdrDesc;
struct mscclppDevConn *devConn;
struct mscclppIbContext *ibCtx;
struct mscclppIbQp *ibQp;

149
src/include/gdr.h Normal file
View File

@@ -0,0 +1,149 @@
#ifndef MSCCLPP_GDR_H_
#define MSCCLPP_GDR_H_
#include "gdrapi.h"
#include "debug.h"
#include "checks.h"
#include "align.h"
#include "alloc.h"
// These can be used if the GDR library isn't thread safe
#include <pthread.h>
extern pthread_mutex_t gdrLock;
#define GDRLOCK() pthread_mutex_lock(&gdrLock)
#define GDRUNLOCK() pthread_mutex_unlock(&gdrLock)
#define GDRLOCKCALL(cmd, ret) do { \
GDRLOCK(); \
ret = cmd; \
GDRUNLOCK(); \
} while(false)
#define GDRCHECK(cmd) do { \
int e; \
/* GDRLOCKCALL(cmd, e); */ \
e = cmd; \
if( e != 0 ) { \
WARN("GDRCOPY failure %d", e); \
return mscclppSystemError; \
} \
} while(false)
gdr_t wrap_gdr_open(void);
mscclppResult_t wrap_gdr_close(gdr_t g);
mscclppResult_t wrap_gdr_pin_buffer(gdr_t g, unsigned long addr, size_t size, uint64_t p2p_token, uint32_t va_space, gdr_mh_t *handle);
mscclppResult_t wrap_gdr_unpin_buffer(gdr_t g, gdr_mh_t handle);
mscclppResult_t wrap_gdr_get_info(gdr_t g, gdr_mh_t handle, gdr_info_t *info);
mscclppResult_t wrap_gdr_map(gdr_t g, gdr_mh_t handle, void **va, size_t size);
mscclppResult_t wrap_gdr_unmap(gdr_t g, gdr_mh_t handle, void *va, size_t size);
// Global GDR driver handle
extern gdr_t mscclppGdrCopy;
typedef struct gdr_mem_desc {
void *gdrDevMem;
void *gdrMap;
size_t gdrOffset;
size_t gdrMapSize;
gdr_mh_t gdrMh;
} gdr_mem_desc_t;
static gdr_t mscclppGdrInit() {
// int libMajor, libMinor, drvMajor, drvMinor;
gdr_t handle = wrap_gdr_open();
// if (handle != NULL) {
// mscclppResult_t res;
// // Query the version of libgdrapi
// MSCCLPPCHECKGOTO(wrap_gdr_runtime_get_version(&libMajor, &libMinor), res, error);
// // Query the version of gdrdrv driver
// MSCCLPPCHECKGOTO(wrap_gdr_driver_get_version(handle, &drvMajor, &drvMinor), res, error);
// // Only support GDRAPI 2.1 and later
// if (libMajor < 2 || (libMajor == 2 && libMinor < 1) || drvMajor < 2 || (drvMajor == 2 && drvMinor < 1)) {
// goto error;
// }
// else
// INFO(MSCCLPP_INIT, "GDRCOPY enabled library %d.%d driver %d.%d", libMajor, libMinor, drvMajor, drvMinor);
// }
return handle;
// error:
// if (handle != NULL) (void) wrap_gdr_close(handle);
// return NULL;
}
template <typename T>
mscclppResult_t mscclppGdrCudaCallocDebug(T** ptr, T** devPtr, size_t nelem, void** gdrDesc, const char *filefunc, int line) {
mscclppResult_t result = mscclppSuccess;
cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
*ptr = nullptr;
*devPtr = nullptr;
*gdrDesc = nullptr;
CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
gdr_info_t info;
size_t mapSize;
gdr_mh_t mh;
char *devMem;
void *gdrMap;
ssize_t off;
gdr_mem_desc_t* md;
uint64_t alignedAddr;
size_t align;
mapSize = sizeof(T)*nelem;
// GDRCOPY Pinned buffer has to be a minimum of a GPU_PAGE_SIZE
ALIGN_SIZE(mapSize, GPU_PAGE_SIZE);
// GDRCOPY Pinned buffer has to be GPU_PAGE_SIZE aligned too
MSCCLPPCHECKGOTO(mscclppCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1), result, finish);
alignedAddr = (((uint64_t) devMem) + GPU_PAGE_OFFSET) & GPU_PAGE_MASK;
align = alignedAddr - (uint64_t)devMem;
WARN("GDR: mscclppGdrCopy %p alignedAddr %p, mapSize %lu", mscclppGdrCopy, (void*)alignedAddr, mapSize);
MSCCLPPCHECKGOTO(wrap_gdr_pin_buffer(mscclppGdrCopy, alignedAddr, mapSize, 0, 0, &mh), result, finish);
MSCCLPPCHECKGOTO(wrap_gdr_map(mscclppGdrCopy, mh, &gdrMap, mapSize), result, finish);
MSCCLPPCHECKGOTO(wrap_gdr_get_info(mscclppGdrCopy, mh, &info), result, finish);
// Will offset ever be non zero ?
off = info.va - alignedAddr;
MSCCLPPCHECKGOTO(mscclppCalloc(&md, 1), result, finish);
md->gdrDevMem = devMem;
md->gdrMap = gdrMap;
md->gdrMapSize = mapSize;
md->gdrOffset = off+align;
md->gdrMh = mh;
*gdrDesc = md;
*ptr = (T *)((char *)gdrMap+off);
if (devPtr) *devPtr = (T *)(devMem+off+align);
TRACE(mscclpp_INIT, "GDRCOPY : allocated devMem %p gdrMap %p offset %lx mh %lx mapSize %zi at %p",
md->gdrDevMem, md->gdrMap, md->gdrOffset, md->gdrMh.h, md->gdrMapSize, *ptr);
return mscclppSuccess;
finish:
CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
if (*ptr == nullptr) WARN("Failed to CUDA calloc %ld bytes", nelem*sizeof(T));
INFO(MSCCLPP_ALLOC, "%s:%d Cuda Alloc Size %ld pointer %p", filefunc, line, nelem*sizeof(T), *ptr);
return result;
}
#define mscclppGdrCudaCalloc(...) mscclppGdrCudaCallocDebug(__VA_ARGS__, __FILE__, __LINE__)
static mscclppResult_t mscclppGdrCudaFree(void* gdrDesc) {
gdr_mem_desc_t *md = (gdr_mem_desc_t*)gdrDesc;
MSCCLPPCHECK(wrap_gdr_unmap(mscclppGdrCopy, md->gdrMh, md->gdrMap, md->gdrMapSize));
MSCCLPPCHECK(wrap_gdr_unpin_buffer(mscclppGdrCopy, md->gdrMh));
CUDACHECK(cudaFree(md->gdrDevMem));
free(md);
return mscclppSuccess;
}
#endif