add memory region functions

This commit is contained in:
Felipe Petroski Such
2023-04-07 12:26:56 -07:00
committed by Crutcher Dunnavant
parent 44a8a539ad
commit 38cd87cdcc
3 changed files with 112 additions and 16 deletions

View File

@@ -12,21 +12,6 @@
#define MSCCLPP_IB_MAX_SENDS 64
#define MSCCLPP_IB_MAX_DEVS 8
// MR info to be shared with the remote peer
struct mscclppIbMrInfo
{
uint64_t addr;
uint32_t rkey;
};
// IB memory region
struct mscclppIbMr
{
struct ibv_mr* mr;
void* buff;
struct mscclppIbMrInfo info;
};
// QP info to be shared with the remote peer
struct mscclppIbQpInfo
{

View File

@@ -13,9 +13,11 @@
#include <mscclppfifo.h>
#include <time.h>
#include <vector>
#ifdef __cplusplus
extern "C" {
extern "C"
{
#endif
/***************************************************************************************************************
@@ -174,6 +176,32 @@ typedef struct
char internal[MSCCLPP_UNIQUE_ID_BYTES];
} mscclppUniqueId;
// MR info to be shared with the remote peer
struct mscclppIbMrInfo
{
uint64_t addr;
uint32_t rkey;
};
// IB memory region
struct mscclppIbMr
{
struct ibv_mr* mr;
void* buff;
struct mscclppIbMrInfo info;
};
struct mscclppRegisteredMemoryP2P
{
void* remoteBuff;
mscclppIbMr* IbMr;
};
struct mscclppRegisteredMemory
{
std::vector<mscclppRegisteredMemoryP2P> p2p;
};
/* Error type */
typedef enum
{

View File

@@ -560,6 +560,89 @@ mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
return mscclppSuccess;
}
struct bufferInfo
{
cudaIpcMemHandle_t handleBuff;
mscclppIbMrInfo infoBuffMr;
};
MSCCLPP_API(mscclppResult_t, mscclppRegisterBuffer, mscclppComm_t comm, void* local_memory, size_t size,
mscclppRegisteredMemory* regMem);
mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size,
mscclppRegisteredMemory* regMem)
{
std::vector<struct mscclppIbMr*> ibMrs;
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn* conn = &comm->conns[i];
struct bufferInfo bInfo;
struct mscclppIbMr* ibBuffMr;
// TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
if (conn->transport == mscclppTransportP2P) {
CUDACHECK(cudaIpcGetMemHandle(&bInfo.handleBuff, local_memory));
} else if (conn->transport == mscclppTransportIB) {
MSCCLPPCHECK(mscclppIbContextRegisterMr(conn->ibCtx, local_memory, size, &ibBuffMr));
bInfo.infoBuffMr = ibBuffMr->info;
ibMrs.push_back(ibBuffMr);
}
MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo)));
}
// Recv info from peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn* conn = &comm->conns[i];
struct bufferInfo bInfo;
mscclppRegisteredMemoryP2P p2p;
p2p.IbMr = NULL;
p2p.remoteBuff = NULL;
MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo)));
// TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
if (conn->transport == mscclppTransportP2P) {
CUDACHECK(cudaIpcOpenMemHandle((void**)&p2p.remoteBuff, bInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess));
} else if (conn->transport == mscclppTransportIB) {
p2p.IbMr = ibMrs[i];
}
regMem->p2p.push_back(p2p);
}
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppRegisteredBufferWrite, mscclppComm_t comm, mscclppRegisteredMemory* regMem,
void* srcBuff, size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream);
mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, void* srcBuff,
size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream)
{
int ret = 0;
// TODO: transport should be an argument too so user can decide which transport to use
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn* conn = &comm->conns[i];
// TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
if (conn->transport == mscclppTransportP2P) {
// TODO: check errors
void* dstBuff = regMem->p2p[i].remoteBuff;
cudaMemcpyAsync(dstBuff, srcBuff, size, cudaMemcpyDeviceToDevice, (cudaStream_t)stream);
} else {
conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)size,
/*wrId=*/0, /*srcOffset=*/srcOffset,
/*dstOffset=*/dstOffset,
/*signaled=*/false);
if ((ret = conn->ibQp->postSend()) != 0) {
// Return value is errno.
WARN("data postSend failed: errno %d", ret);
}
// ??
// npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)trigger.fields.dataSize,
// trigger.fields.connId);
}
}
return mscclppSuccess;
}
// TODO: destroy registered buffer
MSCCLPP_API(mscclppResult_t, mscclppProxyLaunch, mscclppComm_t comm);
mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm)
{