This commit is contained in:
Changho Hwang
2023-04-12 09:25:35 +00:00
parent 63a5be6953
commit dd0883b84f
5 changed files with 74 additions and 69 deletions

View File

@@ -20,12 +20,12 @@
} \
} while (false)
#define CUDACHECKNORET(cmd) \
#define CUDACHECKNORET(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
WARN("Cuda failure '%s'", cudaGetErrorString(err)); \
return; \
return; \
} \
} while (false)

View File

@@ -16,8 +16,6 @@
#define MAXCONNECTIONS 64
struct mscclppConn
{
int connId;
@@ -42,8 +40,8 @@ struct mscclppComm
void* bootstrap;
uint64_t
magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
// Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
uint64_t magic;
int rank; // my rank in the communicator
int nRanks; // number of GPUs in communicator

View File

@@ -29,7 +29,6 @@ struct alignas(16) mscclppDevConnSignalEpochId
uint64_t proxy;
};
/***************************************************************************************************************
* A mscclppDevConn provides a zero-copy connection between two GPUs connected via P2P NVLink or InfiniBand.
* The communication API is one-sided meaning that for every single data transfer, only one side
@@ -183,11 +182,11 @@ struct mscclppDevConn
// my remote peer's buffer. only non-NULL with gpu's direct access
// gpu can directly write into it
void* remoteBuff;
};
// Host interface for mscclppDevConn functionality
struct mscclppHostConn{
struct mscclppHostConn
{
virtual ~mscclppHostConn() = default;
virtual void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize) = 0;
virtual void signal() = 0;

View File

@@ -248,7 +248,8 @@ MSCCLPP_API const char* mscclppGetErrorString(mscclppResult_t code)
}
}
MSCCLPP_API mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag, mscclppDevConn_t** devConn)
MSCCLPP_API mscclppResult_t mscclppGetDeviceConnection(mscclppComm_t comm, int remoteRank, int tag,
mscclppDevConn_t** devConn)
{
for (int i = 0; i < comm->nConns; i++) {
if (comm->devConns[i].remoteRank == remoteRank && comm->devConns[i].tag == tag) {
@@ -314,59 +315,72 @@ static void npkitCollectExitEvents(struct mscclppConn* conn, uint8_t type)
#endif
struct mscclppHostP2PConn : mscclppHostConn{
mscclppHostP2PConn(mscclppConn* _conn, cudaStream_t _stream) : conn(_conn), p2pStream(_stream){}
void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize){
void* srcBuff = (void*)((char*)conn->devConn->localBuff + srcDataOffset);
void* dstBuff = (void*)((char*)conn->devConn->remoteBuff + dstDataOffset);
CUDACHECKNORET(cudaMemcpyAsync(dstBuff, srcBuff, dataSize, cudaMemcpyDeviceToDevice, p2pStream));
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)dataSize);
struct mscclppHostP2PConn : mscclppHostConn
{
mscclppHostP2PConn(mscclppConn* _conn, cudaStream_t _stream) : conn(_conn), p2pStream(_stream)
{
}
void signal(){
CUDACHECKNORET(cudaMemcpyAsync(&conn->devConn->remoteSignalEpochId->proxy,
&(conn->devConn->localSignalEpochId->device), sizeof(uint64_t),
cudaMemcpyDeviceToDevice, p2pStream));
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t));
void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize)
{
void* srcBuff = (void*)((char*)conn->devConn->localBuff + srcDataOffset);
void* dstBuff = (void*)((char*)conn->devConn->remoteBuff + dstDataOffset);
CUDACHECKNORET(cudaMemcpyAsync(dstBuff, srcBuff, dataSize, cudaMemcpyDeviceToDevice, p2pStream));
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)dataSize);
}
void wait(){}
void flush(){
CUDACHECKNORET(cudaStreamSynchronize(p2pStream));
npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT);
void signal()
{
CUDACHECKNORET(cudaMemcpyAsync(&conn->devConn->remoteSignalEpochId->proxy,
&(conn->devConn->localSignalEpochId->device), sizeof(uint64_t),
cudaMemcpyDeviceToDevice, p2pStream));
npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t));
}
void wait()
{
}
void flush()
{
CUDACHECKNORET(cudaStreamSynchronize(p2pStream));
npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT);
}
mscclppConn* conn;
cudaStream_t p2pStream;
};
struct mscclppHostIBConn : mscclppHostConn{
mscclppHostIBConn(mscclppConn* conn) : conn(conn) {}
struct mscclppHostIBConn : mscclppHostConn
{
mscclppHostIBConn(mscclppConn* conn) : conn(conn)
{
}
void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize){
this->ibQp->stageSend(this->ibBuffMr, &this->ibBuffMrInfo, (uint32_t)dataSize,
/*wrId=*/0, /*srcOffset=*/srcDataOffset,
/*dstOffset=*/dstDataOffset,
/*signaled=*/false);
int ret = this->ibQp->postSend();
if (ret != 0) {
// Return value is errno.
WARN("data postSend failed: errno %d", ret);
}
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)dataSize);
void put(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize)
{
this->ibQp->stageSend(this->ibBuffMr, &this->ibBuffMrInfo, (uint32_t)dataSize,
/*wrId=*/0, /*srcOffset=*/srcDataOffset, /*dstOffset=*/dstDataOffset, /*signaled=*/false);
int ret = this->ibQp->postSend();
if (ret != 0) {
// Return value is errno.
WARN("data postSend failed: errno %d", ret);
}
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)dataSize);
}
void signal(){
// My local device flag is copied to the remote's proxy flag
this->ibQp->stageSend(this->ibSignalEpochIdMr, &this->ibSignalEpochIdMrInfo, sizeof(uint64_t),
/*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/sizeof(uint64_t), /*signaled=*/true);
int ret = this->ibQp->postSend();
if (ret != 0) {
WARN("flag postSend failed: errno %d", ret);
}
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t));
void signal()
{
// My local device flag is copied to the remote's proxy flag
this->ibQp->stageSend(this->ibSignalEpochIdMr, &this->ibSignalEpochIdMrInfo, sizeof(uint64_t),
/*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/sizeof(uint64_t), /*signaled=*/true);
int ret = this->ibQp->postSend();
if (ret != 0) {
WARN("flag postSend failed: errno %d", ret);
}
npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_FLAG_ENTRY, (uint32_t)sizeof(uint64_t));
}
void wait(){}
void flush(){
void wait()
{
}
void flush()
{
bool isWaiting = true;
while (isWaiting) {
int wcNum = this->ibQp->pollCq();
@@ -401,10 +415,8 @@ struct mscclppHostIBConn : mscclppHostConn{
struct mscclppIbMrInfo ibSignalEpochIdMrInfo;
};
MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff, uint64_t buffSize,
mscclppTransport_t transportType, const char* ibDev)
MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, int tag, void* localBuff,
uint64_t buffSize, mscclppTransport_t transportType, const char* ibDev)
{
// save this process's NUMA binding and set it to the one closest to the device
// so that all the allocations are close to the device
@@ -517,8 +529,7 @@ MSCCLPP_API mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, i
if (transportType == mscclppTransportIB) {
conn->hostConn = new mscclppHostIBConn(conn);
}
else if (transportType == mscclppTransportP2P) {
} else if (transportType == mscclppTransportP2P) {
conn->hostConn = new mscclppHostP2PConn(conn, proxyState->p2pStream);
}
@@ -665,7 +676,7 @@ struct bufferInfo
};
MSCCLPP_API mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* local_memory, size_t size,
mscclppRegisteredMemory* regMem)
mscclppRegisteredMemory* regMem)
{
std::vector<struct mscclppIbMr*> ibMrs;
for (int i = 0; i < comm->nConns; ++i) {
@@ -706,8 +717,9 @@ MSCCLPP_API mscclppResult_t mscclppRegisterBuffer(mscclppComm_t comm, void* loca
return mscclppSuccess;
}
MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem, void* srcBuff,
size_t size, uint32_t srcOffset, uint32_t dstOffset, int64_t stream)
MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, mscclppRegisteredMemory* regMem,
void* srcBuff, size_t size, uint32_t srcOffset,
uint32_t dstOffset, int64_t stream)
{
int ret = 0;
// TODO: transport should be an argument too so user can decide which transport to use
@@ -720,9 +732,7 @@ MSCCLPP_API mscclppResult_t mscclppRegisteredBufferWrite(mscclppComm_t comm, msc
} else {
struct mscclppHostIBConn* hostConn = (struct mscclppHostIBConn*)conn->hostConn;
hostConn->ibQp->stageSend(hostConn->ibBuffMr, &hostConn->ibBuffMrInfo, (uint32_t)size,
/*wrId=*/0, /*srcOffset=*/srcOffset,
/*dstOffset=*/dstOffset,
/*signaled=*/false);
/*wrId=*/0, /*srcOffset=*/srcOffset, /*dstOffset=*/dstOffset, /*signaled=*/false);
if ((ret = hostConn->ibQp->postSend()) != 0) {
// Return value is errno.
WARN("data postSend failed: errno %d", ret);

View File

@@ -42,10 +42,9 @@ mscclppResult_t mscclppProxyFifo::create()
{
MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoHead, 1));
#if defined(MSCCLPP_USE_GDRCOPY)
MSCCLPPCHECK(mscclppGdrCudaCalloc(&this->triggerFifo, &this->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE,
&this->triggerFifoDesc));
MSCCLPPCHECK(
mscclppGdrCudaCalloc(&this->fifoTailDevHostPtr, &this->fifoTailDev, 1, &this->fifoTailDesc));
mscclppGdrCudaCalloc(&this->triggerFifo, &this->triggerFifoDev, MSCCLPP_PROXY_FIFO_SIZE, &this->triggerFifoDesc));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&this->fifoTailDevHostPtr, &this->fifoTailDev, 1, &this->fifoTailDesc));
#else
MSCCLPPCHECK(mscclppCudaHostCalloc(&this->triggerFifo, MSCCLPP_PROXY_FIFO_SIZE));
MSCCLPPCHECK(mscclppCudaCalloc(&this->fifoTailDev, 1));
@@ -118,7 +117,6 @@ static void processTrigger(const mscclppTrigger trigger, mscclppConn* conn)
}
}
void* mscclppProxyService(void* _args)
{
struct proxyArgs* args = (struct proxyArgs*)_args;
@@ -147,7 +145,7 @@ void* mscclppProxyService(void* _args)
if (trigger.value[0] == 0) {
continue; // there is one in progress
}
mscclppConn* conn = &comm->conns[trigger.fields.connId];
processTrigger(trigger, conn);