From 9cc21f70e65c663e7704d0290d91539f23060796 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Fri, 17 Mar 2023 22:51:11 +0000
Subject: [PATCH 1/9] redesigning fifo

---
 src/include/mscclpp.h   | 17 +++++++++++------
 src/init.cc             | 16 ++++++++--------
 tests/allgather_test.cu | 10 +++++-----
 tests/p2p_test.cu       |  8 ++++----
 4 files changed, 28 insertions(+), 23 deletions(-)

diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h
index 5c76b7af..7c43ee7f 100644
--- a/src/include/mscclpp.h
+++ b/src/include/mscclpp.h
@@ -74,7 +74,7 @@ union alignas(16) mscclppTrigger {
 *
 ***************************************/

-struct mscclppDevConn {
+struct mscclppConcurrentFifo {
 #ifdef __CUDACC__
   __forceinline__ __device__ mscclppTrigger *getTrigger() {
     unsigned int curFifoHead = atomicInc(this->triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1);
@@ -110,7 +110,15 @@ struct mscclppDevConn {
     while (*(volatile uint64_t *)trig->value != 0) {}
   }
 #endif // __CUDACC__
+  unsigned int* triggerFifoHead; // indicates the head of the fifo. only accessible by the gpu. for parallel access, use atomics
+  mscclppTrigger* triggerFifo;
+  unsigned int* triggerFifoCounter;
+  uint64_t* proxyFlag;
+  int connId;
+};
+
+struct mscclppDevConn {
   int tag;

   void* localBuff;
@@ -119,11 +127,8 @@ struct mscclppDevConn {
   void* remoteBuff;
   uint64_t* remoteFlag;

-  unsigned int* triggerFifoHead; // indicates the head of the fifo. only accessible by the gpu. for parallel access, use atomics
-  mscclppTrigger* triggerFifo;
-  unsigned int* triggerFifoCounter;
-  uint64_t* proxyFlag;
-  int connId;
+  // multiple threads can access the fifo concurrently
+  struct mscclppConcurrentFifo fifo;
 };

 typedef struct mscclppComm* mscclppComm_t;
diff --git a/src/init.cc b/src/init.cc
index 3b718f88..5d37be6a 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -231,10 +231,10 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
   conn->devConn->localBuff = localBuff;
   conn->devConn->localFlag = localFlag;
   conn->devConn->tag = tag;
-  conn->devConn->connId = comm->nConns;
-  conn->devConn->triggerFifo = proxyState->gpuTriggerFifo;
-  conn->devConn->triggerFifoHead = proxyState->gpuTriggerFifoHead;
-  conn->devConn->triggerFifoCounter = proxyState->gpuTriggerFifoCounter;
+  conn->devConn->fifo.connId = comm->nConns;
+  conn->devConn->fifo.triggerFifo = proxyState->gpuTriggerFifo;
+  conn->devConn->fifo.triggerFifoHead = proxyState->gpuTriggerFifoHead;
+  conn->devConn->fifo.triggerFifoCounter = proxyState->gpuTriggerFifoCounter;

   comm->nConns++;
   return mscclppSuccess;
@@ -256,10 +256,10 @@ mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*outpu
     return mscclppInternalError;
   }
   struct mscclppDevConn *devConn = conn->devConn;
-  MSCCLPPCHECK(mscclppCudaCalloc(&devConn->proxyFlag, 1));
+  MSCCLPPCHECK(mscclppCudaCalloc(&devConn->fifo.proxyFlag, 1));
   CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleBuff, devConn->localBuff));
   CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleFlag, devConn->localFlag));
-  CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->proxyFlag));
+  CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->fifo.proxyFlag));
   return mscclppSuccess;
 }

@@ -282,7 +282,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
   struct mscclppDevConn *devConn = conn->devConn;
   devConn->remoteBuff = NULL;
   devConn->remoteFlag = NULL;
-  MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuProxyFlag, &devConn->proxyFlag, 1, &conn->cpuProxyFlagGdrDesc));
+  MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuProxyFlag, &devConn->fifo.proxyFlag, 1, &conn->cpuProxyFlagGdrDesc));

   struct mscclppIbContext *ibCtx = conn->ibCtx;
   if (conn->ibQp == NULL) {
@@ -291,7 +291,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
   // TODO(chhwang): can we register only one MR for the following three?
   MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localBuff, conn->buffSize, &conn->ibBuffMr));
   MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localFlag, sizeof(uint64_t), &conn->ibLocalFlagMr));
-  MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->proxyFlag, sizeof(uint64_t), &conn->ibProxyFlagMr));
+  MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->fifo.proxyFlag, sizeof(uint64_t), &conn->ibProxyFlagMr));
   connInfo->infoQp = conn->ibQp->info;
   connInfo->infoBuffMr = conn->ibBuffMr->info;
   connInfo->infoLocalFlagMr = conn->ibLocalFlagMr->info;
diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu
index 57bb57b3..5b1cd131 100644
--- a/tests/allgather_test.cu
+++ b/tests/allgather_test.cu
@@ -53,7 +53,7 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
 #if (USE_DMA_FOR_P2P == 0)
   volatile uint64_t *remoteFlag = devConn.remoteFlag;
 #endif
-  volatile uint64_t *proxyFlag = devConn.proxyFlag;
+  volatile uint64_t *proxyFlag = devConn.fifo.proxyFlag;

   uint64_t baseFlag = *localFlag;

@@ -65,19 +65,19 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
   }

   // Thread-safely obtain the head trigger
-  mscclppTrigger *trig = devConn.acquireTrigger();
+  mscclppTrigger *trig = devConn.fifo.acquireTrigger();

   // Each warp receives data from different ranks
 #if (USE_DMA_FOR_P2P == 1)
   // Trigger sending data and flag
-  devConn.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
+  devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));

   // Wait until the proxy has sent my data and flag
-  devConn.waitTrigger(trig);
+  devConn.fifo.waitTrigger(trig);

   // Inform other threads that the tail trigger just became idle
-  devConn.releaseTrigger();
+  devConn.fifo.releaseTrigger();

   // Wait for receiving data from remote rank
   while (*proxyFlag == baseFlag) {}
diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu
index 04a412d6..2bb4d4e0 100644
--- a/tests/p2p_test.cu
+++ b/tests/p2p_test.cu
@@ -54,8 +54,8 @@ __global__ void kernel(int rank, int world_size)
 #if (USE_DMA_FOR_P2P == 0)
   volatile uint64_t *remoteFlag = devConn.remoteFlag;
 #endif
-  volatile uint64_t *proxyFlag = devConn.proxyFlag;
-  mscclppTrigger *trig = devConn.getTrigger();
+  volatile uint64_t *proxyFlag = devConn.fifo.proxyFlag;
+  mscclppTrigger *trig = devConn.fifo.getTrigger();

   uint64_t baseFlag = *localFlag;

@@ -75,10 +75,10 @@ __global__ void kernel(int rank, int world_size)

 #if (USE_DMA_FOR_P2P == 1)
   // Wait until the proxy has sent my data and flag
-  devConn.waitTrigger(trig);
+  devConn.fifo.waitTrigger(trig);

   // Trigger sending data and flag
-  devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));
+  devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));

   // Wait for receiving data from remote rank
   while (*proxyFlag == baseFlag) {}

From a485a7f238b98a5371ee7d918dddcacf1275c771 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 01:08:05 +0000
Subject: [PATCH 2/9] single node works fine -- multinode is problematic
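
The redesign replaces the counter-based acquireTrigger/releaseTrigger scheme with a
head/tail ring buffer: device threads reserve slots with an atomicAdd on triggerFifoHead,
and the host proxy publishes its consumption progress through triggerFifoTail, which also
serves as the completion signal. A minimal sketch of the intended device-side usage (the
kernel name and arguments here are hypothetical; `conn` is assumed to be a fully set-up
mscclppDevConn_t):

  __global__ void put(mscclppDevConn_t conn, uint64_t offset, uint64_t size)
  {
    mscclppTrigger_t trig;
    // Reserve a fifo slot; getTrigger spins while the ring is full,
    // i.e. while head - tail >= MSCCLPP_PROXY_FIFO_SIZE.
    mscclppRequest_t req = conn.fifo.getTrigger(&trig);
    // Publish the work element for the host proxy to consume.
    conn.fifo.setTrigger(trig, mscclppFlag | mscclppData, offset, size);
    // Returns once the proxy has advanced triggerFifoTail past our request
    // number, i.e. the send was issued and the slot can be reused.
    conn.fifo.waitTrigger(req);
  }

Because the tail only ever advances, the request number also gives a total order over
triggers posted to the same proxy fifo by concurrent threads.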
---
 src/include/comm.h      |  1 +
 src/include/mscclpp.h   | 81 ++++++++++++++++----------------
 src/include/proxy.h     | 25 +++++++-----
 src/init.cc             | 87 +++++++++++++++++++++++++++------------
 src/proxy.cc            | 55 +++++++++++++++-----------
 tests/allgather_test.cu | 50 +++++------------------
 tests/p2p_test.cu       | 15 ++++---
 7 files changed, 165 insertions(+), 149 deletions(-)

diff --git a/src/include/comm.h b/src/include/comm.h
index 46c910d0..3e7b4f97 100644
--- a/src/include/comm.h
+++ b/src/include/comm.h
@@ -55,6 +55,7 @@ struct mscclppComm {
   volatile uint32_t *abortFlag;

   struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS];
+  cudaStream_t stream; // DMA engine stream for P2P
   struct mscclppProxyState *proxyState[MSCCLPP_PROXY_MAX_NUM];
 };

diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h
index 7c43ee7f..a4035710 100644
--- a/src/include/mscclpp.h
+++ b/src/include/mscclpp.h
@@ -42,7 +42,38 @@ union alignas(16) mscclppTrigger {
   } fields;
 };

-/**************************************
+typedef uint64_t mscclppRequest_t;
+typedef mscclppTrigger* mscclppTrigger_t;
+
+struct mscclppConcurrentFifo {
+#ifdef __CUDACC__
+  __forceinline__ __device__ mscclppRequest_t getTrigger(mscclppTrigger_t* trig) {
+    uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->triggerFifoHead,1);
+    while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->triggerFifoTail));
+    *trig = &this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE];
+    return curFifoHead;
+  }
+
+  __forceinline__ __device__ void setTrigger(mscclppTrigger_t trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) {
+    asm volatile(
+        "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value),
+        "l"((dataOffset << (MSCCLPP_BITS_SIZE)) +
+            (dataSize)),
+        "l"((type << MSCCLPP_BITS_CONNID) + this->connId));
+  }
+
+  __forceinline__ __device__ void waitTrigger(mscclppRequest_t req) {
+    while (*(volatile uint64_t *)triggerFifoTail <= req);
+  }
+#endif // __CUDACC__
+  mscclppTrigger* triggerFifo;
+  uint64_t* triggerFifoTail; // read by both device and host. written only by host
+  uint64_t* triggerFifoHead; // read by both device and host. written only by device
+  int connId;
+};
+
+
+/***************************************************************************************************************
 * A mscclppDevConn provides a zero-copy connection between a sender and a receiver that are
 * connected via P2P NVLink or IB.
 * The communication API is one-sided meaning that not both sides of a connection are involved
@@ -72,52 +103,7 @@ union alignas(16) mscclppTrigger {
 * (note that an atomic increment is used to enable concurrent calls to getTrigger). setTrigger writes the right work element to the fifo
 * so that the CPU proxy can consume it.
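+* The redesigned fifo works as follows: a device thread reserves a slot by atomically
+* incrementing triggerFifoHead and spins while the ring is full (head >= fifo size + tail).
+* The reserved head index doubles as a request number: the proxy consumes triggers in order
+* and publishes its progress through triggerFifoTail, so waitTrigger(req) simply waits for
+* the tail to move past req.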
*
-***************************************/
-
-struct mscclppConcurrentFifo {
-#ifdef __CUDACC__
-  __forceinline__ __device__ mscclppTrigger *getTrigger() {
-    unsigned int curFifoHead = atomicInc(this->triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1);
-    return &this->triggerFifo[curFifoHead];
-  }
-
-  __forceinline__ __device__ mscclppTrigger *acquireTrigger() {
-    unsigned int *cnt = this->triggerFifoCounter;
-    unsigned int old = atomicAdd(cnt, 1);
-    while (old >= MSCCLPP_PROXY_FIFO_SIZE) {
-      atomicSub(cnt, 1);
-      while (*(volatile unsigned int *)cnt >= MSCCLPP_PROXY_FIFO_SIZE) {}
-      old = atomicAdd(cnt, 1);
-    }
-    // Up to MSCCLPP_PROXY_FIFO_SIZE threads can enter here at the same time
-    return getTrigger();
-  }
-
-  __forceinline__ __device__ void releaseTrigger() {
-    atomicSub(this->triggerFifoCounter, 1);
-  }
-
-  __forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) {
-    asm volatile(
-        "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value),
-        "l"((dataOffset << (MSCCLPP_BITS_SIZE)) +
-            (dataSize)),
-        "l"((type << MSCCLPP_BITS_CONNID) + this->connId));
-  }
-
-  __forceinline__ __device__ void waitTrigger(mscclppTrigger *trig) {
-    // Check only the first 64 bits
-    while (*(volatile uint64_t *)trig->value != 0) {}
-  }
-#endif // __CUDACC__
-  unsigned int* triggerFifoHead; // indicates the head of the fifo. only accessible by the gpu. for parallel access, use atomics
-  mscclppTrigger* triggerFifo;
-  unsigned int* triggerFifoCounter;
-  uint64_t* proxyFlag;
-  int connId;
-};
-
-
+**************************************************************************************************************/

 struct mscclppDevConn {
   int tag;
@@ -126,6 +112,7 @@ struct mscclppDevConn {
   void* remoteBuff;
   uint64_t* remoteFlag;
+  uint64_t* proxyFlag; // this is only written by the proxy thread

   // multiple threads can access the fifo concurrently
   struct mscclppConcurrentFifo fifo;
diff --git a/src/include/proxy.h b/src/include/proxy.h
index 7af16279..f7865a16 100644
--- a/src/include/proxy.h
+++ b/src/include/proxy.h
@@ -13,18 +13,25 @@ typedef enum {
   MSCCLPP_PROXY_RUN_STATE_EXITING,
 } mscclppProxyRunState_t;

+template <typename T>
+struct mscclppGDRState {
+  T* hostPtr;
+  T* devPtr;
+  void* desc;
+};
+
 struct mscclppProxyState {
+  mscclppTransport_t transportType;
   pthread_t thread;
   mscclppProxyRunState_t run;
-  mscclppTrigger *cpuTriggerFifo;
-  mscclppTrigger *gpuTriggerFifo;
-  // cpuTriggerFifoTail indicates where CPU needs to read the head of the fifo.
-  unsigned int cpuTriggerFifoTail;
-  unsigned int *gpuTriggerFifoHead;
-  unsigned int *gpuTriggerFifoCounter;
-  void *cpuTriggerFifoGdrDesc;
-  // NULL for the P2P proxy.
-  struct mscclppIbContext *ibContext;
+
+  // fifo allocation that is accessible on both host and device
+  mscclppGDRState<mscclppTrigger> triggerFifo;
+  mscclppGDRState<uint64_t> fifoHead;
+  mscclppGDRState<uint64_t> fifoTail;
+
+  struct mscclppIbContext *ibContext; // For IB connection only
+  cudaStream_t stream; // for P2P DMA engine only
 };

 mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm);
diff --git a/src/init.cc b/src/init.cc
index 5d37be6a..0ce27e63 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -87,6 +87,7 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, c
   MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail);
   _comm->rank = rank;
   _comm->nRanks = nranks;
+  // We assume that the user has set the device to the intended one already
   CUDACHECK(cudaGetDevice(&_comm->cudaDev));

   MSCCLPPCHECK(bootstrapNetInit(ip_port_pair));
@@ -179,7 +180,7 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
   conn->ibCtx = NULL;
   conn->ibQp = NULL;
   int ibDevIdx = -1;
-  if (ibDev != NULL) {
+  if (transportType == mscclppTransportIB) {
     // Check if an IB context exists
     int firstNullIdx = -1;
     for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
@@ -192,6 +193,8 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
         break;
       }
     }
+
+    // If not, create a new one
     if (ibDevIdx == -1) {
       // Create a new context.
       ibDevIdx = firstNullIdx;
@@ -200,31 +203,67 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
         return mscclppInternalError;
       }
     }
+    // Set the ib context for this conn
     conn->ibCtx = comm->ibContext[ibDevIdx];
+  } else if (transportType == mscclppTransportP2P){
+    // Check if a DMA context/stream exists
+    if (comm->stream == NULL){
+      CUDACHECK(cudaStreamCreateWithFlags(&comm->stream, cudaStreamNonBlocking));
+    }
+  } else if (transportType == mscclppTransportSHM){
+    WARN("Shared memory interconnection is not implemented yet!");
+    return mscclppInternalError;
+  } else {
+    WARN("Unexpected connection type!");
+    return mscclppInvalidUsage;
   }
-
-  // Find a proxy state that uses the given IB device
+
+
+  // Find/create a proxy state for the given connection
   struct mscclppProxyState *proxyState = NULL;
+  // First see if there is a matching context
+  // If not, find the first empty proxy
+  int firstEmptyProxyIndex = -1;
   for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
-    if (comm->proxyState[i] == NULL) {
-      // Cannot find, create a new one
-      MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
-      MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo,
-                                        MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc));
-      MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1));
-      MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoCounter, 1));
+    struct mscclppProxyState *curProxy = comm->proxyState[i];
+    if (curProxy && (curProxy->transportType == transportType)){
+      if ((transportType == mscclppTransportIB && curProxy->ibContext == conn->ibCtx) || (transportType == mscclppTransportP2P)){
+        proxyState = curProxy;
+        break; // we found the matching context
+      }
+    }
+    if (curProxy == NULL && firstEmptyProxyIndex == -1){
+      firstEmptyProxyIndex = i;
+    }
+  }
+
+  if (proxyState == NULL && firstEmptyProxyIndex == -1){
+    WARN("Too many proxies have been allocated!");
+    return mscclppInvalidUsage;
+  }
+
+  // If we couldn't find a matching context, create one
+  if (proxyState == NULL){
+    MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
+    MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->triggerFifo.hostPtr, &proxyState->triggerFifo.devPtr,
+                                      MSCCLPP_PROXY_FIFO_SIZE, &proxyState->triggerFifo.desc));
+    MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->fifoHead.hostPtr, &proxyState->fifoHead.devPtr,
+                                      1, &proxyState->fifoHead.desc));
+    MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->fifoTail.hostPtr, &proxyState->fifoTail.devPtr,
+                                      1, &proxyState->fifoTail.desc));
+
+    if (transportType == mscclppTransportIB){
       proxyState->ibContext = conn->ibCtx;
-      comm->proxyState[i] = proxyState;
-      break;
-    }
-    if (comm->proxyState[i]->ibContext == conn->ibCtx) {
-      // `conn->ibCtx == NULL` indicates the P2P proxy.
-      proxyState = comm->proxyState[i];
-      break;
+      proxyState->stream = NULL;
+    } else if (transportType == mscclppTransportP2P){
+      proxyState->ibContext = NULL;
+      proxyState->stream = comm->stream;
     }
+    comm->proxyState[firstEmptyProxyIndex] = proxyState;
   }
   if (proxyState == NULL) {
     // Cannot reach
-    WARN("Unexpected error");
+    WARN("Proxy allocation failed!");
     return mscclppInternalError;
   }
   conn->devConn = devConnOut;
@@ -232,9 +271,9 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
   conn->devConn->localFlag = localFlag;
   conn->devConn->tag = tag;
   conn->devConn->fifo.connId = comm->nConns;
-  conn->devConn->fifo.triggerFifo = proxyState->gpuTriggerFifo;
-  conn->devConn->fifo.triggerFifoHead = proxyState->gpuTriggerFifoHead;
-  conn->devConn->fifo.triggerFifoCounter = proxyState->gpuTriggerFifoCounter;
+  conn->devConn->fifo.triggerFifo = proxyState->triggerFifo.devPtr;
+  conn->devConn->fifo.triggerFifoHead = proxyState->fifoHead.devPtr;
+  conn->devConn->fifo.triggerFifoTail = proxyState->fifoTail.devPtr;

   comm->nConns++;
   return mscclppSuccess;
@@ -256,10 +295,10 @@ mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*outpu
     return mscclppInternalError;
   }
   struct mscclppDevConn *devConn = conn->devConn;
-  MSCCLPPCHECK(mscclppCudaCalloc(&devConn->fifo.proxyFlag, 1));
+  MSCCLPPCHECK(mscclppCudaCalloc(&devConn->proxyFlag, 1));
+  CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->proxyFlag));
   CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleBuff, devConn->localBuff));
   CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleFlag, devConn->localFlag));
-  CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->fifo.proxyFlag));
   return mscclppSuccess;
 }

@@ -282,7 +321,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
   struct mscclppDevConn *devConn = conn->devConn;
   devConn->remoteBuff = NULL;
   devConn->remoteFlag = NULL;
-  MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuProxyFlag, &devConn->fifo.proxyFlag, 1, &conn->cpuProxyFlagGdrDesc));
+  MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuProxyFlag, &devConn->proxyFlag, 1, &conn->cpuProxyFlagGdrDesc));

   struct mscclppIbContext *ibCtx = conn->ibCtx;
   if (conn->ibQp == NULL) {
@@ -291,7 +330,7 @@ mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output
   // TODO(chhwang): can we register only one MR for the following three?
   MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localBuff, conn->buffSize, &conn->ibBuffMr));
   MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localFlag, sizeof(uint64_t), &conn->ibLocalFlagMr));
-  MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->fifo.proxyFlag, sizeof(uint64_t), &conn->ibProxyFlagMr));
+  MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->proxyFlag, sizeof(uint64_t), &conn->ibProxyFlagMr));
   connInfo->infoQp = conn->ibQp->info;
   connInfo->infoBuffMr = conn->ibBuffMr->info;
   connInfo->infoLocalFlagMr = conn->ibLocalFlagMr->info;
diff --git a/src/proxy.cc b/src/proxy.cc
index 8e4bdfed..839a40fd 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -48,9 +48,11 @@ void* mscclppProxyServiceP2P(void* _args) {
   struct proxyArgs *args = (struct proxyArgs *)_args;
   struct mscclppComm *comm = args->comm;
   volatile mscclppProxyRunState_t *run = &args->proxyState->run;
-  mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
-  unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
-  cudaStream_t stream = args->stream;
+  mscclppTrigger *fifo = args->proxyState->triggerFifo.hostPtr;
+  volatile uint64_t *fifoTail = args->proxyState->fifoTail.hostPtr;
+  volatile uint64_t *fifoHead = args->proxyState->fifoHead.hostPtr;
+
+  cudaStream_t stream = args->proxyState->stream;
   free(_args);

   // int rank = comm->rank;
@@ -60,9 +62,7 @@ void* mscclppProxyServiceP2P(void* _args) {
   // TODO(saemal): either ask user or detect it automatically
   NumaBind((comm->cudaDev / 2) ^ 1);

-  PROXYCUDACHECK(cudaSetDevice(comm->cudaDev));
-  PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
-
+  uint64_t cachedFifoTail = *fifoTail;
   int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
   // fifoTail indicates where the CPU needs to read the next trigger from the fifo.
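+  // Fifo consumer protocol: compare the locally cached tail against the
+  // device-written head; when they differ there is work at
+  // tail % MSCCLPP_PROXY_FIFO_SIZE. After serving a trigger, zero its slot
+  // and publish the advanced tail so device threads spinning in
+  // getTrigger/waitTrigger can make progress.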
   for (;;) {
@@ -72,9 +72,11 @@ void* mscclppProxyServiceP2P(void* _args) {
       if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
     }
     // Poll to see if we are ready to send anything
-    readTrigger(&trigger, &fifo[*fifoTail]);
-    if (trigger.value[0] == 0) continue;
-
+    if (cachedFifoTail == *fifoHead) continue; // no new trigger to consume
+    readTrigger(&trigger, &fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]);
+    if (trigger.value[0] == 0) continue; // there is one in progress
+    // there is a trigger value ready to be consumed
+
     struct mscclppConn *conn = &comm->conns[trigger.fields.connId];

     // Iterate over what send is needed
@@ -92,15 +94,15 @@ void* mscclppProxyServiceP2P(void* _args) {
     }

     // Send completion: reset only the high 64 bits
-    *(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
-    *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
+    *(volatile uint64_t *)(&fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]) = 0;
+    cachedFifoTail++;
+    *fifoTail = cachedFifoTail;
   }

   // Need a sync in case previous copies are not completed
   PROXYCUDACHECK(cudaStreamSynchronize(stream));

   *run = MSCCLPP_PROXY_RUN_STATE_IDLE;
-  PROXYCUDACHECK(cudaStreamDestroy(stream));

   // WARN("Proxy exits: rank %d", rank);
   return NULL;
@@ -111,8 +113,9 @@ void* mscclppProxyServiceIb(void* _args) {
   struct mscclppComm *comm = args->comm;
   struct mscclppIbContext *ibCtx = args->proxyState->ibContext;
   volatile mscclppProxyRunState_t *run = &args->proxyState->run;
-  mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
-  unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
+  mscclppTrigger *fifo = args->proxyState->triggerFifo.hostPtr;
+  volatile uint64_t *fifoTail = args->proxyState->fifoTail.hostPtr;
+  volatile uint64_t *fifoHead = args->proxyState->fifoHead.hostPtr;
   free(_args);

 #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
@@ -150,6 +153,7 @@ void* mscclppProxyServiceIb(void* _args) {
   }
 #endif

+  uint64_t cachedFifoTail = *fifoTail;
   int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
   for (;;) {
     if (runCheckCounter-- == 0) {
@@ -210,7 +214,11 @@ void* mscclppProxyServiceIb(void* _args) {
       }
     }
 #else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1)
-    if (trigger.value[0] == 0) continue;
+    // Poll to see if we are ready to send anything
+    if (cachedFifoTail == *fifoHead) continue; // no new trigger to consume
+    readTrigger(&trigger, &fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]);
+    if (trigger.value[0] == 0) continue; // there is one in progress
+    // there is a trigger value ready to be consumed

     struct mscclppConn *conn = &comm->conns[trigger.fields.connId];

@@ -258,10 +266,14 @@ void* mscclppProxyServiceIb(void* _args) {
     }

     // Send completion: reset only the high 64 bits
-    *(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
-    *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
+    *(volatile uint64_t *)(&fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]) = 0;
+    cachedFifoTail++;
+    *fifoTail = cachedFifoTail;
 #endif
   }
+
+  //TODO(saemal): we need to wait for completion of wc here too
+
   *run = MSCCLPP_PROXY_RUN_STATE_IDLE;
   // WARN("Proxy exits: rank %d", rank);
   return NULL;
@@ -282,20 +294,17 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) {
   for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
     struct mscclppProxyState *proxyState = comm->proxyState[i];
     if (proxyState == NULL) break;
-    bool is_p2p = (proxyState->ibContext == NULL);
     struct proxyArgs *args;
     MSCCLPPCHECK(mscclppCalloc(&args, 1));
     args->comm = comm;
     args->proxyState = proxyState;
-    if (is_p2p) {
-      CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking));
-    }
+
     proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
     pthread_create(&proxyState->thread, NULL, mscclppProxyService, args);
-    if (is_p2p) {
+    if (proxyState->transportType == mscclppTransportP2P) {
       mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev);
-    } else {
+    } else if (proxyState->transportType == mscclppTransportIB) {
       mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i);
     }
   }
diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu
index 5b1cd131..b9035e28 100644
--- a/tests/allgather_test.cu
+++ b/tests/allgather_test.cu
@@ -7,8 +7,7 @@
 #include <string>
 #include <time.h>

-#define RANKS_PER_NODE 8
-#define USE_DMA_FOR_P2P 1
+#define RANKS_PER_NODE 2

 #define MSCCLPPCHECK(call) do { \
   mscclppResult_t res = call; \
@@ -50,10 +49,7 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
   mscclppDevConn_t devConn = constDevConns[remoteRank];
   // volatile int *data = (volatile int *)devConn.localBuff;
   volatile uint64_t *localFlag = devConn.localFlag;
-#if (USE_DMA_FOR_P2P == 0)
-  volatile uint64_t *remoteFlag = devConn.remoteFlag;
-#endif
-  volatile uint64_t *proxyFlag = devConn.fifo.proxyFlag;
+  volatile uint64_t *proxyFlag = devConn.proxyFlag;

   uint64_t baseFlag = *localFlag;

@@ -64,47 +60,21 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
     *localFlag = baseFlag + 1;
   }

-  // Thread-safely obtain the head trigger
-  mscclppTrigger *trig = devConn.fifo.acquireTrigger();
-
-  // Each warp receives data from different ranks
-#if (USE_DMA_FOR_P2P == 1)
-  // Trigger sending data and flag
+  // Each warp receives data from different ranks
+
+  // get a thread-local trigger and a request for waiting on it
+  mscclppTrigger_t trig;
+  mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
+
+  // Trigger sending data, flag and synchronize after
   devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));

-  // Wait until the proxy has sent my data and flag
-  devConn.fifo.waitTrigger(trig);
-
-  // Inform other threads that the tail trigger just became idle
-  devConn.fifo.releaseTrigger();
+  // Wait on the request to make sure it is safe to reuse buffer and flag
+  devConn.fifo.waitTrigger(req);

   // Wait for receiving data from remote rank
   while (*proxyFlag == baseFlag) {}
-#else // USE_DMA_FOR_P2P == 0
-
-  if (devConn.remoteBuff == NULL) { // IB
-    // Wait until the proxy has sent my data and flag
-    devConn.waitTrigger(trig);
-
-    // Trigger sending data and flag
-    devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
-
-    // Wait for receiving data from remote rank
-    while (*proxyFlag == baseFlag) {}
-  } else { // P2P
-    // Directly read data
-    volatile int *remoteData = (volatile int *)devConn.remoteBuff;
-
-    // Wait until the remote data is set
-    while (*remoteFlag == baseFlag) {}
-
-    // Read remote data
-    data[remoteRank] = remoteData[remoteRank];
-  }
-
-#endif
 }

 int rankToLocalRank(int rank)
@@ -192,7 +162,7 @@ int main(int argc, const char *argv[])
   int *data_d;
   uint64_t *flag_d;
-  size_t data_size = 1024*1024*16;
+  size_t data_size = 1024*1;
   int nelemsPerGPU = data_size / sizeof(int) / world_size;
   CUDACHECK(cudaMalloc(&data_d, data_size));
   CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu
index 2bb4d4e0..a780b21b 100644
--- a/tests/p2p_test.cu
+++ b/tests/p2p_test.cu
@@ -54,8 +54,7 @@ __global__ void kernel(int rank, int world_size)
 #if (USE_DMA_FOR_P2P == 0)
   volatile uint64_t *remoteFlag = devConn.remoteFlag;
 #endif
-  volatile uint64_t *proxyFlag = devConn.fifo.proxyFlag;
-  mscclppTrigger *trig = devConn.fifo.getTrigger();
+  volatile uint64_t *proxyFlag = devConn.proxyFlag;

   uint64_t baseFlag = *localFlag;

@@ -71,14 +70,18 @@ __global__ void kernel(int rank, int world_size)
     *localFlag = baseFlag + 1;
   }

+  // get a thread-local trigger and a request for waiting on it
+  mscclppTrigger_t trig;
+  mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
+
   // Each warp receives data from different ranks

 #if (USE_DMA_FOR_P2P == 1)
-  // Wait until the proxy has sent my data and flag
-  devConn.fifo.waitTrigger(trig);
+  // Trigger sending data, flag and synchronize after
+  devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * sizeof(int), sizeof(int));

-  // Trigger sending data and flag
-  devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));
+  // Wait on the request to make sure it is safe to reuse buffer and flag
+  devConn.fifo.waitTrigger(req);

   // Wait for receiving data from remote rank
   while (*proxyFlag == baseFlag) {}

From 4efc6e98dbe4ccde89db1e1de907c759b477adc2 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 01:26:30 +0000
Subject: [PATCH 3/9] incorrect access fixed

---
 src/proxy.cc | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/proxy.cc b/src/proxy.cc
index 839a40fd..c11c053b 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -161,8 +161,6 @@ void* mscclppProxyServiceIb(void* _args) {
       // Check if we need to exit
       if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
     }
-    // Poll to see if we are ready to send anything
-    readTrigger(&trigger, &fifo[*fifoTail]);

 #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
     struct mscclppConn *conn = &comm->conns[trigger.fields.connId];

From 5493e22633ec7051c279a42723754e157ff488b4 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 06:09:07 +0000
Subject: [PATCH 4/9] fixed multinode bug

---
 src/init.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/init.cc b/src/init.cc
index 0ce27e63..8aa975ef 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -259,6 +259,7 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
       proxyState->ibContext = NULL;
       proxyState->stream = comm->stream;
     }
+    proxyState->transportType = transportType;
     comm->proxyState[firstEmptyProxyIndex] = proxyState;
   }
   if (proxyState == NULL) {

From 17cbc84a14fcc6c1d0dbec09df331a0f979467b4 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 06:35:32 +0000
Subject: [PATCH 5/9] both allgather algorithms

---
 tests/allgather_test.cu | 26 ++++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu
index b9035e28..d26e30f6 100644
--- a/tests/allgather_test.cu
+++ b/tests/allgather_test.cu
@@ -7,7 +7,8 @@
 #include <string>
 #include <time.h>

-#define RANKS_PER_NODE 2
+#define RANKS_PER_NODE 8
+#define KERNEL 1

 #define MSCCLPPCHECK(call) do { \
   mscclppResult_t res = call; \
@@ -61,7 +62,7 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
   }

   // Each warp receives data from different ranks
-
+#if 0
   // get a thread-local trigger and a request for waiting on it
   mscclppTrigger_t trig;
   mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
@@ -73,7 +74,24 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
   devConn.fifo.waitTrigger(req);

   // Wait for receiving data from remote rank
-  while (*proxyFlag == baseFlag) {}
+  while (*proxyFlag == baseFlag);
+#else
+  for (int i = 1; i < world_size; i++){
+    __syncthreads();
+    if (remoteRank != ((rank+i) % world_size)) continue;
+    // get a thread-local trigger and a request for waiting on it
+    mscclppTrigger_t trig;
+    mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
+
+    // Trigger sending data, flag and synchronize after
+    devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
+
+    // Wait on the request to make sure it is safe to reuse buffer and flag
+    devConn.fifo.waitTrigger(req);
+  }
+  // Wait for receiving data from remote rank
+  while (*proxyFlag == baseFlag);
+#endif

 }
@@ -162,7 +180,7 @@ int main(int argc, const char *argv[])
   int *data_d;
   uint64_t *flag_d;
-  size_t data_size = 1024*1;
+  size_t data_size = 1024*1024*1024;
   int nelemsPerGPU = data_size / sizeof(int) / world_size;
   CUDACHECK(cudaMalloc(&data_d, data_size));
   CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));

From 3e8f6758e5e12e69d715ff7d9da8a360bb9a1eff Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 06:35:40 +0000
Subject: [PATCH 6/9] both allgather algorithms

---
 tests/allgather_test.cu | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu
index d26e30f6..1bb0635a 100644
--- a/tests/allgather_test.cu
+++ b/tests/allgather_test.cu
@@ -180,7 +180,7 @@ int main(int argc, const char *argv[])
   int *data_d;
   uint64_t *flag_d;
-  size_t data_size = 1024*1024*1024;
+  size_t data_size = 1024*1024;
   int nelemsPerGPU = data_size / sizeof(int) / world_size;
   CUDACHECK(cudaMalloc(&data_d, data_size));
   CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));

From 8a1ec28ff19f5974272853ae20dcf676228368be Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 19:27:17 +0000
Subject: [PATCH 7/9] single node allgather works very well

---
 tests/allgather_test.cu | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu
index 1bb0635a..734d679c 100644
--- a/tests/allgather_test.cu
+++ b/tests/allgather_test.cu
@@ -8,7 +8,6 @@
 #include <time.h>

 #define RANKS_PER_NODE 8
-#define KERNEL 1

 #define MSCCLPPCHECK(call) do { \
   mscclppResult_t res = call; \
@@ -173,14 +172,14 @@ int main(int argc, const char *argv[])
   int ibNum = cudaNumToIbNum(cudaNum);

   CUDACHECK(cudaSetDevice(cudaNum));
-  std::string ibDevStr = "mlx5_ib" + std::to_string(ibNum);
+  std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);

   mscclppComm_t comm;
   MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));

   int *data_d;
   uint64_t *flag_d;
-  size_t data_size = 1024*1024;
+  size_t data_size = 1024*1024*1024;
   int nelemsPerGPU = data_size / sizeof(int) / world_size;
   CUDACHECK(cudaMalloc(&data_d, data_size));
   CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
   double time_in_us = ms * 1000. / (float) cudagraphlaunch / (float) cudagraphiter;
   printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, (double) (data_size) / 1024./1024./1024./(time_in_us/1e6));
+  MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
   MSCCLPPCHECK(mscclppProxyStop(comm));
   MSCCLPPCHECK(mscclppCommDestroy(comm));

From 93afed3e54a4f7f915d208c99ca154b08a497ba0 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Sun, 19 Mar 2023 21:53:36 +0000
Subject: [PATCH 8/9] new allgather algorithm with both DMA and IB on a single
 node

---
 Makefile                 |   2 +-
 tests/allgather_test2.cu | 311 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 312 insertions(+), 1 deletion(-)
 create mode 100644 tests/allgather_test2.cu

diff --git a/Makefile b/Makefile
index b4113774..b0c78aa1 100644
--- a/Makefile
+++ b/Makefile
@@ -116,7 +116,7 @@ LIBSONAME  := $(LIBNAME).$(MSCCLPP_MAJOR)
 LIBTARGET  := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR)

 TESTSDIR := tests
-TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu allgather_test.cu)
+TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu allgather_test.cu allgather_test2.cu)
 TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS))
 TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
 TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS))
diff --git a/tests/allgather_test2.cu b/tests/allgather_test2.cu
new file mode 100644
index 00000000..290b4e9c
--- /dev/null
+++ b/tests/allgather_test2.cu
@@ -0,0 +1,311 @@
+#include "mscclpp.h"
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+#include "mpi.h"
+#endif // MSCCLPP_USE_MPI_FOR_TESTS
+#include <stdio.h>
+#include <stdlib.h>
+#include <string>
+#include <time.h>
+
+#define RANKS_PER_NODE 8
+
+#define MSCCLPPCHECK(call) do { \
+  mscclppResult_t res = call; \
+  if (res != mscclppSuccess && res != mscclppInProgress) { \
+    /* Print the back trace*/ \
+    printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
+    return res; \
+  } \
+} while (0);
+
+// Check CUDA RT calls
+#define CUDACHECK(cmd) do { \
+  cudaError_t err = cmd; \
+  if( err != cudaSuccess ) { \
+    printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
+    exit(EXIT_FAILURE); \
+  } \
+} while(false)
+
+// Measure current time in second.
+static double getTime(void)
+{
+  struct timespec tspec;
+  if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
+    printf("clock_gettime failed\n");
+    exit(EXIT_FAILURE);
+  }
+  return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
+}
+
+__constant__ mscclppDevConn_t constDevConns[16];
+
+__global__ void kernel(int rank, int world_size, int nelemsPerGPU)
+{
+  if (threadIdx.x % 32 != 0) return;
+
+  int warpId = threadIdx.x / 32;
+  bool isIB = false;
+  if (warpId >= world_size-1) isIB = true;
+  if (isIB) warpId = warpId - (world_size-1);
+  int remoteRank = (warpId < rank) ? warpId : warpId + 1;
+  mscclppDevConn_t devConn = constDevConns[remoteRank];
+  if (isIB) devConn = constDevConns[remoteRank + world_size];
+  // volatile int *data = (volatile int *)devConn.localBuff;
+  volatile uint64_t *localFlag = devConn.localFlag;
+  volatile uint64_t *proxyFlag = devConn.proxyFlag;
+
+  uint64_t baseFlag = *localFlag;
+
+  __syncthreads();
+  if (threadIdx.x == 0) {
+    // Do we need a sys fence?
+    // __threadfence_system();
+    *localFlag = baseFlag + 1;
+  }
+
+  // Each warp receives data from different ranks
+#if 0
+  // get a thread-local trigger and a request for waiting on it
+  mscclppTrigger_t trig;
+  mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
+
+  // Trigger sending data, flag and synchronize after
+  devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
+
+  // Wait on the request to make sure it is safe to reuse buffer and flag
+  devConn.fifo.waitTrigger(req);
+
+  // Wait for receiving data from remote rank
+  while (*proxyFlag == baseFlag);
+#else
+  for (int i = 1; i < world_size; i++){
+    __syncthreads();
+    if (remoteRank != ((rank+i) % world_size)) continue;
+    // get a thread-local trigger and a request for waiting on it
+    mscclppTrigger_t trig;
+    mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
+
+    // Trigger sending data, flag and synchronize after
+    int ibPortion = nelemsPerGPU/12; // send this tail portion over IB, the rest over P2P
+    if (isIB)
+      devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion)*sizeof(int), ibPortion*sizeof(int));
+    else
+      devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU-ibPortion)*sizeof(int));
+    // Wait on the request to make sure it is safe to reuse buffer and flag
+    devConn.fifo.waitTrigger(req);
+  }
+  // Wait for receiving data from remote rank
+  while (*proxyFlag == baseFlag);
+#endif
+
+}
+
+int rankToLocalRank(int rank)
+{
+  return rank % RANKS_PER_NODE;
+}
+
+int rankToNode(int rank)
+{
+  return rank / RANKS_PER_NODE;
+}
+
+int cudaNumToIbNum(int cudaNum)
+{
+  int ibNum;
+  if (cudaNum == 0) {
+    ibNum = 0;
+  } else if (cudaNum == 1) {
+    ibNum = 4;
+  } else if (cudaNum == 2) {
+    ibNum = 1;
+  } else if (cudaNum == 3) {
+    ibNum = 5;
+  } else if (cudaNum == 4) {
+    ibNum = 2;
+  } else if (cudaNum == 5) {
+    ibNum = 6;
+  } else if (cudaNum == 6) {
+    ibNum = 3;
+  } else if (cudaNum == 7) {
+    ibNum = 7;
+  } else {
+    printf("Invalid cudaNum: %d\n", cudaNum);
+    exit(EXIT_FAILURE);
+  }
+  return ibNum;
+}
+
+void print_usage(const char *prog)
+{
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  printf("usage: %s IP:PORT [rank nranks]\n", prog);
+#else
+  printf("usage: %s IP:PORT rank nranks\n", prog);
+#endif
+}
+
+int main(int argc, const char *argv[])
+{
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  if (argc != 2 && argc != 4) {
+    print_usage(argv[0]);
+    return -1;
+  }
+  const char *ip_port = argv[1];
+  int rank;
+  int world_size;
+  if (argc == 4) {
+    rank = atoi(argv[2]);
+    world_size = atoi(argv[3]);
+  } else {
+    MPI_Init(NULL, NULL);
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
+  }
+#else
+  if (argc != 4) {
+    print_usage(argv[0]);
+    return -1;
+  }
+  const char *ip_port = argv[1];
+  int rank = atoi(argv[2]);
+  int world_size = atoi(argv[3]);
+#endif
+  int localRank = rankToLocalRank(rank);
+  int thisNode = rankToNode(rank);
+  int cudaNum = localRank;
+  int ibNum = cudaNumToIbNum(cudaNum);
+
+  CUDACHECK(cudaSetDevice(cudaNum));
+  std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
+
+  mscclppComm_t comm;
+  MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
+
+  int *data_d;
+  uint64_t *flag_d;
+  size_t data_size = 1536*1024*1024;
+  int nelemsPerGPU = data_size / sizeof(int) / world_size;
+  CUDACHECK(cudaMalloc(&data_d, data_size));
+  CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
+  CUDACHECK(cudaMemset(data_d, 0, data_size));
+  CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t)));
+
+  int* data_h = new int[nelemsPerGPU*world_size];
+  for (int i = 0; i < nelemsPerGPU*world_size; i++){
+    int val = i + 1;
+    if (i / nelemsPerGPU == rank){
+      data_h[i] = val;
+    } else {
+      data_h[i] = 0;
+    }
+  }
+  CUDACHECK(cudaMemcpy(data_d, data_h, data_size, cudaMemcpyHostToDevice));
+
+  mscclppDevConn_t devConns[16];
+  for (int r = 0; r < world_size; ++r) {
+    if (r == rank) continue;
+    mscclppTransport_t transportType;
+    const char* ibDev = NULL;
+    transportType = mscclppTransportP2P;
+    // Connect with all other ranks
+    MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, data_d, data_size, flag_d, 0, transportType, ibDev));
+  }
+  for (int r = 0; r < world_size; ++r) {
+    if (r == rank) continue;
+    mscclppTransport_t transportType;
+    const char* ibDev = ibDevStr.c_str();
+    transportType = mscclppTransportIB;
+    // Connect with all other ranks
+    MSCCLPPCHECK(mscclppConnect(comm, &devConns[r+world_size], r, data_d, data_size, flag_d, 0, transportType, ibDev));
+  }
+
+  MSCCLPPCHECK(mscclppConnectionSetup(comm));
+
+  MSCCLPPCHECK(mscclppProxyLaunch(comm));
+
+  CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * 2 * world_size));
+
+  cudaStream_t stream;
+  CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
+
+
+  CUDACHECK(cudaDeviceSynchronize());
+  kernel<<<1, 32 * 2*(world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
+  CUDACHECK(cudaDeviceSynchronize());
+  CUDACHECK(cudaMemcpy(data_h, data_d, data_size, cudaMemcpyDeviceToHost));
+  CUDACHECK(cudaDeviceSynchronize());
+
+  for (int i = 0; i < nelemsPerGPU*world_size; i++){
+    int val = i + 1;
+    if (data_h[i] != val){
+      printf("oh uh things went wrong! data_h[%d] (%d) != val (%d)\n", i, data_h[i], val);
+      break;
+    }
+  }
+  int tmp[16];
+  MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
+
+// // Perf test
+// cudaEvent_t ev_start;
+// cudaEvent_t ev_end;
+// CUDACHECK(cudaEventCreate(&ev_start));
+// CUDACHECK(cudaEventCreate(&ev_end));
+
+  // warm up
+  // int warmupiter = 1000;
+  // for (int i = 0; i < warmupiter; ++i) {
+  //   kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
+  // }
+  // CUDACHECK(cudaDeviceSynchronize());
+  // MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
+
+  // cudaGraph Capture
+  cudaGraph_t graph;
+  cudaGraphExec_t instance;
+  cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
+  int cudagraphiter = 10;
+  for (int i = 0; i < cudagraphiter; ++i) {
+    kernel<<<1, 32 * 2*(world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
+  }
+  cudaStreamEndCapture(stream, &graph);
+  cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
+
+  int cudagraphwarmup = 10;
+  for (int i = 0; i < cudagraphwarmup; ++i) {
+    cudaGraphLaunch(instance, stream);
+  }
+  CUDACHECK(cudaStreamSynchronize(stream));
+
+  // measure runtime
+// CUDACHECK(cudaEventRecord(ev_start, stream));
+  double t0 = getTime();
+  int cudagraphlaunch = 10;
+  for (int i = 0; i < cudagraphlaunch; ++i) {
+    // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
+    cudaGraphLaunch(instance, stream);
+  }
+// CUDACHECK(cudaEventRecord(ev_end, stream));
+  CUDACHECK(cudaStreamSynchronize(stream));
+
+  double t1 = getTime();
+  float ms = (t1-t0)*1000.0;
+// CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end));
+  double time_in_us = ms * 1000. / (float) cudagraphlaunch / (float) cudagraphiter;
+  printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, (double) (data_size) / 1024./1024./1024./(time_in_us/1e6));
+
+  MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
+  MSCCLPPCHECK(mscclppProxyStop(comm));
+
+  MSCCLPPCHECK(mscclppCommDestroy(comm));
+
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  if (argc == 2) {
+    MPI_Finalize();
+  }
+#endif
+  printf("Succeeded! %d\n", rank);
+  return 0;
+}

From 7cb290379934a5a74ca606a7673c851547ec2e90 Mon Sep 17 00:00:00 2001
From: Saeed Maleki
Date: Mon, 20 Mar 2023 21:07:58 +0000
Subject: [PATCH 9/9] some comment check-ins

---
 tests/allgather_test.cu | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tests/allgather_test.cu b/tests/allgather_test.cu
index 734d679c..06ec90c3 100644
--- a/tests/allgather_test.cu
+++ b/tests/allgather_test.cu
@@ -55,8 +55,6 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
   __syncthreads();
   if (threadIdx.x == 0) {
-    // Do we need a sys fence?
-    // __threadfence_system();
     *localFlag = baseFlag + 1;
   }

@@ -68,6 +66,7 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
   // Trigger sending data, flag and synchronize after
   devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));

+  // we cannot reuse buffer and flag until the request is completed
   // Wait on the request to make sure it is safe to reuse buffer and flag
   devConn.fifo.waitTrigger(req);

@@ -284,7 +283,7 @@ int main(int argc, const char *argv[])
   float ms = (t1-t0)*1000.0;
// CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end));
   double time_in_us = ms * 1000. / (float) cudagraphlaunch / (float) cudagraphiter;
-  printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, (double) (data_size) / 1024./1024./1024./(time_in_us/1e6));
+  printf("rank: %d, time: %f us/iter algBW %f GBps\n", rank, time_in_us, (double) (data_size) / 1e9 /(time_in_us/1e6));

   MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
   MSCCLPPCHECK(mscclppProxyStop(comm));