From 1be76d128d31378afba41fe88e10a7edb2b02f81 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Fri, 10 Mar 2023 10:49:36 +0000 Subject: [PATCH 01/16] 128-bit trigger --- src/include/mscclpp.h | 21 +++++++++++++-------- src/proxy.cc | 22 ++++++++++++++++------ tests/p2p_test.cu | 20 +++++++++++++++----- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index a47593eb..2a92e987 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -22,18 +22,23 @@ typedef enum : uint64_t { mscclppData = 0x1, mscclppFlag = 0x2, mscclppSync = 0x4} mscclppTriggerType_t; -#define MSCCLPP_SIZE_BITS 30 -#define MSCCLPP_OFFSET_BITS 31 +#define MSCCLPP_BITS_SIZE 30 +#define MSCCLPP_BITS_OFFSET 31 +#define MSCCLPP_BITS_CONNID 10 -#define TRIGGER_VALUE(__TYPE__,__OFFSET__,__SIZE__) (((((__TYPE__) << MSCCLPP_OFFSET_BITS) + (__OFFSET__)) << MSCCLPP_SIZE_BITS) + __SIZE__ ) +#define TRIGGER_VALUE(__TYPE__,__OFFSET__,__SIZE__) (((((__TYPE__) << MSCCLPP_BITS_OFFSET) + (__OFFSET__)) << MSCCLPP_BITS_SIZE) + __SIZE__ ) -// the summation of number of bits must be 64 or less -union alignas(8) mscclppTrigger { - uint64_t value; +// the summation of number of bits must be 128 or less +union alignas(16) mscclppTrigger { + uint64_t value[2]; struct { - uint64_t dataSize : MSCCLPP_SIZE_BITS; - uint64_t dataOffset : MSCCLPP_OFFSET_BITS; + // high 64 bits: value[0] + uint64_t dataSize : MSCCLPP_BITS_SIZE; + uint64_t dataOffset : MSCCLPP_BITS_OFFSET; uint64_t type : 3; + uint64_t : 0; // ensure 64-bit alignment + // low 64 bits: value[1] + uint64_t connId : MSCCLPP_BITS_CONNID; } fields; }; diff --git a/src/proxy.cc b/src/proxy.cc index aaf6d6c5..f7a772b4 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -68,8 +68,13 @@ void* mscclppProxyServiceP2P(void* _args) { while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { for (struct mscclppConn *conn : conns) { // Poll to see if we are ready to send anything - trigger.value = *(volatile uint64_t 
*)(&conn->cpuTriggerFifo[conn->fifoTail]); - if (trigger.value == 0) continue; + trigger.value[0] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); + if (trigger.value[0] == 0) continue; + // TODO(chhwang): latency overhead of reading value[1] is too large (~9us) + trigger.value[1] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value + 1); + if (trigger.value[1] != 42) { + WARN("Unexpected value"); + } // Iterate over what send is needed if (trigger.fields.type & mscclppData){ @@ -85,7 +90,7 @@ void* mscclppProxyServiceP2P(void* _args) { PROXYCUDACHECK(cudaStreamSynchronize(stream)); } - // Send completion + // Send completion: reset only the high 64 bits volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); *tmp = 0; conn->fifoTail++; @@ -196,8 +201,13 @@ void* mscclppProxyServiceIb(void* _args) { } #else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1) // Poll to see if we are ready to send anything - trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - if (trigger.value == 0) continue; + trigger.value[0] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); + if (trigger.value[0] == 0) continue; + // TODO(chhwang): latency overhead of reading value[1] is too large (~9us) + trigger.value[1] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value + 1); + if (trigger.value[1] != 42) { + WARN("Unexpected value"); + } if (trigger.fields.type & mscclppData) { conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, @@ -240,7 +250,7 @@ void* mscclppProxyServiceIb(void* _args) { } } - // Send completion + // Send completion: reset only the high 64 bits volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); *tmp = 0; conn->fifoTail++; diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index f90deb44..469b17bb 100644 --- a/tests/p2p_test.cu +++ b/tests/p2p_test.cu @@ -42,6 +42,17 @@ static 
double getTime(void) __constant__ mscclppDevConn_t constDevConns[16]; +__forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t connId, uint64_t type, + uint64_t dataOffset, uint64_t dataSize) +{ + asm volatile( + "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value), + "l"((type << (MSCCLPP_BITS_SIZE + MSCCLPP_BITS_OFFSET)) + + (dataOffset << (MSCCLPP_BITS_SIZE)) + + (dataSize)), + "l"(connId)); +} + __global__ void kernel(int rank, int world_size) { if (threadIdx.x % 32 != 0) return; @@ -54,7 +65,7 @@ __global__ void kernel(int rank, int world_size) volatile uint64_t *remoteFlag = devConn.remoteFlag; volatile uint64_t *proxyFlag = devConn.proxyFlag; int curFifoHead = *devConn.triggerFifoHead; - volatile uint64_t *trig = (volatile uint64_t *)&devConn.trigger[curFifoHead]; + mscclppTrigger *trig = &devConn.trigger[curFifoHead]; curFifoHead += 1; if (curFifoHead == MSCCLPP_PROXY_FIFO_SIZE) curFifoHead = 0; @@ -78,12 +89,11 @@ __global__ void kernel(int rank, int world_size) #if (USE_DMA_FOR_P2P == 1) // Wait until the proxy have sent my data and flag - while (*trig != 0) {} + // Check only the high 64 bits + while (*(volatile uint64_t *)trig->value != 0) {} // Trigger sending data and flag - uint64_t dataOffset = rank * sizeof(int); - uint64_t dataSize = sizeof(int); - *trig = TRIGGER_VALUE(mscclppFlag | mscclppData, dataOffset, dataSize); + setTrigger(trig, /*for test*/42, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} From ea7134549efb90154c559e13822bda18f035d9ec Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Mon, 13 Mar 2023 07:02:26 +0000 Subject: [PATCH 02/16] vector instructions for trigger --- src/proxy.cc | 11 +++++++++-- tests/p2p_test.cu | 8 ++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index f7a772b4..95813bd8 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -39,6 +39,9 @@ struct 
proxyArgs { int connIdx; }; + +#include <emmintrin.h> + // TODO(saemal) We need to add a fifo for each DMA engine void* mscclppProxyServiceP2P(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; @@ -68,10 +71,14 @@ void* mscclppProxyServiceP2P(void* _args) { while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { for (struct mscclppConn *conn : conns) { // Poll to see if we are ready to send anything - trigger.value[0] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); + // trigger.value[0] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); + volatile uint64_t* src = (volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); + volatile uint64_t* dst = (volatile uint64_t *)trigger.value; + __m128i xmm0 = _mm_loadu_si128((__m128i*)src); + _mm_storeu_si128((__m128i*)dst, xmm0); if (trigger.value[0] == 0) continue; // TODO(chhwang): latency overhead of reading value[1] is too large (~9us) - trigger.value[1] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value + 1); + // trigger.value[1] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value + 1); if (trigger.value[1] != 42) { WARN("Unexpected value"); } diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index 469b17bb..cf6aae68 100644 --- a/tests/p2p_test.cu +++ b/tests/p2p_test.cu @@ -269,7 +269,7 @@ int main(int argc, const char *argv[]) CUDACHECK(cudaEventCreate(&ev_end)); // warm up - int warmupiter = 10; + // int warmupiter = 10; // for (int i = 0; i < warmupiter; ++i) { // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); // } @@ -278,14 +278,14 @@ int main(int argc, const char *argv[]) cudaGraph_t graph; cudaGraphExec_t instance; cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - int cudagraphiter = 10; + int cudagraphiter = 100; for (int i = 0; i < cudagraphiter; ++i) { kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); } cudaStreamEndCapture(stream, &graph); cudaGraphInstantiate(&instance, graph, 
NULL, NULL, 0); - int cudagraphwarmup = 20; + int cudagraphwarmup = 100; for (int i = 0; i < cudagraphwarmup; ++i) { cudaGraphLaunch(instance, stream); } @@ -294,7 +294,7 @@ int main(int argc, const char *argv[]) // measure runtime // CUDACHECK(cudaEventRecord(ev_start, stream)); double t0 = getTime(); - int cudagraphlaunch = 10; + int cudagraphlaunch = 100; for (int i = 0; i < cudagraphlaunch; ++i) { // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); cudaGraphLaunch(instance, stream); From e357beef00849878b12ca1c511015269a7027158 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Mon, 13 Mar 2023 14:19:36 +0000 Subject: [PATCH 03/16] One fifo per proxy --- src/ib.cc | 32 ++-- src/include/comm.h | 5 - src/include/mscclpp.h | 3 +- src/include/proxy.h | 8 +- src/init.cc | 28 ++-- src/proxy.cc | 340 ++++++++++++++++++++---------------------- tests/p2p_test.cu | 23 ++- 7 files changed, 212 insertions(+), 227 deletions(-) diff --git a/src/ib.cc b/src/ib.cc index 8423819d..51a5820c 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -186,8 +186,8 @@ mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct ms qp_init_attr.send_cq = cq; qp_init_attr.recv_cq = cq; qp_init_attr.qp_type = IBV_QPT_RC; - qp_init_attr.cap.max_send_wr = MAXCONNECTIONS; - qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS; + qp_init_attr.cap.max_send_wr = MAXCONNECTIONS * MSCCLPP_PROXY_FIFO_SIZE; + qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS * MSCCLPP_PROXY_FIFO_SIZE; qp_init_attr.cap.max_send_sge = 1; qp_init_attr.cap.max_recv_sge = 1; qp_init_attr.cap.max_inline_data = 0; @@ -381,25 +381,23 @@ int mscclppIbQp::stageSendWithImm(struct mscclppIbMr *ibMr, const mscclppIbMrInf int mscclppIbQp::postSend() { - struct ibv_send_wr *bad_wr; - int ret = ibv_post_send(this->qp, this->wrs, &bad_wr); - if (ret != 0) { - return ret; - } - // std::memset(this->wrs, 0, sizeof(struct ibv_send_wr) * this->wrn); - // std::memset(this->sges, 0, sizeof(struct ibv_sge) * this->wrn); - 
this->wrn = 0; - return 0; + struct ibv_send_wr *bad_wr; + int ret = ibv_post_send(this->qp, this->wrs, &bad_wr); + if (ret != 0) { + return ret; + } + this->wrn = 0; + return 0; } int mscclppIbQp::postRecv(uint64_t wrId) { - struct ibv_recv_wr wr, *bad_wr; - wr.wr_id = wrId; - wr.sg_list = nullptr; - wr.num_sge = 0; - wr.next = nullptr; - return ibv_post_recv(this->qp, &wr, &bad_wr); + struct ibv_recv_wr wr, *bad_wr; + wr.wr_id = wrId; + wr.sg_list = nullptr; + wr.num_sge = 0; + wr.next = nullptr; + return ibv_post_recv(this->qp, &wr, &bad_wr); } int mscclppIbQp::pollCq() diff --git a/src/include/comm.h b/src/include/comm.h index 8da70875..81449fe2 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -162,13 +162,8 @@ struct mscclppConn { mscclppTransport_t transport; int remoteRank; int buffSize; - mscclppTrigger *cpuTriggerFifo; - // fifoTail indicates where CPU needs to read the head of the fifo. only accessible by CPU - // No atomicity is required for fifoTail as only a single CPU thread accesses it. - int fifoTail; uint64_t *remoteProxyFlag; uint64_t *cpuProxyFlag; - void *cpuTriggerFifoGdrDesc; void *cpuProxyFlagGdrDesc; struct mscclppDevConn *devConn; struct mscclppIbContext *ibCtx; diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index 2a92e987..1aef4de0 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -85,9 +85,10 @@ struct mscclppDevConn { // // localBuff[srcOffset..srcOffset+size-1] <- remoteBuff[dstOffset..dstOffset+size-1] // virtual void pullRemoteBuff(size_t srcOffset, size_t dstOffset, size_t size); - int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. for parallel, access use atomic + unsigned int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. 
for parallel, access use atomic mscclppTrigger* trigger; uint64_t* proxyFlag; + int connId; }; typedef struct mscclppComm* mscclppComm_t; diff --git a/src/include/proxy.h b/src/include/proxy.h index 225e9fae..5e616aa2 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -12,8 +12,12 @@ typedef enum { } mscclppProxyRunState_t; struct mscclppProxyState { - pthread_t *threads; - mscclppProxyRunState_t *runs; + pthread_t thread; + mscclppProxyRunState_t run; + mscclppTrigger *cpuTriggerFifo; + mscclppTrigger *gpuTriggerFifo; + unsigned int *gpuTriggerFifoHead; + void *cpuTriggerFifoGdrDesc; }; mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); diff --git a/src/init.cc b/src/init.cc index 2cb063a6..a914d225 100644 --- a/src/init.cc +++ b/src/init.cc @@ -171,24 +171,16 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i WARN("devConnOut is the output of this function and needs to be allocated by the user"); return mscclppInvalidUsage; } - struct mscclppConn *conn = &comm->conns[comm->nConns++]; + struct mscclppConn *conn = &comm->conns[comm->nConns]; conn->transport = transportType; conn->remoteRank = remoteRank; conn->buffSize = buffSize; - conn->devConn = devConnOut; - conn->devConn->localBuff = localBuff; - conn->devConn->localFlag = localFlag; - conn->devConn->tag = tag; - - // TODO(saemal): these two should be shared for all P2P-DMA connections made from each GPU. Same for each IB driver. 
- MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuTriggerFifo, &conn->devConn->trigger, MSCCLPP_PROXY_FIFO_SIZE, &conn->cpuTriggerFifoGdrDesc)); - MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->triggerFifoHead, 1)); conn->ibCtx = NULL; conn->ibQp = NULL; + int ibDevIdx = -1; if (ibDev != NULL) { // Check if an IB context exists - int ibDevIdx = -1; int firstNullIdx = -1; for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { if (comm->ibContext[i] == NULL) { @@ -210,6 +202,22 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i } conn->ibCtx = comm->ibContext[ibDevIdx]; } + int proxyIdx = (ibDevIdx == -1) ? MSCCLPP_IB_MAX_DEVS : ibDevIdx; + struct mscclppProxyState *proxyState = &comm->proxyState[proxyIdx]; + if (proxyState->cpuTriggerFifo == NULL) { + MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo, + MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc)); + MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1)); + } + conn->devConn = devConnOut; + conn->devConn->localBuff = localBuff; + conn->devConn->localFlag = localFlag; + conn->devConn->tag = tag; + conn->devConn->connId = comm->nConns; + conn->devConn->trigger = proxyState->gpuTriggerFifo; + conn->devConn->triggerFifoHead = proxyState->gpuTriggerFifoHead; + + comm->nConns++; return mscclppSuccess; } diff --git a/src/proxy.cc b/src/proxy.cc index 95813bd8..05bf7282 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -5,6 +5,7 @@ #include "ib.h" #include "checks.h" +#include #include #include #include @@ -36,11 +37,13 @@ struct proxyArgs { struct mscclppIbContext* ibCtx; cudaStream_t stream; volatile mscclppProxyRunState_t* run; - int connIdx; + mscclppTrigger *triggerFifo; }; - -#include +static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { + __m128i xmm0 = _mm_loadu_si128((__m128i *)src); + _mm_storeu_si128((__m128i *)dst, xmm0); +} // TODO(saemal) We need to add a fifo for each DMA engine void* 
mscclppProxyServiceP2P(void* _args) { @@ -55,6 +58,7 @@ void* mscclppProxyServiceP2P(void* _args) { conns.push_back(conn); } } + mscclppTrigger *fifo = args->triggerFifo; cudaStream_t stream = args->stream; free(_args); @@ -68,42 +72,32 @@ void* mscclppProxyServiceP2P(void* _args) { PROXYCUDACHECK(cudaSetDevice(comm->cudaDev)); PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); + // fifoTail indicates where CPU needs to read the head of the fifo. + int fifoTail = 0; while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { - for (struct mscclppConn *conn : conns) { - // Poll to see if we are ready to send anything - // trigger.value[0] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); - volatile uint64_t* src = (volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); - volatile uint64_t* dst = (volatile uint64_t *)trigger.value; - __m128i xmm0 = _mm_loadu_si128((__m128i*)src); - _mm_storeu_si128((__m128i*)dst, xmm0); - if (trigger.value[0] == 0) continue; - // TODO(chhwang): latency overhead of reading value[1] is too large (~9us) - // trigger.value[1] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value + 1); - if (trigger.value[1] != 42) { - WARN("Unexpected value"); - } + // Poll to see if we are ready to send anything + readTrigger(&trigger, &fifo[fifoTail]); + if (trigger.value[0] == 0) continue; - // Iterate over what send is needed - if (trigger.fields.type & mscclppData){ - void *srcBuff = (void *)((char *)conn->devConn->localBuff + trigger.fields.dataOffset); - void *dstBuff = (void *)((char *)conn->devConn->remoteBuff + trigger.fields.dataOffset); - PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, stream)); - } - if (trigger.fields.type & mscclppFlag) { - PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->localFlag, sizeof(uint64_t), cudaMemcpyDeviceToDevice, stream)); - } - // Wait for completion - if (trigger.fields.type & 
mscclppSync){ - PROXYCUDACHECK(cudaStreamSynchronize(stream)); - } + struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; - // Send completion: reset only the high 64 bits - volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - *tmp = 0; - conn->fifoTail++; - if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE) - conn->fifoTail = 0; + // Iterate over what send is needed + if (trigger.fields.type & mscclppData){ + void *srcBuff = (void *)((char *)conn->devConn->localBuff + trigger.fields.dataOffset); + void *dstBuff = (void *)((char *)conn->devConn->remoteBuff + trigger.fields.dataOffset); + PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, stream)); } + if (trigger.fields.type & mscclppFlag) { + PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->localFlag, sizeof(uint64_t), cudaMemcpyDeviceToDevice, stream)); + } + // Wait for completion + if (trigger.fields.type & mscclppSync){ + PROXYCUDACHECK(cudaStreamSynchronize(stream)); + } + + // Send completion: reset only the high 64 bits + *(volatile uint64_t *)(&fifo[fifoTail]) = 0; + fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; } // Need a sync in case previous copies are not completed @@ -128,6 +122,7 @@ void* mscclppProxyServiceIb(void* _args) { conns.push_back(conn); } } + mscclppTrigger *fifo = args->triggerFifo; free(_args); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) @@ -135,8 +130,16 @@ void* mscclppProxyServiceIb(void* _args) { SEND_STATE_INIT, SEND_STATE_INPROGRESS }; - int sendState = SEND_STATE_INIT; - uint64_t currentProxyFlagVlaue = *conn->cpuProxyFlag; + int *sendState; + uint64_t *currentProxyFlagVlaue; + if (mscclppCalloc((void **)&sendState, comm->nConns) != mscclppSuccess) { + WARN("mscclppCalloc failed: errno %d", errno); + return NULL; + } + if (mscclppCalloc((void **)&currentProxyFlagVlaue, comm->nConns) != mscclppSuccess) { + WARN("mscclppCalloc failed: errno %d", errno); + return NULL; + } 
#endif int rank = comm->rank; @@ -146,7 +149,10 @@ void* mscclppProxyServiceIb(void* _args) { NumaBind(ibCtx->numaNode); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) - for (struct mscclppConn *conn : conns) { + for (int i = 0; i < (int)comm->nConns; ++i) { + sendState[i] = SEND_STATE_INIT; + struct mscclppConn *conn = &comm->conns[i]; + currentProxyFlagVlaue[i] = *conn->cpuProxyFlag; // Post recv if (conn->ibQp->postRecv(0) != 0) { WARN("postRecv failed: errno %d", errno); @@ -154,28 +160,90 @@ void* mscclppProxyServiceIb(void* _args) { } #endif + // fifoTail indicates where CPU needs to read the head of the fifo. + int fifoTail = 0; while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { - for (struct mscclppConn *conn : conns) { + // Poll to see if we are ready to send anything + readTrigger(&trigger, &fifo[fifoTail]); + #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) - // Try send - if (sendState == SEND_STATE_INIT) { - trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - if (trigger.value != 0) { - // Do send - conn->ibQp->stageSendWithImm(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, - /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/true, /*immData=*/0); - if (conn->ibQp->postSend() != 0) { - WARN("postSend failed: errno %d", errno); + struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; + // Try send + if (sendState[trigger.fields.connId] == SEND_STATE_INIT) { + if (trigger.value[0] != 0) { + // Do send + conn->ibQp->stageSendWithImm(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, + /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/true, /*immData=*/0); + int ret; + if ((ret = conn->ibQp->postSend()) != 0) { + // Return value is errno. 
+ WARN("postSend failed: errno %d", ret); + } + sendState[trigger.fields.connId] = SEND_STATE_INPROGRESS; + } + } + + // Poll completions + wcNum = conn->ibQp->pollCq(); + if (wcNum < 0) { + WARN("rank %d pollCq failed: errno %d", rank, errno); + } else { + for (int i = 0; i < wcNum; ++i) { + struct ibv_wc *wc = &conn->ibQp->wcs[i]; + if (wc->status != IBV_WC_SUCCESS) { + WARN("rank %d wc status %d", rank, wc->status); + continue; + } + if (wc->qp_num != conn->ibQp->qp->qp_num) { + WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num); + continue; + } + if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { + // TODO(chhwang): cpu flush + *((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagVlaue[trigger.fields.connId]; + // recv completion + if (conn->ibQp->postRecv(wc->wr_id) != 0) { + WARN("postRecv failed: errno %d", errno); } - sendState = SEND_STATE_INPROGRESS; + // WARN("rank %d recv completion", rank); + } else if (wc->opcode == IBV_WC_RDMA_WRITE) { + // send completion + *(volatile uint64_t *)(&fifo[fifoTail]) = 0; + fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; + sendState[trigger.fields.connId] = SEND_STATE_INIT; + // WARN("rank %d send completion", rank); } } + } +#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1) + if (trigger.value[0] == 0) continue; - // Poll completions - wcNum = conn->ibQp->pollCq(); - if (wcNum < 0) { - WARN("rank %d pollCq failed: errno %d", rank, errno); - } else { + struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; + + if (trigger.fields.type & mscclppData) { + conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, + /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/false); + } + if (trigger.fields.type & mscclppFlag) { + // My local flag is copied to the peer's proxy flag + conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t), + /*wrId=*/0, /*offset=*/0, /*signaled=*/true); + } + int ret; + if ((ret = 
conn->ibQp->postSend()) != 0) { + // Return value is errno. + WARN("postSend failed: errno %d", ret); + } + + // Wait for completion + if (trigger.fields.type & mscclppSync) { + bool waiting = true; + while (waiting) { + wcNum = conn->ibQp->pollCq(); + if (wcNum < 0) { + WARN("rank %d pollCq failed: errno %d", rank, errno); + continue; + } for (int i = 0; i < wcNum; ++i) { struct ibv_wc *wc = &conn->ibQp->wcs[i]; if (wc->status != IBV_WC_SUCCESS) { @@ -186,85 +254,19 @@ void* mscclppProxyServiceIb(void* _args) { WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num); continue; } - if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { - // TODO(chhwang): cpu flush - *((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagVlaue; - // recv completion - if (conn->ibQp->postRecv(wc->wr_id) != 0) { - WARN("postRecv failed: errno %d", errno); - } - // WARN("rank %d recv completion", rank); - } else if (wc->opcode == IBV_WC_RDMA_WRITE) { + if (wc->opcode == IBV_WC_RDMA_WRITE) { // send completion - volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - *tmp = 0; - conn->fifoTail++; - if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE) - conn->fifoTail = 0; - sendState = SEND_STATE_INIT; - // WARN("rank %d send completion", rank); + waiting = false; + break; } } } -#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1) - // Poll to see if we are ready to send anything - trigger.value[0] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value); - if (trigger.value[0] == 0) continue; - // TODO(chhwang): latency overhead of reading value[1] is too large (~9us) - trigger.value[1] = *(volatile uint64_t *)(conn->cpuTriggerFifo[conn->fifoTail].value + 1); - if (trigger.value[1] != 42) { - WARN("Unexpected value"); - } - - if (trigger.fields.type & mscclppData) { - conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, - /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/false); - } - if 
(trigger.fields.type & mscclppFlag) { - // My local flag is copied to the peer's proxy flag - conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t), - /*wrId=*/0, /*offset=*/0, /*signaled=*/true); - } - if (conn->ibQp->postSend() != 0) { - WARN("postSend failed: errno %d", errno); - } - - // Wait for completion - if (trigger.fields.type & mscclppSync) { - bool waiting = true; - while (waiting) { - wcNum = conn->ibQp->pollCq(); - if (wcNum < 0) { - WARN("rank %d pollCq failed: errno %d", rank, errno); - continue; - } - for (int i = 0; i < wcNum; ++i) { - struct ibv_wc *wc = &conn->ibQp->wcs[i]; - if (wc->status != IBV_WC_SUCCESS) { - WARN("rank %d wc status %d", rank, wc->status); - continue; - } - if (wc->qp_num != conn->ibQp->qp->qp_num) { - WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num); - continue; - } - if (wc->opcode == IBV_WC_RDMA_WRITE) { - // send completion - waiting = false; - break; - } - } - } - } - - // Send completion: reset only the high 64 bits - volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - *tmp = 0; - conn->fifoTail++; - if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE) - conn->fifoTail = 0; -#endif } + + // Send completion: reset only the high 64 bits + *(volatile uint64_t *)(&fifo[fifoTail]) = 0; + fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; +#endif } *run = MSCCLPP_PROXY_RUN_STATE_IDLE; // WARN("Proxy exits: rank %d", rank); @@ -283,69 +285,51 @@ void* mscclppProxyService(void* _args) { return ret; } -// mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses) { -// comm->proxyState.listenSock = sock; -// comm->proxyState.peerAddresses = peerAddresses; -// return mscclppSuccess; -// } - mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { - if (comm->ibContext[i] == NULL) continue; - if (comm->proxyState[i].threads == 
NULL) { - MSCCLPPCHECK(mscclppCalloc(&comm->proxyState[i].threads, 1)); + for (int i = 0; i < MSCCLPP_IB_MAX_DEVS + 1; ++i) { + // `i == MSCCLPP_IB_MAX_DEVS` is for the P2P proxy + bool is_p2p = (i == MSCCLPP_IB_MAX_DEVS); + if (!is_p2p) { + if (comm->ibContext[i] == NULL) continue; } - if (comm->proxyState[i].runs == NULL) { - MSCCLPPCHECK(mscclppCalloc(&comm->proxyState[i].runs, 1)); + if (comm->proxyState[i].cpuTriggerFifo == NULL) { + // reachable when there is no mscclppTransportP2P type connection + continue; } - // Create IB proxy threads struct proxyArgs *args; MSCCLPPCHECK(mscclppCalloc(&args, 1)); args->comm = comm; - args->ibCtx = comm->ibContext[i]; - args->run = comm->proxyState[i].runs; + args->ibCtx = is_p2p ? NULL : comm->ibContext[i]; + args->run = &comm->proxyState[i].run; + args->triggerFifo = comm->proxyState[i].cpuTriggerFifo; + if (is_p2p) { + CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); + } *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; - pthread_create(comm->proxyState[i].threads, NULL, mscclppProxyService, args); - mscclppSetThreadName(comm->proxyState[i].threads[0], "MSCCLPP Service IB - %02d", i); + pthread_create(&comm->proxyState[i].thread, NULL, mscclppProxyService, args); + if (is_p2p) { + mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service P2P - %02d", comm->cudaDev); + } else { + mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service IB - %02d", i); + } } - // P2P proxy - mscclppProxyState *proxyState = &comm->proxyState[MSCCLPP_IB_MAX_DEVS]; - if (proxyState->threads == NULL) { - MSCCLPPCHECK(mscclppCalloc(&proxyState->threads, 1)); - } - if (proxyState->runs == NULL) { - MSCCLPPCHECK(mscclppCalloc(&proxyState->runs, 1)); - } - // Create P2P DMA proxy thread - struct proxyArgs *args; - MSCCLPPCHECK(mscclppCalloc(&args, 1)); - args->comm = comm; - args->ibCtx = NULL; - args->run = proxyState->runs; - args->connIdx = -1; // unused - 
CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); - *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; - pthread_create(proxyState->threads, NULL, mscclppProxyService, args); - mscclppSetThreadName(proxyState->threads[0], "MSCCLPP Service P2P - %02d", comm->cudaDev); return mscclppSuccess; } -static void _stopProxy(struct mscclppComm* comm, int devIdx, int connIdx) { - volatile int *run = (volatile int *)&comm->proxyState[devIdx].runs[connIdx]; - if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) return; - *run = MSCCLPP_PROXY_RUN_STATE_EXITING; - while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) { - usleep(1000); - } -} - mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { - if (comm->ibContext[i] != NULL) { - _stopProxy(comm, i, 0); + for (int i = 0; i < MSCCLPP_IB_MAX_DEVS + 1; ++i) { + // `i == MSCCLPP_IB_MAX_DEVS` is for the P2P proxy + if (i < MSCCLPP_IB_MAX_DEVS) { + if (comm->ibContext[i] == NULL) continue; + } + volatile int *run = (volatile int *)&comm->proxyState[i].run; + if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) { + continue; + } + *run = MSCCLPP_PROXY_RUN_STATE_EXITING; + while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) { + usleep(1000); } } - // P2P proxies - _stopProxy(comm, MSCCLPP_IB_MAX_DEVS, 0); return mscclppSuccess; } diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index cf6aae68..677e56fe 100644 --- a/tests/p2p_test.cu +++ b/tests/p2p_test.cu @@ -64,12 +64,8 @@ __global__ void kernel(int rank, int world_size) volatile uint64_t *localFlag = devConn.localFlag; volatile uint64_t *remoteFlag = devConn.remoteFlag; volatile uint64_t *proxyFlag = devConn.proxyFlag; - int curFifoHead = *devConn.triggerFifoHead; + unsigned int curFifoHead = atomicInc(devConn.triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1); mscclppTrigger *trig = &devConn.trigger[curFifoHead]; - curFifoHead += 1; - if (curFifoHead == MSCCLPP_PROXY_FIFO_SIZE) - 
curFifoHead = 0; - *devConn.triggerFifoHead = curFifoHead; uint64_t baseFlag = *localFlag; @@ -93,7 +89,7 @@ __global__ void kernel(int rank, int world_size) while (*(volatile uint64_t *)trig->value != 0) {} // Trigger sending data and flag - setTrigger(trig, /*for test*/42, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); + setTrigger(trig, devConn.connId, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} @@ -101,13 +97,12 @@ __global__ void kernel(int rank, int world_size) #else // USE_DMA_FOR_P2P == 0 if (devConn.remoteBuff == NULL) { // IB - // Trigger sending data and flag - uint64_t dataOffset = rank * sizeof(int); - uint64_t dataSize = sizeof(int); - *trig = TRIGGER_VALUE(mscclppSync | mscclppFlag | mscclppData, dataOffset, dataSize); - // Wait until the proxy have sent my data and flag - while (*trig != 0) {} + // Check only the high 64 bits + while (*(volatile uint64_t *)trig->value != 0) {} + + // Trigger sending data and flag + setTrigger(trig, devConn.connId, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} @@ -285,7 +280,7 @@ int main(int argc, const char *argv[]) cudaStreamEndCapture(stream, &graph); cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - int cudagraphwarmup = 100; + int cudagraphwarmup = 10; for (int i = 0; i < cudagraphwarmup; ++i) { cudaGraphLaunch(instance, stream); } @@ -294,7 +289,7 @@ int main(int argc, const char *argv[]) // measure runtime // CUDACHECK(cudaEventRecord(ev_start, stream)); double t0 = getTime(); - int cudagraphlaunch = 100; + int cudagraphlaunch = 10; for (int i = 0; i < cudagraphlaunch; ++i) { // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); cudaGraphLaunch(instance, stream); From 86dd8e2e17bc751688693dc43fb587850cb04528 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Mon, 13 Mar 2023 14:21:37 
+0000 Subject: [PATCH 04/16] Remove unused macro --- src/include/mscclpp.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index 1aef4de0..bae46d45 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -26,8 +26,6 @@ typedef enum : uint64_t { mscclppData = 0x1, #define MSCCLPP_BITS_OFFSET 31 #define MSCCLPP_BITS_CONNID 10 -#define TRIGGER_VALUE(__TYPE__,__OFFSET__,__SIZE__) (((((__TYPE__) << MSCCLPP_BITS_OFFSET) + (__OFFSET__)) << MSCCLPP_BITS_SIZE) + __SIZE__ ) - // the summation of number of bits must be 128 or less union alignas(16) mscclppTrigger { uint64_t value[2]; From 9b124cabdb6aea8dfa90adba614c058cc2c0e901 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Mon, 13 Mar 2023 14:27:29 +0000 Subject: [PATCH 05/16] cleanup --- src/proxy.cc | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index 05bf7282..ec00f848 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -45,19 +45,10 @@ static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { _mm_storeu_si128((__m128i *)dst, xmm0); } -// TODO(saemal) We need to add a fifo for each DMA engine void* mscclppProxyServiceP2P(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; volatile mscclppProxyRunState_t *run = args->run; - std::vector conns; - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn *conn = &comm->conns[i]; - // TODO(saemal): we need to create another transport type which doesn't need a proxy. 
- if (conn->transport == mscclppTransportP2P) { - conns.push_back(conn); - } - } mscclppTrigger *fifo = args->triggerFifo; cudaStream_t stream = args->stream; free(_args); @@ -115,13 +106,6 @@ void* mscclppProxyServiceIb(void* _args) { struct mscclppComm *comm = args->comm; struct mscclppIbContext *ibCtx = args->ibCtx; volatile mscclppProxyRunState_t *run = args->run; - std::vector conns; - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn *conn = &comm->conns[i]; - if (conn->transport == mscclppTransportIB) { - conns.push_back(conn); - } - } mscclppTrigger *fifo = args->triggerFifo; free(_args); From ab9298d6e02317cca6a257c494c3929acb5294ff Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Mon, 13 Mar 2023 23:21:27 +0000 Subject: [PATCH 06/16] fixed the bits for trigger --- src/include/mscclpp.h | 18 ++++++++++-------- tests/p2p_test.cu | 5 ++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index bae46d45..2b68b0c3 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -22,21 +22,23 @@ typedef enum : uint64_t { mscclppData = 0x1, mscclppFlag = 0x2, mscclppSync = 0x4} mscclppTriggerType_t; -#define MSCCLPP_BITS_SIZE 30 -#define MSCCLPP_BITS_OFFSET 31 +#define MSCCLPP_BITS_SIZE 32 +#define MSCCLPP_BITS_OFFSET 32 +#define MSCCLPP_BITS_TYPE 3 #define MSCCLPP_BITS_CONNID 10 // the summation of number of bits must be 128 or less union alignas(16) mscclppTrigger { uint64_t value[2]; struct { - // high 64 bits: value[0] - uint64_t dataSize : MSCCLPP_BITS_SIZE; + // first 64 bits: value[0] + uint64_t dataSize : MSCCLPP_BITS_SIZE; uint64_t dataOffset : MSCCLPP_BITS_OFFSET; - uint64_t type : 3; - uint64_t : 0; // ensure 64-bit alignment - // low 64 bits: value[1] - uint64_t connId : MSCCLPP_BITS_CONNID; + uint64_t : (64-MSCCLPP_BITS_SIZE-MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment + // second 64 bits: value[1] + uint64_t connId : MSCCLPP_BITS_CONNID; + uint64_t type : 
MSCCLPP_BITS_TYPE; + uint64_t : (64-MSCCLPP_BITS_CONNID-MSCCLPP_BITS_TYPE); } fields; }; diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index 677e56fe..c70b5cd4 100644 --- a/tests/p2p_test.cu +++ b/tests/p2p_test.cu @@ -47,10 +47,9 @@ __forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t connId { asm volatile( "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value), - "l"((type << (MSCCLPP_BITS_SIZE + MSCCLPP_BITS_OFFSET)) + - (dataOffset << (MSCCLPP_BITS_SIZE)) + + "l"((dataOffset << (MSCCLPP_BITS_SIZE)) + (dataSize)), - "l"(connId)); + "l"((type << MSCCLPP_BITS_CONNID) + connId)); } __global__ void kernel(int rank, int world_size) From 2bcf418b13081fb43c279e3e1056b77346b778df Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Tue, 14 Mar 2023 03:36:38 +0000 Subject: [PATCH 07/16] more comments --- src/include/mscclpp.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index 2b68b0c3..591dc5b0 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -38,7 +38,7 @@ union alignas(16) mscclppTrigger { // second 64 bits: value[1] uint64_t connId : MSCCLPP_BITS_CONNID; uint64_t type : MSCCLPP_BITS_TYPE; - uint64_t : (64-MSCCLPP_BITS_CONNID-MSCCLPP_BITS_TYPE); + uint64_t : (64-MSCCLPP_BITS_CONNID-MSCCLPP_BITS_TYPE); // ensure 64-bit alignment } fields; }; From e000eb9177484bf3256bf666b8ca8109204f7ac3 Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Tue, 14 Mar 2023 05:26:54 +0000 Subject: [PATCH 08/16] some compilation clean up --- src/bootstrap/bootstrap.cc | 8 ++++---- src/include/bootstrap.h | 2 +- tests/p2p_test.cu | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index 7c48a78b..d3d432b1 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -170,7 +170,7 @@ out: return NULL; } -mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle, bool idFromEnv) { 
+mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle) { struct mscclppSocket* listenSock; struct bootstrapRootArgs* args; pthread_t thread; @@ -209,10 +209,10 @@ mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool return mscclppInvalidArgument; } if (isRoot) - MSCCLPPCHECK(bootstrapCreateRoot(handle, false)); + MSCCLPPCHECK(bootstrapCreateRoot(handle)); } else { memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union mscclppSocketAddress)); - MSCCLPPCHECK(bootstrapCreateRoot(handle, false)); + MSCCLPPCHECK(bootstrapCreateRoot(handle)); } // printf("addr = %s port = %d\n", inet_ntoa(handle->addr.sin.sin_addr), (int)ntohs(handle->addr.sin.sin_port)); // printf("addr = %s\n", inet_ntoa((*(struct sockaddr_in*)&handle->addr.sa).sin_addr)); @@ -248,7 +248,7 @@ mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscc struct mscclppSocket* proxySocket; mscclppSocketAddress nextAddr; struct mscclppSocket sock, listenSockRoot; - struct extInfo info = { 0 }; + struct extInfo info; MSCCLPPCHECK(mscclppCalloc(&state, 1)); state->rank = rank; diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index e7519085..90588e10 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -19,7 +19,7 @@ struct mscclppBootstrapHandle { static_assert(sizeof(struct mscclppBootstrapHandle) <= sizeof(mscclppUniqueId), "Bootstrap handle is too large to fit inside MSCCLPP unique ID"); mscclppResult_t bootstrapNetInit(const char* ip_port_pair = NULL); -mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle, bool idFromEnv); +mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle); mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot = true, const char* ip_port_pair = NULL); mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm); mscclppResult_t bootstrapAllGather(void* commState, void* 
allData, int size); diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index c70b5cd4..9a5591d8 100644 --- a/tests/p2p_test.cu +++ b/tests/p2p_test.cu @@ -61,7 +61,9 @@ __global__ void kernel(int rank, int world_size) mscclppDevConn_t devConn = constDevConns[remoteRank]; volatile int *data = (volatile int *)devConn.localBuff; volatile uint64_t *localFlag = devConn.localFlag; +#if (USE_DMA_FOR_P2P == 0) volatile uint64_t *remoteFlag = devConn.remoteFlag; +#endif volatile uint64_t *proxyFlag = devConn.proxyFlag; unsigned int curFifoHead = atomicInc(devConn.triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1); mscclppTrigger *trig = &devConn.trigger[curFifoHead]; From 2279a690d13e4f146af9abad2df136a41fa86016 Mon Sep 17 00:00:00 2001 From: Saeed Maleki Date: Tue, 14 Mar 2023 05:38:15 +0000 Subject: [PATCH 09/16] mscclpp_net.h is not required anywhere --- Makefile | 2 +- src/include/mscclpp_net.h | 2 +- src/include/net.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index d23758ad..c4c233e1 100644 --- a/Makefile +++ b/Makefile @@ -108,7 +108,7 @@ LIBSRCS += $(addprefix src/bootstrap/,bootstrap.cc socket.cc) LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS)) LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) -INCEXPORTS := mscclpp.h mscclpp_net.h +INCEXPORTS := mscclpp.h INCTARGETS := $(INCEXPORTS:%=$(BUILDDIR)/$(INCDIR)/%) LIBNAME := libmscclpp.so diff --git a/src/include/mscclpp_net.h b/src/include/mscclpp_net.h index 7208ac4b..70f23099 100644 --- a/src/include/mscclpp_net.h +++ b/src/include/mscclpp_net.h @@ -3,7 +3,7 @@ * * See LICENSE.txt for license information ************************************************************************/ - +// TODO(saemal): this file is to be removed. 
#ifndef MSCCLPP_NET_H_ #define MSCCLPP_NET_H_ diff --git a/src/include/net.h b/src/include/net.h index 82b8a423..3b3634d8 100644 --- a/src/include/net.h +++ b/src/include/net.h @@ -8,7 +8,7 @@ #define MSCCLPP_INT_NET_H_ #include "mscclpp.h" -#include "mscclpp_net.h" +// #include "mscclpp_net.h" // #include "comm.h" #include "checks.h" From ae01fa49588ab4ae8c574859dade445de27f58ef Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 08:32:19 +0000 Subject: [PATCH 10/16] Remove mscclpp_net.h and net.h --- src/bootstrap/bootstrap.cc | 1 - src/debug.cc | 2 +- src/include/debug.h | 5 +- src/include/mscclpp_net.h | 313 ------------------------------------- src/include/net.h | 46 ------ 5 files changed, 5 insertions(+), 362 deletions(-) delete mode 100644 src/include/mscclpp_net.h delete mode 100644 src/include/net.h diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index d3d432b1..6b1cfa9c 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -8,7 +8,6 @@ #include "core.h" #include "utils.h" #include "bootstrap.h" -#include "net.h" #include #include diff --git a/src/debug.cc b/src/debug.cc index 3c0f663b..6a7235a4 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -5,7 +5,7 @@ ************************************************************************/ #include "core.h" -#include "mscclpp_net.h" +#include "debug.h" #include #include #include diff --git a/src/include/debug.h b/src/include/debug.h index 6c06748c..326ff42e 100644 --- a/src/include/debug.h +++ b/src/include/debug.h @@ -7,7 +7,7 @@ #ifndef MSCCLPP_DEBUG_H_ #define MSCCLPP_DEBUG_H_ -#include "mscclpp_net.h" +#include "mscclpp.h" #include #include #include @@ -19,6 +19,9 @@ // Conform to pthread and NVTX standard #define MSCCLPP_THREAD_NAMELEN 16 +typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel; +typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, 
MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys; + extern int mscclppDebugLevel; extern uint64_t mscclppDebugMask; extern pthread_mutex_t mscclppDebugLock; diff --git a/src/include/mscclpp_net.h b/src/include/mscclpp_net.h deleted file mode 100644 index 70f23099..00000000 --- a/src/include/mscclpp_net.h +++ /dev/null @@ -1,313 +0,0 @@ -/************************************************************************* - * Copyright (c) 2017-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ -// TODO(saemal): this file is to be removed. -#ifndef MSCCLPP_NET_H_ -#define MSCCLPP_NET_H_ - -#include "mscclpp.h" -#include - -#define MSCCLPP_NET_HANDLE_MAXSIZE 128 - -#define MSCCLPP_PTR_HOST 0x1 -#define MSCCLPP_PTR_CUDA 0x2 -#define MSCCLPP_PTR_DMABUF 0x4 - -// Maximum number of requests per comm object -#define MSCCLPP_NET_MAX_REQUESTS 8 - -typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel; -typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys; - -typedef void (*mscclppDebugLogger_t)(mscclppDebugLogLevel level, unsigned long flags, const char *file, int line, const char *fmt, ...); - -typedef struct { - char* name; // Used mostly for logging. - char* pciPath; // Path to the PCI device in /sys. - uint64_t guid; // Unique identifier for the NIC chip. Important for - // cards with multiple PCI functions (Physical or virtual). - int ptrSupport; // [MSCCLPP_PTR_HOST|MSCCLPP_PTR_CUDA|MSCCLPP_PTR_DMABUF] - int speed; // Port speed in Mbps. - int port; // Port number. 
- float latency; // Network latency - int maxComms; // Maximum number of comms we can create - int maxRecvs; // Maximum number of grouped receives. -}mscclppNetProperties_v6_t; - -typedef mscclppNetProperties_v6_t mscclppNetProperties_t; - -typedef struct { - // Name of the network (mainly for logs) - const char* name; - // Initialize the network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create a connection. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Connect to a handle and return a sending comm object for that peer. - // This call must not block for the connection to be established, and instead - // should return successfully with sendComm == NULL with the expectation that - // it will be called again until sendComm != NULL. - mscclppResult_t (*connect)(int dev, void* handle, void** sendComm); - // Finalize connection establishment after remote peer has called connect. - // This call must not block for the connection to be established, and instead - // should return successfully with recvComm == NULL with the expectation that - // it will be called again until recvComm != NULL. - mscclppResult_t (*accept)(void* listenComm, void** recvComm); - // Register/Deregister memory. Comm can be either a sendComm or a recvComm. - // Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. 
- mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle); - /* DMA-BUF support */ - mscclppResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle); - mscclppResult_t (*deregMr)(void* comm, void* mhandle); - // Asynchronous send to a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request); - // Asynchronous recv from a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* sizes); - // Close and free send/recv comm objects - mscclppResult_t (*closeSend)(void* sendComm); - mscclppResult_t (*closeRecv)(void* recvComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppNet_v6_t; - -typedef mscclppNet_v6_t mscclppNet_t; - -#define MSCCLPP_PLUGIN_SYMBOL mscclppNetPlugin_v6 - -typedef struct { - // Name of the collective network (mainly for logs) - const char* name; - // Initialize the collective network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters capable of doing collective operations. - // If ndev returns 0, all other functions might be set to NULL. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. 
- mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create connections. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Create a group for collective operations. handles have been created - // using listen() above. rank indicates caller's rank in the collective network. - mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm); - // Returns whether a reduction operation on a data type is supported. - // 1 for supported, 0 otherwise. - mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported); - // Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle); - /* DMA-BUF support */ - mscclppResult_t (*regMrDmaBuf)(void* collComm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle); - mscclppResult_t (*deregMr)(void* collComm, void* mhandle); - // Performs an asynchronous allreduce operation on the collective group. - // May return request == NULL if the call cannot be performed (or would block). - mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count, - mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. 
- mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free collective comm objects - mscclppResult_t (*closeColl)(void* collComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppCollNet_v6_t; - -typedef mscclppCollNet_v6_t mscclppCollNet_t; - -#define MSCCLPP_COLLNET_PLUGIN_SYMBOL mscclppCollNetPlugin_v6 - -// v5 struct for backwards compatibility -typedef struct { - // Name of the network (mainly for logs) - const char* name; - // Initialize the network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create a connection. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Connect to a handle and return a sending comm object for that peer. - // This call must not block for the connection to be established, and instead - // should return successfully with sendComm == NULL with the expectation that - // it will be called again until sendComm != NULL. - mscclppResult_t (*connect)(int dev, void* handle, void** sendComm); - // Finalize connection establishment after remote peer has called connect. - // This call must not block for the connection to be established, and instead - // should return successfully with recvComm == NULL with the expectation that - // it will be called again until recvComm != NULL. - mscclppResult_t (*accept)(void* listenComm, void** recvComm); - // Register/Deregister memory. Comm can be either a sendComm or a recvComm. - // Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. 
- mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* comm, void* mhandle); - // Asynchronous send to a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request); - // Asynchronous recv from a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* sizes); - // Close and free send/recv comm objects - mscclppResult_t (*closeSend)(void* sendComm); - mscclppResult_t (*closeRecv)(void* recvComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppNet_v5_t; - -// v5 struct for backwards compatibility -typedef struct { - // Name of the collective network (mainly for logs) - const char* name; - // Initialize the collective network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters capable of doing collective operations. - // If ndev returns 0, all other functions might be set to NULL. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create connections. 
- mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Create a group for collective operations. handles have been created - // using listen() above. rank indicates caller's rank in the collective network. - mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm); - // Returns whether a reduction operation on a data type is supported. - // 1 for supported, 0 otherwise. - mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported); - // Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* collComm, void* mhandle); - // Performs an asynchronous allreduce operation on the collective group. - // May return request == NULL if the call cannot be performed (or would block). - mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count, - mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free collective comm objects - mscclppResult_t (*closeColl)(void* collComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppCollNet_v5_t; - -// v4 struct for backwards compatibility -typedef struct { - char* name; // Used mostly for logging. - char* pciPath; // Path to the PCI device in /sys. - uint64_t guid; // Unique identifier for the NIC chip. Important for - // cards with multiple PCI functions (Physical or virtual). 
- int ptrSupport; // MSCCLPP_PTR_HOST or MSCCLPP_PTR_HOST|MSCCLPP_PTR_CUDA - int speed; // Port speed in Mbps. - int port; // Port number. - int maxComms; // Maximum number of comms we can create -} mscclppNetProperties_v4_t; - -// v4 struct for backwards compatibility -typedef struct { - // Name of the network (mainly for logs) - const char* name; - // Initialize the network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v4_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create a connection. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Connect to a handle and return a sending comm object for that peer. - mscclppResult_t (*connect)(int dev, void* handle, void** sendComm); - // Finalize connection establishment after remote peer has called connectHandle - mscclppResult_t (*accept)(void* listenComm, void** recvComm); - // Register/Deregister memory. Comm can be either a sendComm or a recvComm. - // Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* comm, void* mhandle); - // Asynchronous send to a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*isend)(void* sendComm, void* data, int size, void* mhandle, void** request); - // Asynchronous recv from a peer. 
- // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*irecv)(void* recvComm, void* data, int size, void* mhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* recvComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free send/recv comm objects - mscclppResult_t (*closeSend)(void* sendComm); - mscclppResult_t (*closeRecv)(void* recvComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppNet_v4_t; - -// v4 struct for backwards compatibility -typedef struct { - // Name of the collective network (mainly for logs) - const char* name; - // Initialize the collective network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters capable of doing collective operations. - // If ndev returns 0, all other functions might be set to NULL. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v4_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create connections. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Create a group for collective operations. handles have been created - // using listen() above. rank indicates caller's rank in the collective network. - mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm); - // Returns whether a reduction operation on a data type is supported. - // 1 for supported, 0 otherwise. 
- mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported); - // Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* collComm, void* mhandle); - // Performs an asynchronous allreduce operation on the collective group. - // May return request == NULL if the call cannot be performed (or would block). - mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count, - mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free collective comm objects - mscclppResult_t (*closeColl)(void* collComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppCollNet_v4_t; - -#endif // end include guard diff --git a/src/include/net.h b/src/include/net.h deleted file mode 100644 index 3b3634d8..00000000 --- a/src/include/net.h +++ /dev/null @@ -1,46 +0,0 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. 
- * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef MSCCLPP_INT_NET_H_ -#define MSCCLPP_INT_NET_H_ - -#include "mscclpp.h" -// #include "mscclpp_net.h" -// #include "comm.h" -#include "checks.h" - -typedef char mscclppNetHandle_t[MSCCLPP_NET_HANDLE_MAXSIZE]; - -mscclppResult_t mscclppNetPluginInit(); -// mscclppResult_t mscclppNetInit(struct mscclppComm* comm); -// int mscclppNetVersion(struct mscclppComm* comm); - -// // Translation to external API -// static const char* mscclppNetName(struct mscclppComm* comm) { return comm->mscclppNet->name; } -// static mscclppResult_t mscclppNetDevices(struct mscclppComm* comm, int* ndev) { MSCCLPPCHECK(comm->mscclppNet->devices(ndev)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetGetProperties(struct mscclppComm* comm, int dev, mscclppNetProperties_t* props) { MSCCLPPCHECK(comm->mscclppNet->getProperties(dev, props)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetListen(struct mscclppComm* comm, int dev, void* handle, void** listenComm) { MSCCLPPCHECK(comm->mscclppNet->listen(dev, handle, listenComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetConnect(struct mscclppComm* comm, int dev, void* handle, void** sendComm) { MSCCLPPCHECK(comm->mscclppNet->connect(dev, handle, sendComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetAccept(struct mscclppComm* comm, void* listenComm, void** recvComm) { MSCCLPPCHECK(comm->mscclppNet->accept(listenComm, recvComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetRegMr(struct mscclppComm* comm, void* netComm, void* data, int size, int type, void** mhandle) { MSCCLPPCHECK(comm->mscclppNet->regMr(netComm, data, size, type, mhandle)); return mscclppSuccess; } -// /* DMA-BUF support */ -// static mscclppResult_t mscclppNetRegMrDmaBuf(struct mscclppComm* comm, void* netComm, void* data, size_t size, int type, uint64_t offset, 
int fd, void** mhandle) { MSCCLPPCHECK(comm->mscclppNet->regMrDmaBuf(netComm, data, size, type, offset, fd, mhandle)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetDeregMr(struct mscclppComm* comm, void* netComm, void* mhandle) { MSCCLPPCHECK(comm->mscclppNet->deregMr(netComm, mhandle)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetIsend(struct mscclppComm* comm, void* sendComm, void* data, int size, int tag, void* mhandle, void** request) { MSCCLPPCHECK(comm->mscclppNet->isend(sendComm, data, size, tag, mhandle, request)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetIrecv(struct mscclppComm* comm, void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request) { MSCCLPPCHECK(comm->mscclppNet->irecv(recvComm, n, data, sizes, tags, mhandles, request)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetIflush(struct mscclppComm* comm, void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) { MSCCLPPCHECK(comm->mscclppNet->iflush(recvComm, n, data, sizes, mhandles, request)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetTest(struct mscclppComm* comm, void* request, int* done, int* sizes) { MSCCLPPCHECK(comm->mscclppNet->test(request, done, sizes)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetCloseSend(struct mscclppComm* comm, void* sendComm) { MSCCLPPCHECK(comm->mscclppNet->closeSend(sendComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetCloseRecv(struct mscclppComm* comm, void* recvComm) { MSCCLPPCHECK(comm->mscclppNet->closeRecv(recvComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetCloseListen(struct mscclppComm* comm, void* listenComm) { MSCCLPPCHECK(comm->mscclppNet->closeListen(listenComm)); return mscclppSuccess; } - -// // Test whether the current GPU support GPU Direct RDMA. 
-// mscclppResult_t mscclppGpuGdrSupport(struct mscclppComm* comm, int* gdrSupport); - -// extern mscclppNet_t mscclppNetIb; -// extern mscclppNet_t mscclppNetSocket; - -#endif From e89d15450350264bf3bb00bd518f8309c9b1657b Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 08:38:55 +0000 Subject: [PATCH 11/16] Check run state periodically --- src/proxy.cc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index ec00f848..4c88e6fd 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -12,6 +12,7 @@ #include #include +#define MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD 100 // TODO(chhwang): verify if MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0 is useful, otherwise delete this option. #define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 1 @@ -63,9 +64,15 @@ void* mscclppProxyServiceP2P(void* _args) { PROXYCUDACHECK(cudaSetDevice(comm->cudaDev)); PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); + int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; // fifoTail indicates where CPU needs to read the head of the fifo. int fifoTail = 0; - while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { + for (;;) { + if (runCheckCounter-- == 0) { + runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; + // Check if we need to exit + if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; + } // Poll to see if we are ready to send anything readTrigger(&trigger, &fifo[fifoTail]); if (trigger.value[0] == 0) continue; @@ -144,9 +151,15 @@ void* mscclppProxyServiceIb(void* _args) { } #endif + int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; // fifoTail indicates where CPU needs to read the head of the fifo. 
int fifoTail = 0; - while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { + for (;;) { + if (runCheckCounter-- == 0) { + runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; + // Check if we need to exit + if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; + } // Poll to see if we are ready to send anything readTrigger(&trigger, &fifo[fifoTail]); From 75ec82d2577e07978172daada76645dfba72c599 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 09:00:38 +0000 Subject: [PATCH 12/16] Store fifo tail in proxy state --- src/include/proxy.h | 2 ++ src/proxy.cc | 33 +++++++++++++++------------------ 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/include/proxy.h b/src/include/proxy.h index 5e616aa2..223747d1 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -16,6 +16,8 @@ struct mscclppProxyState { mscclppProxyRunState_t run; mscclppTrigger *cpuTriggerFifo; mscclppTrigger *gpuTriggerFifo; + // cpuTriggerFifoTail indicates where CPU needs to read the head of the fifo. 
+ unsigned int cpuTriggerFifoTail; unsigned int *gpuTriggerFifoHead; void *cpuTriggerFifoGdrDesc; }; diff --git a/src/proxy.cc b/src/proxy.cc index 4c88e6fd..81f2cf23 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -37,8 +37,7 @@ struct proxyArgs { struct mscclppComm* comm; struct mscclppIbContext* ibCtx; cudaStream_t stream; - volatile mscclppProxyRunState_t* run; - mscclppTrigger *triggerFifo; + struct mscclppProxyState *proxyState; }; static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { @@ -49,8 +48,9 @@ static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { void* mscclppProxyServiceP2P(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; - volatile mscclppProxyRunState_t *run = args->run; - mscclppTrigger *fifo = args->triggerFifo; + volatile mscclppProxyRunState_t *run = &args->proxyState->run; + mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; + unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; cudaStream_t stream = args->stream; free(_args); @@ -66,7 +66,6 @@ void* mscclppProxyServiceP2P(void* _args) { int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; // fifoTail indicates where CPU needs to read the head of the fifo. 
- int fifoTail = 0; for (;;) { if (runCheckCounter-- == 0) { runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; @@ -74,7 +73,7 @@ void* mscclppProxyServiceP2P(void* _args) { if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; } // Poll to see if we are ready to send anything - readTrigger(&trigger, &fifo[fifoTail]); + readTrigger(&trigger, &fifo[*fifoTail]); if (trigger.value[0] == 0) continue; struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; @@ -94,8 +93,8 @@ void* mscclppProxyServiceP2P(void* _args) { } // Send completion: reset only the high 64 bits - *(volatile uint64_t *)(&fifo[fifoTail]) = 0; - fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; + *(volatile uint64_t *)(&fifo[*fifoTail]) = 0; + *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; } // Need a sync in case previous copies are not completed @@ -112,8 +111,9 @@ void* mscclppProxyServiceIb(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; struct mscclppIbContext *ibCtx = args->ibCtx; - volatile mscclppProxyRunState_t *run = args->run; - mscclppTrigger *fifo = args->triggerFifo; + volatile mscclppProxyRunState_t *run = &args->proxyState->run; + mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; + unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; free(_args); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) @@ -152,8 +152,6 @@ void* mscclppProxyServiceIb(void* _args) { #endif int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; - // fifoTail indicates where CPU needs to read the head of the fifo. 
- int fifoTail = 0; for (;;) { if (runCheckCounter-- == 0) { runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; @@ -161,7 +159,7 @@ void* mscclppProxyServiceIb(void* _args) { if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; } // Poll to see if we are ready to send anything - readTrigger(&trigger, &fifo[fifoTail]); + readTrigger(&trigger, &fifo[*fifoTail]); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; @@ -261,8 +259,8 @@ void* mscclppProxyServiceIb(void* _args) { } // Send completion: reset only the high 64 bits - *(volatile uint64_t *)(&fifo[fifoTail]) = 0; - fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; + *(volatile uint64_t *)(&fifo[*fifoTail]) = 0; + *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; #endif } *run = MSCCLPP_PROXY_RUN_STATE_IDLE; @@ -297,12 +295,11 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { MSCCLPPCHECK(mscclppCalloc(&args, 1)); args->comm = comm; args->ibCtx = is_p2p ? 
NULL : comm->ibContext[i]; - args->run = &comm->proxyState[i].run; - args->triggerFifo = comm->proxyState[i].cpuTriggerFifo; + args->proxyState = &comm->proxyState[i]; if (is_p2p) { CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); } - *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; + comm->proxyState[i].run = MSCCLPP_PROXY_RUN_STATE_RUNNING; pthread_create(&comm->proxyState[i].thread, NULL, mscclppProxyService, args); if (is_p2p) { mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service P2P - %02d", comm->cudaDev); From aacee9727bc3d3736ffde485bbb83c0972bd4908 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 09:11:51 +0000 Subject: [PATCH 13/16] trigger wrappers --- src/include/mscclpp.h | 20 ++++++++++++++++++++ tests/p2p_test.cu | 23 +++++------------------ 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index 591dc5b0..85a6df24 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -69,6 +69,26 @@ union alignas(16) mscclppTrigger { ***************************************/ struct mscclppDevConn { +#ifdef __CUDACC__ + __forceinline__ __device__ mscclppTrigger *getTrigger() { + unsigned int curFifoHead = atomicInc(this->triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1); + return &this->trigger[curFifoHead]; + } + + __forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) { + asm volatile( + "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value), + "l"((dataOffset << (MSCCLPP_BITS_SIZE)) + + (dataSize)), + "l"((type << MSCCLPP_BITS_CONNID) + this->connId)); + } + + __forceinline__ __device__ void waitTrigger(mscclppTrigger *trig) { + // Check only the first 64 bits + while (*(volatile uint64_t *)trig->value != 0) {} + } +#endif // __CUDACC__ + int tag; void* localBuff; diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index 9a5591d8..04a412d6 100644 --- a/tests/p2p_test.cu 
+++ b/tests/p2p_test.cu @@ -42,16 +42,6 @@ static double getTime(void) __constant__ mscclppDevConn_t constDevConns[16]; -__forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t connId, uint64_t type, - uint64_t dataOffset, uint64_t dataSize) -{ - asm volatile( - "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value), - "l"((dataOffset << (MSCCLPP_BITS_SIZE)) + - (dataSize)), - "l"((type << MSCCLPP_BITS_CONNID) + connId)); -} - __global__ void kernel(int rank, int world_size) { if (threadIdx.x % 32 != 0) return; @@ -65,8 +55,7 @@ __global__ void kernel(int rank, int world_size) volatile uint64_t *remoteFlag = devConn.remoteFlag; #endif volatile uint64_t *proxyFlag = devConn.proxyFlag; - unsigned int curFifoHead = atomicInc(devConn.triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1); - mscclppTrigger *trig = &devConn.trigger[curFifoHead]; + mscclppTrigger *trig = devConn.getTrigger(); uint64_t baseFlag = *localFlag; @@ -86,11 +75,10 @@ __global__ void kernel(int rank, int world_size) #if (USE_DMA_FOR_P2P == 1) // Wait until the proxy have sent my data and flag - // Check only the high 64 bits - while (*(volatile uint64_t *)trig->value != 0) {} + devConn.waitTrigger(trig); // Trigger sending data and flag - setTrigger(trig, devConn.connId, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); + devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} @@ -99,11 +87,10 @@ __global__ void kernel(int rank, int world_size) if (devConn.remoteBuff == NULL) { // IB // Wait until the proxy have sent my data and flag - // Check only the high 64 bits - while (*(volatile uint64_t *)trig->value != 0) {} + devConn.waitTrigger(trig); // Trigger sending data and flag - setTrigger(trig, devConn.connId, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); + devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // 
Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} From 135520a14ae7cc38cb9cd22fe0d52dbaa75cd10e Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 09:21:52 +0000 Subject: [PATCH 14/16] cleanups --- src/include/comm.h | 208 --------------------------------------------- src/proxy.cc | 8 +- 2 files changed, 4 insertions(+), 212 deletions(-) diff --git a/src/include/comm.h b/src/include/comm.h index 81449fe2..65435951 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -177,231 +177,23 @@ struct mscclppConn { }; struct mscclppComm { -// struct mscclppMemoryStack memPermanent, memScoped; -// // List of destructors to run when comm is destructed -// struct mscclppDestructor* destructorHead; - -// struct mscclppChannel channels[MAXCHANNELS]; -// struct mscclppPeerInfo* peerInfo; -// struct mscclppTopoSystem* topo; - struct mscclppConn conns[MAXCONNECTIONS]; int nConns; -// mscclppNet_t* mscclppNet; -// mscclppCollNet_t* mscclppCollNet; void* bootstrap; -// // Bitmasks for mscclppTransportP2pSetup -// uint64_t* connectSend; -// uint64_t* connectRecv; uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches. 
int rank; // my rank in the communicator int nRanks; // number of GPUs in communicator int cudaDev; // my cuda device index -// int compCap; // compute capability of the GPU -// int64_t busId; // my PCI bus ID in int format -// cpu_set_t cpuAffinity; // CPU affinity of the GPU - - // int node; - // int nNodes; - // int localRank; - // int localRanks; - // int maxLocalRanks; - // int* rankToNode; - // int* rankToLocalRank; - // int* localRankToRank; -// // localRanks and localRanktoRank for all nodes -// struct mscclppNodeRanks* nodeRanks; - -// bool checkPointers; -// bool dmaBufSupport; - -// // Counter for tracking CUDA launches (P2P and collectives included) -// uint64_t opCount; -// // Collective operation counter -// uint64_t collOpCount; - -// // Channels for collectives -// int nChannels; -// // Channels (per peer) for p2p -// int p2pnChannels; -// int p2pnChannelsPerPeer; -// int p2pChannels[MAXCHANNELS]; - -// // Should this comm allocate LL buffers for network P2P connections? -// bool allocP2pNetLLBuffers; - -// // Buffer sizes -// int buffSizes[MSCCLPP_NUM_PROTOCOLS]; -// int p2pChunkSize; - -// // Algorithm/Protocols thresholds -// ssize_t threadThresholds[MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; -// float latencies[MSCCLPP_NUM_FUNCTIONS][MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; -// float bandwidths[MSCCLPP_NUM_FUNCTIONS][MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; -// int maxThreads[MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; - -// /* This attribute can indicate the states of communicators and return code of -// * asynchronous MSCCLPP operations. */ -// mscclppResult_t asyncResult; // Flag to ask MSCCLPP kernels to abort volatile uint32_t *abortFlag; -// // Device side of the communicator (for cudaFree's) -// struct mscclppDevComm* devComm; // actually = &mscclppDevCommAndChannels::comm - -// // Operation pool. 
-// int workFifoDepth; // size of workFifoHeap[], power of 2 -// struct mscclppWork* workFifoHeap; -// struct mscclppWork* devWorkFifoHeap; -// void* workFifoHeapGdrHandle; - -// // Work completion notificaion -// uint32_t* workFifoDone/*[MAXCHANNELS]*/; // in cudaHost memory -// uint32_t workFifoSent; // Monotonic (mod 1<<32) index of next unused fifo slot. -// uint32_t workFifoAckdMin; // Monotonic index of least unprocessed fifo slot over all channels. - -// // Intra-process sync -// struct mscclppComm* intraComm0; // leader of intra-process comms (self possible) -// struct mscclppComm* intraNext; // next of intra-process comms, intraComm0 is head -// int intraRank; -// int intraRanks; -// uint32_t intraBarrierPhase; -// char intraPad1[64 - sizeof(uint64_t)]; -// uint64_t intraBarrierCounter; // only used if this is intraComm0 -// char intraPad2[64 - sizeof(uint64_t)]; -// uint64_t intraBarrierGate; // only used if this is intraComm0 - struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS]; - // Last one is for P2P proxies. struct mscclppProxyState proxyState[MSCCLPP_IB_MAX_DEVS + 1]; - -// // Whether this communicator uses collNet -// int collNetSupport; -// int intraHighestTransportType; - -// size_t channelSize; // User requested work size (bytes) for channel partitions - -// // Internal streams -// struct mscclppStrongStream deviceStream, hostStream; - -// // pools backed by comm->memPermanent -// struct mscclppMemoryPool memPool_mscclppProxyOp; -// struct mscclppMemoryPool memPool_mscclppKernelPlan; -// struct mscclppMemoryPool memPool_mscclppPointerList; -// // Next comm in this thread's active mscclppGroup[Start|End](). Holds "0x1" when -// // this comm is not yet in a group. -// struct mscclppComm* groupNext; -// // Subset of those in groupNext list. Holds 0x1 if not needing preconnect. 
-// struct mscclppComm* preconnectNext; -// int persistentRefs; // number of persistent plan-lists capturing this comm -// struct mscclppTasks tasks; - -// // user-created reduction ops -// int userRedOpCapacity, userRedOpFreeHead; -// mscclppUserRedOp *userRedOps; - -// // Queue of things for the main thread to do -// struct mscclppIntruQueueMpsc callbackQueue; - -// // List of kernel plans built form tasks. -// struct mscclppIntruQueue planQueue; -// // First of the unlaunched kernels in `planQueue` -// struct mscclppKernelPlan* unlaunchedPlansHead; - -// // communicator mode -// int blocking; -// // initState is to more conveniently reclaim resources when errors happen. -// mscclppResult_t initState; -// // flag to indicate if mscclppCommFinalize() is called -// bool finalizeCalled; -// // shared structures for finalization -// int finalizeRankCnt; }; -// enum mscclppLaunchMode { -// mscclppLaunchModeInvalid=0, -// mscclppLaunchModeParallel, -// mscclppLaunchModeGroup -// }; -// extern enum mscclppLaunchMode mscclppParamLaunchMode; - -// void mscclppCommPushFree(struct mscclppComm* comm, void* buf); -// void mscclppCommPushCudaFree(struct mscclppComm* comm, void* buf); -// void mscclppCommPushCudaHostFree(struct mscclppComm* comm, void* buf); -// void mscclppCommPushCudaGdrFree(struct mscclppComm* comm, void* handle); - -// inline mscclppResult_t mscclppCommPollCallbacks(struct mscclppComm* comm, bool waitSome) { -// mscclppResult_t result = mscclppSuccess; -// struct mscclppCommCallback* cb = mscclppIntruQueueMpscDequeueAll(&comm->callbackQueue, waitSome); -// while (cb != nullptr) { -// struct mscclppCommCallback* next = cb->next; -// mscclppResult_t res1 = cb->fn(comm, cb); // may reclaim memory of cb -// if (res1 != mscclppSuccess) result = res1; -// cb = next; -// } -// MSCCLPPCHECK(result); -// return mscclppSuccess; -// } - -// inline void mscclppCommIntraBarrierIn(struct mscclppComm* comm, uint32_t x) { -// int phase = comm->intraBarrierPhase; -// if 
(comm->intraRanks == 1) { -// // Release everyone (just me). -// comm->intraBarrierGate = (uint64_t(x)<<32) | (phase^1); -// } else { -// struct mscclppComm* comm0 = comm->intraComm0; -// uint64_t count = __atomic_add_fetch(&comm0->intraBarrierCounter, (uint64_t(x)<<32) + 1, __ATOMIC_RELEASE); -// if (uint32_t(count) == uint32_t(comm->intraRanks)) { -// // Reset. -// __atomic_store_n(&comm0->intraBarrierCounter, 0, __ATOMIC_RELAXED); -// // Release everyone. -// __atomic_store_n(&comm0->intraBarrierGate, (count>>32<<32) | (phase^1), __ATOMIC_RELEASE); -// } -// } -// } - -// // returns sum of x values contributed to mscclppCommIntraBarrierIn(comm, x) -// inline uint32_t mscclppCommIntraBarrierOut(struct mscclppComm* comm) { -// struct mscclppComm* comm0 = comm->intraComm0; -// comm->intraBarrierPhase ^= 1; -// uint32_t phase = comm->intraBarrierPhase; -// uint64_t gate = __atomic_load_n(&comm0->intraBarrierGate, __ATOMIC_RELAXED); -// if ((gate & 1) != phase) { -// uint64_t t0 = clockNano(); -// do { -// // Spin vigorously for first 5us. -// if (clockNano()-t0 >= 5*1000) sched_yield(); -// gate = __atomic_load_n(&comm0->intraBarrierGate, __ATOMIC_RELAXED); -// } while ((gate & 1) != phase); -// } -// if (comm->intraRanks != 1) __atomic_thread_fence(__ATOMIC_ACQUIRE); -// return gate>>32; -// } - -// // Scrambles the bits of non-builtin values of mscclppRedOp_t according to the -// // communicator memory address. Used to catch bugs so that integer handles -// // associated with this communicator won't collide with handles of other -// // communicatrs. This function is its own inverse. -// static inline mscclppRedOp_t mscclppUserRedOpMangle(mscclppComm *comm, mscclppRedOp_t op) { -// // Preserve the built-in values. 
-// if(int(op) < int(mscclppNumOps)) -// return op; -// uint64_t h = reinterpret_cast(comm); -// h ^= h >> 32; -// h *= 0x9e3779b97f4a7c13u; // Knuth's 64-bit magical hash constant -// h >>= 32; // h is now an excellent 32-bit hash of the comm pointer -// h &= int(mscclppMaxRedOp); // mscclppMaxRedOp is a power of 2 minus 1 -// int op1 = int(h) ^ int(op); -// // Since builtin values are preserved, we also have to preserve their preimage. -// return op1 < int(mscclppNumOps) ? op : mscclppRedOp_t(op1); -// } - -// mscclppResult_t mscclppCommEnsureReady(mscclppComm_t comm); -// mscclppResult_t mscclppCommSetAsyncError(mscclppComm_t comm, mscclppResult_t nextState); - #endif diff --git a/src/proxy.cc b/src/proxy.cc index 81f2cf23..a46be2b6 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -122,12 +122,12 @@ void* mscclppProxyServiceIb(void* _args) { SEND_STATE_INPROGRESS }; int *sendState; - uint64_t *currentProxyFlagVlaue; + uint64_t *currentProxyFlagValue; if (mscclppCalloc((void **)&sendState, comm->nConns) != mscclppSuccess) { WARN("mscclppCalloc failed: errno %d", errno); return NULL; } - if (mscclppCalloc((void **)&currentProxyFlagVlaue, comm->nConns) != mscclppSuccess) { + if (mscclppCalloc((void **)&currentProxyFlagValue, comm->nConns) != mscclppSuccess) { WARN("mscclppCalloc failed: errno %d", errno); return NULL; } @@ -143,7 +143,7 @@ void* mscclppProxyServiceIb(void* _args) { for (int i = 0; i < (int)comm->nConns; ++i) { sendState[i] = SEND_STATE_INIT; struct mscclppConn *conn = &comm->conns[i]; - currentProxyFlagVlaue[i] = *conn->cpuProxyFlag; + currentProxyFlagValue[i] = *conn->cpuProxyFlag; // Post recv if (conn->ibQp->postRecv(0) != 0) { WARN("postRecv failed: errno %d", errno); @@ -195,7 +195,7 @@ void* mscclppProxyServiceIb(void* _args) { } if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { // TODO(chhwang): cpu flush - *((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagVlaue[trigger.fields.connId]; + *((volatile uint64_t *)conn->cpuProxyFlag) = 
++currentProxyFlagValue[trigger.fields.connId]; // recv completion if (conn->ibQp->postRecv(wc->wr_id) != 0) { WARN("postRecv failed: errno %d", errno); From c2859d258c56a525d70e82b58b04d5a8de007267 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 09:22:28 +0000 Subject: [PATCH 15/16] Use aligned ld/st --- src/proxy.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index a46be2b6..9e55a98b 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -41,8 +41,8 @@ struct proxyArgs { }; static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { - __m128i xmm0 = _mm_loadu_si128((__m128i *)src); - _mm_storeu_si128((__m128i *)dst, xmm0); + __m128i xmm0 = _mm_load_si128((__m128i *)src); + _mm_store_si128((__m128i *)dst, xmm0); } void* mscclppProxyServiceP2P(void* _args) { From dc41c58769e24d7233f259bd1c2e29056073bb05 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 10:05:56 +0000 Subject: [PATCH 16/16] Alloc proxy states on demand --- src/include/comm.h | 140 +------------------------------------------- src/include/proxy.h | 4 ++ src/init.cc | 29 +++++++-- src/proxy.cc | 45 ++++++-------- 4 files changed, 46 insertions(+), 172 deletions(-) diff --git a/src/include/comm.h b/src/include/comm.h index 65435951..46c910d0 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -7,24 +7,9 @@ #ifndef MSCCLPP_COMM_H_ #define MSCCLPP_COMM_H_ -// #include "transport.h" -// #include "p2p.h" -// #include "collectives.h" #include "proxy.h" -// #include "strongstream.h" #include "ib.h" -// #if CUDART_VERSION < 9000 -// struct cudaLaunchParams { -// void *func; -// dim3 gridDim; -// dim3 blockDim; -// void **args; -// size_t sharedMem; -// cudaStream_t stream; -// }; -// #endif - // #define CACHE_LINE_SIZE 128 // #define MEM_ALIGN 4096 // #define CUDA_IPC_MIN 2097152UL @@ -36,128 +21,6 @@ #define MAXCONNECTIONS 1024 -// struct mscclppSendMem { -// union { -// struct { -// uint64_t head; -// char 
pad1[CACHE_LINE_SIZE-sizeof(uint64_t)]; -// void* ptrExchange; -// uint64_t redOpArgExchange[2]; -// char pad2[CACHE_LINE_SIZE-sizeof(void*)-2*sizeof(uint64_t)]; -// int offsFifo[MSCCLPP_STEPS]; -// }; -// char pad3[MEM_ALIGN]; -// }; -// }; - -// struct mscclppRecvMem { -// union { -// struct { -// uint64_t tail; -// char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)]; -// int sizesFifo[MSCCLPP_STEPS]; -// int offsFifo[MSCCLPP_STEPS]; -// int flush; // For GDRCopy-based flush -// }; -// char pad4[MEM_ALIGN]; -// }; -// }; - -// enum helperThreadState {ThreadStart, ThreadStop}; - -// #define MSCCLPP_IPC_POOL_SIZE (2*MSCCLPP_MAX_LOCAL_RANKS*MSCCLPP_MAX_OPS) - -// struct mscclppGraphHelperResources { -// mscclppComm* comm; -// pthread_mutex_t threadLock; -// pthread_cond_t threadCond; -// enum helperThreadState threadState; -// void* ipcBases[MSCCLPP_IPC_POOL_SIZE]; -// int ipcTail; -// int ipcHead; -// }; - -// struct mscclppUserRedOp { -// int freeNext; // -1=allocated, otherwise index of next free entry in array -// mscclppDataType_t datatype; -// mscclppDevRedOpFull opFull; -// }; - -// struct mscclppNodeRanks { -// int localRanks; -// int* localRankToRank; -// }; - -// struct mscclppDestructor { -// struct mscclppDestructor* next; -// void* obj; -// mscclppResult_t(*fn)(struct mscclppDestructor* me); -// }; - -// struct mscclppCommCallback { -// struct mscclppCommCallback* next; -// mscclppResult_t(*fn)(struct mscclppComm* comm, struct mscclppCommCallback* cb); -// }; - -// struct mscclppChannel { -// struct mscclppChannelPeer* peers; -// struct mscclppDevChannelPeer* devPeers; -// struct mscclppRing ring; -// int* devRingUserRanks; -// struct mscclppTree tree; -// struct mscclppTree collnetChain; -// struct mscclppDirect collnetDirect; -// int id; // index of this channel -// uint32_t workFifoSent; // last used work index+1 -// uint64_t p2pOpCount; -// }; - -// struct mscclppWorkList { -// struct mscclppWorkList* next; -// struct mscclppWork work; -// }; - -// struct 
mscclppPointerList { -// struct mscclppPointerList* next; -// void *ptr; -// }; - -// struct mscclppKernelPlan { -// // A kernel plan is also a callback that reclaims itself. Hence this must -// // be the first member. -// struct mscclppCommCallback reclaimer; -// struct mscclppMemoryPool memPool_mscclppProxyOp; // memory to return to comm in cleanup - -// struct mscclppComm* comm; -// struct mscclppKernelPlan* next; - -// bool persistent; // aka captured in a graph -// bool kernelSpecialized; -// void *kernelFn; -// int channelUbound; // only channels c < channelUbound are present -// int channelCount; // number of channels present -// uint64_t channelMask; // which channels are present, channelCount == popcount(channelMask) -// bool hasProxyOps; // does any channel have a non-empty proxyOpQueue -// int threadPerBlock; -// // workHeap fields are null until uploadWorkFifo() or preparePersistentKernel() -// struct mscclppWork* workHead; - -// int collOpCount; // zero based for this plan - -// struct mscclppIntruQueue ipcMemQueue; - -// struct Channel { -// int nWork; -// union { -// int nWorkElem; // used for coll and reg coll -// int p2pTailElem[2]; // used for p2p, indexed by mscclppWorkElemP2pType-1 -// }; -// size_t collBytes; -// struct mscclppIntruQueue workQueue; -// struct mscclppIntruQueue proxyOpQueue; -// } channels[MAXCHANNELS]; -// }; - struct mscclppConn { mscclppTransport_t transport; int remoteRank; @@ -192,8 +55,7 @@ struct mscclppComm { volatile uint32_t *abortFlag; struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS]; - // Last one is for P2P proxies. 
- struct mscclppProxyState proxyState[MSCCLPP_IB_MAX_DEVS + 1]; + struct mscclppProxyState *proxyState[MSCCLPP_PROXY_MAX_NUM]; }; #endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 223747d1..61f9ea24 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -5,6 +5,8 @@ #include "comm.h" #include +#define MSCCLPP_PROXY_MAX_NUM (MSCCLPP_IB_MAX_DEVS + 1) // One is for a P2P proxy. + typedef enum { MSCCLPP_PROXY_RUN_STATE_IDLE = 0, MSCCLPP_PROXY_RUN_STATE_RUNNING, @@ -20,6 +22,8 @@ struct mscclppProxyState { unsigned int cpuTriggerFifoTail; unsigned int *gpuTriggerFifoHead; void *cpuTriggerFifoGdrDesc; + // NULL for the P2P proxy. + struct mscclppIbContext *ibContext; }; mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); diff --git a/src/init.cc b/src/init.cc index a914d225..5cb2cdcd 100644 --- a/src/init.cc +++ b/src/init.cc @@ -202,12 +202,29 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i } conn->ibCtx = comm->ibContext[ibDevIdx]; } - int proxyIdx = (ibDevIdx == -1) ? 
MSCCLPP_IB_MAX_DEVS : ibDevIdx; - struct mscclppProxyState *proxyState = &comm->proxyState[proxyIdx]; - if (proxyState->cpuTriggerFifo == NULL) { - MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo, - MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc)); - MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1)); + // Find a proxy state that uses the given IB device + struct mscclppProxyState *proxyState = NULL; + for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { + if (comm->proxyState[i] == NULL) { + // Cannot find, create a new one + MSCCLPPCHECK(mscclppCalloc(&proxyState, 1)); + MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo, + MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc)); + MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1)); + proxyState->ibContext = conn->ibCtx; + comm->proxyState[i] = proxyState; + break; + } + if (comm->proxyState[i]->ibContext == conn->ibCtx) { + // `conn->ibCtx == NULL` indicates the P2P proxy. 
+ proxyState = comm->proxyState[i]; + break; + } + } + if (proxyState == NULL) { + // Cannot reach + WARN("Unexpected error"); + return mscclppInternalError; } conn->devConn = devConnOut; conn->devConn->localBuff = localBuff; diff --git a/src/proxy.cc b/src/proxy.cc index 9e55a98b..8e4bdfed 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -35,9 +35,8 @@ static void NumaBind(int node) struct proxyArgs { struct mscclppComm* comm; - struct mscclppIbContext* ibCtx; - cudaStream_t stream; struct mscclppProxyState *proxyState; + cudaStream_t stream; }; static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { @@ -110,7 +109,7 @@ void* mscclppProxyServiceP2P(void* _args) { void* mscclppProxyServiceIb(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; - struct mscclppIbContext *ibCtx = args->ibCtx; + struct mscclppIbContext *ibCtx = args->proxyState->ibContext; volatile mscclppProxyRunState_t *run = &args->proxyState->run; mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; @@ -270,9 +269,8 @@ void* mscclppProxyServiceIb(void* _args) { void* mscclppProxyService(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; - struct mscclppIbContext *ibCtx = args->ibCtx; void *ret; - if (ibCtx == NULL) { + if (args->proxyState->ibContext == NULL) { ret = mscclppProxyServiceP2P(_args); } else { ret = mscclppProxyServiceIb(_args); @@ -281,42 +279,35 @@ void* mscclppProxyService(void* _args) { } mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS + 1; ++i) { - // `i == MSCCLPP_IB_MAX_DEVS` is for the P2P proxy - bool is_p2p = (i == MSCCLPP_IB_MAX_DEVS); - if (!is_p2p) { - if (comm->ibContext[i] == NULL) continue; - } - if (comm->proxyState[i].cpuTriggerFifo == NULL) { - // reachable when there is no mscclppTransportP2P type connection - continue; - } + for (int i = 0; i < 
MSCCLPP_PROXY_MAX_NUM; ++i) { + struct mscclppProxyState *proxyState = comm->proxyState[i]; + if (proxyState == NULL) break; + bool is_p2p = (proxyState->ibContext == NULL); + struct proxyArgs *args; MSCCLPPCHECK(mscclppCalloc(&args, 1)); args->comm = comm; - args->ibCtx = is_p2p ? NULL : comm->ibContext[i]; - args->proxyState = &comm->proxyState[i]; + args->proxyState = proxyState; if (is_p2p) { CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); } - comm->proxyState[i].run = MSCCLPP_PROXY_RUN_STATE_RUNNING; - pthread_create(&comm->proxyState[i].thread, NULL, mscclppProxyService, args); + proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; + pthread_create(&proxyState->thread, NULL, mscclppProxyService, args); if (is_p2p) { - mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service P2P - %02d", comm->cudaDev); + mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev); } else { - mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service IB - %02d", i); + mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i); } } return mscclppSuccess; } mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS + 1; ++i) { - // `i == MSCCLPP_IB_MAX_DEVS` is for the P2P proxy - if (i < MSCCLPP_IB_MAX_DEVS) { - if (comm->ibContext[i] == NULL) continue; - } - volatile int *run = (volatile int *)&comm->proxyState[i].run; + for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { + struct mscclppProxyState *proxyState = comm->proxyState[i]; + if (proxyState == NULL) break; + + volatile int *run = (volatile int *)&proxyState->run; if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) { continue; }