diff --git a/src/include/checks.h b/src/include/checks.h index fb86fd66..7422e384 100644 --- a/src/include/checks.h +++ b/src/include/checks.h @@ -20,12 +20,12 @@ } \ } while (false) -#define CUDACHECKNORET(cmd) \ +#define CUDACHECKNORET(cmd) \ do { \ cudaError_t err = cmd; \ if (err != cudaSuccess) { \ WARN("Cuda failure '%s'", cudaGetErrorString(err)); \ - return; \ + return; \ } \ } while (false) diff --git a/src/include/comm.h b/src/include/comm.h index 04e21b56..62a6ba01 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -16,8 +16,6 @@ #define MAXCONNECTIONS 64 - - struct mscclppConn { int connId; @@ -42,8 +40,8 @@ struct mscclppComm void* bootstrap; - uint64_t - magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches. + // Magic number for all network communication. Not a security key -- only goal is to detect mismatches. + uint64_t magic; int rank; // my rank in the communicator int nRanks; // number of GPUs in communicator diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index b7db058b..88404a65 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -29,7 +29,6 @@ struct alignas(16) mscclppDevConnSignalEpochId uint64_t proxy; }; - /*************************************************************************************************************** * A mscclppDevConn provides a zero-copy connection between two GPUs connected via P2P NVLink or InfiniBand. * The communication API is one-sided meaning that for every single data transfer, only one side @@ -183,11 +182,11 @@ struct mscclppDevConn // my remote peer's buffer. only non-NULL with gpu's direct access // gpu can directly write into it void* remoteBuff; - }; // Host interface for mscclppDevCon functionality -struct mscclppHostConn{ +struct mscclppHostConn +{ virtual ~mscclppHostConn() = default; virtual void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) = 0; virtual void signal() = 0; diff --git a/src/init.cc b/src/init.cc index f04f14fa..38cd63e1 100644 --- a/src/init.cc +++ b/src/init.cc @@ -248,7 +248,8 @@ MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code) } } -MSCCLPP_API mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn) +MSCCLPP_API mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, + mscclppDevConn_t** devConn) { for (int i = 0; i < comm->nConns; i++) { if (comm->devConns[i].remoteRank == remoteRank && comm->devConns[i].tag == tag) { @@ -314,59 +315,72 @@ static void npkitCollectExitEvents(struct mscclppConn* conn, uint8_t type) #endif - -struct mscclppHostP2PConn : mscclppHostConn{ - mscclppHostP2PConn(mscclppConn* _conn, cudaStream_t _stream) : conn(_conn), p2pStream(_stream){} - - void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize){ - void* srcBuff = (void*)((char*)conn->devConn->localBuff + srcDataOffset); - void* dstBuff = (void*)((char*)conn->devConn->remoteBuff + dstDataOffset); - CUDACHECKNORET(cudaMemcpyAsync(dstBuff, srcBuff, dataSize, cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)dataSize); +struct mscclppHostP2PConn : mscclppHostConn +{ + mscclppHostP2PConn(mscclppConn* _conn, cudaStream_t _stream) : conn(_conn), p2pStream(_stream) + { } - void signal(){ - CUDACHECKNORET(cudaMemcpyAsync(&conn->devConn->remoteSignalEpochId->proxy, - &(conn->devConn->localSignalEpochId->device), sizeof(uint64_t), - cudaMemcpyDeviceToDevice, p2pStream)); - npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t)); + + void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) + { + void* srcBuff = (void*)((char*)conn->devConn->localBuff + srcDataOffset); + void* dstBuff = (void*)((char*)conn->devConn->remoteBuff + dstDataOffset); + CUDACHECKNORET(cudaMemcpyAsync(dstBuff, srcBuff, dataSize, cudaMemcpyDeviceToDevice, p2pStream)); + npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)dataSize); } - void wait(){} - void flush(){ - CUDACHECKNORET(cudaStreamSynchronize(p2pStream)); - npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT); + void signal() + { + CUDACHECKNORET(cudaMemcpyAsync(&conn->devConn->remoteSignalEpochId->proxy, + &(conn->devConn->localSignalEpochId->device), sizeof(uint64_t), + cudaMemcpyDeviceToDevice, p2pStream)); + npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t)); + } + void wait() + { + } + void flush() + { + CUDACHECKNORET(cudaStreamSynchronize(p2pStream)); + npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT); } mscclppConn* conn; cudaStream_t p2pStream; }; -struct mscclppHostIBConn : mscclppHostConn{ - mscclppHostIBConn(mscclppConn* conn) : conn(conn) {} +struct mscclppHostIBConn : mscclppHostConn +{ + mscclppHostIBConn(mscclppConn* conn) : conn(conn) + { + } - void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize){ - this->ibQp->stageSend(this->ibBuffMr, &this->ibBuffMrInfo, (uint32_t)dataSize, - /*wrId=*/0, /*srcOffset=*/srcDataOffset, - /*dstOffset=*/dstDataOffset, - /*signaled=*/false); - int ret = this->ibQp->postSend(); - if (ret != 0) { - // Return value is errno. - WARN("data postSend failed: errno %d", ret); - } - npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)dataSize); + void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) + { + this->ibQp->stageSend(this->ibBuffMr, &this->ibBuffMrInfo, (uint32_t)dataSize, + /*wrId=*/0, /*srcOffset=*/srcDataOffset, /*dstOffset=*/dstDataOffset, /*signaled=*/false); + int ret = this->ibQp->postSend(); + if (ret != 0) { + // Return value is errno. + WARN("data postSend failed: errno %d", ret); + } + npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)dataSize); } - void signal(){ - // My local device flag is copied to the remote's proxy flag - this->ibQp->stageSend(this->ibSignalEpochIdMr, &this->ibSignalEpochIdMrInfo, sizeof(uint64_t), - /*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/sizeof(uint64_t), /*signaled=*/true); - int ret = this->ibQp->postSend(); - if (ret != 0) { - WARN("flag postSend failed: errno %d", ret); - } - npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t)); + void signal() + { + // My local device flag is copied to the remote's proxy flag + this->ibQp->stageSend(this->ibSignalEpochIdMr, &this->ibSignalEpochIdMrInfo, sizeof(uint64_t), + /*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/sizeof(uint64_t), /*signaled=*/true); + int ret = this->ibQp->postSend(); + if (ret != 0) { + WARN("flag postSend failed: errno %d", ret); + } + npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t)); } - void wait(){} - void flush(){ + void wait() + { + } + void flush() + { bool isWaiting = true; while (isWaiting) { int wcNum = this->ibQp->pollCq(); @@ -401,10 +415,8 @@ struct mscclppHostIBConn : mscclppHostConn{ struct mscclppIbMrInfo ibSignalEpochIdMrInfo; }; - - -MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, uint64_t buffSize, - mscclppTransport_t transportType, const char* ibDev) +MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, + uint64_t buffSize, mscclppTransport_t transportType, const char* ibDev) { // save this processes numa binding and set it to the one closest to the device // so that all the allocation are close to the device @@ -517,8 +529,7 @@ MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, i if (transportType == mscclppTransportIB) { conn->hostConn = new mscclppHostIBConn(conn); - } - else if (transportType == mscclppTransportP2P) { + } else if (transportType == mscclppTransportP2P) { conn->hostConn = new mscclppHostP2PConn(conn, proxyState->p2pStream); } @@ -665,7 +676,7 @@ struct bufferInfo }; MSCCLPP_API mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size, - mscclppRegisteredMemory* regMem) + mscclppRegisteredMemory* regMem) { std::vector ibMrs; for (int i = 0; i < comm->nConns; ++i) { @@ -706,8 +717,9 @@ MSCCLPP_API mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* loca return mscclppSuccess; } -MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, void* srcBuff, - size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream) +MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, + void* srcBuff, size_t size, uint32_t srcOffset, + uint32_t dstOffset, int64_t stream) { int ret = 0; // TODO: transport should be an argument too so user can decide which transport to use @@ -720,9 +732,7 @@ MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, msc } else { struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn; hostConn->ibQp->stageSend(hostConn->ibBuffMr, &hostConn->ibBuffMrInfo, (uint32_t)size, - /*wrId=*/0, /*srcOffset=*/srcOffset, - /*dstOffset=*/dstOffset, - /*signaled=*/false); + /*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset, /*signaled=*/false); if ((ret = hostConn->ibQp->postSend()) != 0) { // Return value is errno. WARN("data postSend failed: errno %d", ret); diff --git a/src/proxy.cc b/src/proxy.cc index 044316d7..6cfd799b 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -42,10 +42,9 @@ mscclppResult_t mscclppProxyFifo::create() { MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoHead, 1)); #if defined(MSCCLPP_USE_GDRCOPY) - MSCCLPPCHECK(mscclppGdrCudaCalloc(&this->triggerFifo, &this->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE, - &this->triggerFifoDesc)); MSCCLPPCHECK( - mscclppGdrCudaCalloc(&this->fifoTailDevHostPtr, &this->fifoTailDev, 1, &this->fifoTailDesc)); + mscclppGdrCudaCalloc(&this->triggerFifo, &this->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE, &this->triggerFifoDesc)); + MSCCLPPCHECK(mscclppGdrCudaCalloc(&this->fifoTailDevHostPtr, &this->fifoTailDev, 1, &this->fifoTailDesc)); #else MSCCLPPCHECK(mscclppCudaHostCalloc(&this->triggerFifo, MSCCLPP_PROXY_FIFO_SIZE)); MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoTailDev, 1)); @@ -118,7 +117,6 @@ static void processTrigger(const mscclppTrigger trigger, mscclppConn* conn) } } - void* mscclppProxyService(void* _args) { struct proxyArgs* args = (struct proxyArgs*)_args; @@ -147,7 +145,7 @@ void* mscclppProxyService(void* _args) if (trigger.value[0] == 0) { continue; // there is one in progreess } - + mscclppConn* conn = &comm->conns[trigger.fields.connId]; processTrigger(trigger, conn);