diff --git a/src/include/ib.h b/src/include/ib.h
index 70d219b6..7494ab11 100644
--- a/src/include/ib.h
+++ b/src/include/ib.h
@@ -12,21 +12,6 @@
 #define MSCCLPP_IB_MAX_SENDS 64
 #define MSCCLPP_IB_MAX_DEVS 8
 
-// MR info to be shared with the remote peer
-struct mscclppIbMrInfo
-{
-  uint64_t addr;
-  uint32_t rkey;
-};
-
-// IB memory region
-struct mscclppIbMr
-{
-  struct ibv_mr* mr;
-  void* buff;
-  struct mscclppIbMrInfo info;
-};
-
 // QP info to be shared with the remote peer
 struct mscclppIbQpInfo
 {
diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h
index de6edbc3..1477258d 100644
--- a/src/include/mscclpp.h
+++ b/src/include/mscclpp.h
@@ -13,9 +13,11 @@
 
 #include
 #include
+#include <vector>
 
 #ifdef __cplusplus
-extern "C" {
+extern "C"
+{
 #endif
 
 /***************************************************************************************************************
@@ -174,6 +176,34 @@ typedef struct
   char internal[MSCCLPP_UNIQUE_ID_BYTES];
 } mscclppUniqueId;
 
+// MR info to be shared with the remote peer
+struct mscclppIbMrInfo
+{
+  uint64_t addr;
+  uint32_t rkey;
+};
+
+// IB memory region
+struct mscclppIbMr
+{
+  struct ibv_mr* mr;
+  void* buff;
+  struct mscclppIbMrInfo info;
+};
+
+// Per-connection view of a registered buffer, filled in by mscclppRegisterBuffer().
+struct mscclppRegisteredMemoryP2P
+{
+  void* remoteBuff;  // peer buffer opened through CUDA IPC; NULL for non-P2P connections
+  mscclppIbMr* IbMr; // local IB memory region; NULL for non-IB connections
+};
+
+// A buffer registered with a communicator: one entry per connection, in connection order.
+struct mscclppRegisteredMemory
+{
+  std::vector<mscclppRegisteredMemoryP2P> p2p;
+};
+
 /* Error type */
 typedef enum
 {
diff --git a/src/init.cc b/src/init.cc
index 2c6db009..b631e82f 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -560,6 +560,131 @@ mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
   return mscclppSuccess;
 }
 
+// Per-connection payload exchanged over the bootstrap network when a buffer is
+// registered. Only the member matching the connection's transport is filled in.
+struct bufferInfo
+{
+  cudaIpcMemHandle_t handleBuff; // CUDA IPC handle of the local buffer (P2P connections)
+  mscclppIbMrInfo infoBuffMr;    // info of the local IB memory region (IB connections)
+};
+
+// Registers `local_memory` (`size` bytes) on every connection of `comm` and appends one
+// entry per connection, in connection order, to `regMem->p2p`:
+//  - P2P: the local CUDA IPC handle is sent to the peer and the handle received from the
+//    peer is opened into `remoteBuff`.
+//  - IB: the buffer is registered as an IB memory region whose info is sent to the peer;
+//    the local region is stored in `IbMr`.
+// Returns mscclppInvalidArgument when `comm` or `regMem` is NULL and mscclppSuccess on
+// success. Failures of the CUDA, IB and bootstrap calls go through CUDACHECK/MSCCLPPCHECK.
+MSCCLPP_API(mscclppResult_t, mscclppRegisterBuffer, mscclppComm_t comm, void* local_memory, size_t size,
+            mscclppRegisteredMemory* regMem);
+mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size,
+                                      mscclppRegisteredMemory* regMem)
+{
+  if (comm == NULL || regMem == NULL) {
+    WARN("mscclppRegisterBuffer: comm and regMem must not be NULL");
+    return mscclppInvalidArgument;
+  }
+
+  // One slot per connection (null unless the connection uses IB), so the receive loop can
+  // index by connection even when P2P and IB connections are mixed.
+  std::vector<struct mscclppIbMr*> ibMrs(comm->nConns);
+  for (int i = 0; i < comm->nConns; ++i) {
+    struct mscclppConn* conn = &comm->conns[i];
+    // Zero-initialized because the whole struct is sent while only one member is filled in.
+    struct bufferInfo bInfo = {};
+
+    // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
+    if (conn->transport == mscclppTransportP2P) {
+      CUDACHECK(cudaIpcGetMemHandle(&bInfo.handleBuff, local_memory));
+    } else if (conn->transport == mscclppTransportIB) {
+      struct mscclppIbMr* ibBuffMr = NULL;
+      MSCCLPPCHECK(mscclppIbContextRegisterMr(conn->ibCtx, local_memory, size, &ibBuffMr));
+      bInfo.infoBuffMr = ibBuffMr->info;
+      ibMrs[i] = ibBuffMr;
+    }
+
+    MSCCLPPCHECK(
+      bootstrapSend(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo)));
+  }
+
+  // Recv info from peers
+  for (int i = 0; i < comm->nConns; ++i) {
+    struct mscclppConn* conn = &comm->conns[i];
+    struct bufferInfo bInfo = {};
+
+    mscclppRegisteredMemoryP2P p2p;
+    p2p.IbMr = NULL;
+    p2p.remoteBuff = NULL;
+    MSCCLPPCHECK(
+      bootstrapRecv(comm->bootstrap, conn->devConn->remoteRank, conn->devConn->tag, &bInfo, sizeof(bInfo)));
+
+    // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
+    if (conn->transport == mscclppTransportP2P) {
+      CUDACHECK(cudaIpcOpenMemHandle(&p2p.remoteBuff, bInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess));
+    } else if (conn->transport == mscclppTransportIB) {
+      // NOTE(review): the peer's MR info (bInfo.infoBuffMr) is not kept anywhere; only the
+      // local MR is stored. Confirm whether the remote info should be saved as well.
+      p2p.IbMr = ibMrs[i];
+    }
+    regMem->p2p.push_back(p2p);
+  }
+  return mscclppSuccess;
+}
+
+// Writes `size` bytes into the registered buffer of every peer of `comm`.
+//  - P2P: asynchronous device-to-device copy on `stream` from `srcBuff + srcOffset` to the
+//    peer buffer recorded in `regMem`, at `dstOffset`.
+//  - otherwise: an unsignaled send is staged and posted on the connection's IB QP.
+// Returns mscclppInvalidArgument when `comm`/`regMem` is NULL or `regMem` has fewer entries
+// than `comm` has connections, mscclppInternalError when postSend fails, and mscclppSuccess
+// otherwise.
+// TODO: transport should be an argument too so user can decide which transport to use
+MSCCLPP_API(mscclppResult_t, mscclppRegisteredBufferWrite, mscclppComm_t comm, mscclppRegisteredMemory* regMem,
+            void* srcBuff, size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream);
+mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, void* srcBuff,
+                                             size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream)
+{
+  if (comm == NULL || regMem == NULL) {
+    WARN("mscclppRegisteredBufferWrite: comm and regMem must not be NULL");
+    return mscclppInvalidArgument;
+  }
+  if (regMem->p2p.size() < (size_t)comm->nConns) {
+    WARN("mscclppRegisteredBufferWrite: regMem is not registered on every connection");
+    return mscclppInvalidArgument;
+  }
+
+  for (int i = 0; i < comm->nConns; ++i) {
+    struct mscclppConn* conn = &comm->conns[i];
+    // TODO: (conn->transport & mscclppTransportP2P) to support both P2P and IB
+    if (conn->transport == mscclppTransportP2P) {
+      // Offsets are applied to both ends so that they are honoured like on the IB path.
+      char* dst = (char*)regMem->p2p[i].remoteBuff + dstOffset;
+      const char* src = (const char*)srcBuff + srcOffset;
+      CUDACHECK(cudaMemcpyAsync(dst, src, size, cudaMemcpyDeviceToDevice, (cudaStream_t)stream));
+    } else {
+      // NOTE(review): this still sends from conn->ibBuffMr to conn->ibBuffMrInfo instead of
+      // the MR stored in regMem->p2p[i].IbMr -- confirm this is intended.
+      conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)size,
+                            /*wrId=*/0, /*srcOffset=*/srcOffset,
+                            /*dstOffset=*/dstOffset,
+                            /*signaled=*/false);
+      int ret = conn->ibQp->postSend();
+      if (ret != 0) {
+        // Return value is errno.
+        WARN("data postSend failed: errno %d", ret);
+        return mscclppInternalError;
+      }
+      // ??
+      // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_ENTRY, (uint32_t)trigger.fields.dataSize,
+      //                        trigger.fields.connId);
+    }
+  }
+  return mscclppSuccess;
+}
+
+// TODO: destroy registered buffer
+
 MSCCLPP_API(mscclppResult_t, mscclppProxyLaunch, mscclppComm_t comm);
 mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm)
 {