From 75ec82d2577e07978172daada76645dfba72c599 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 14 Mar 2023 09:00:38 +0000 Subject: [PATCH] Store fifo tail in proxy state --- src/include/proxy.h | 2 ++ src/proxy.cc | 33 +++++++++++++++------------------ 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/include/proxy.h b/src/include/proxy.h index 5e616aa2..223747d1 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -16,6 +16,8 @@ struct mscclppProxyState { mscclppProxyRunState_t run; mscclppTrigger *cpuTriggerFifo; mscclppTrigger *gpuTriggerFifo; + // cpuTriggerFifoTail is the index of the fifo slot that the CPU reads next (i.e., the current head of the fifo). + unsigned int cpuTriggerFifoTail; unsigned int *gpuTriggerFifoHead; void *cpuTriggerFifoGdrDesc; }; diff --git a/src/proxy.cc b/src/proxy.cc index 4c88e6fd..81f2cf23 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -37,8 +37,7 @@ struct proxyArgs { struct mscclppComm* comm; struct mscclppIbContext* ibCtx; cudaStream_t stream; - volatile mscclppProxyRunState_t* run; - mscclppTrigger *triggerFifo; + struct mscclppProxyState *proxyState; }; static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { @@ -49,8 +48,9 @@ static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { void* mscclppProxyServiceP2P(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; - volatile mscclppProxyRunState_t *run = args->run; - mscclppTrigger *fifo = args->triggerFifo; + volatile mscclppProxyRunState_t *run = &args->proxyState->run; + mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; + unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; cudaStream_t stream = args->stream; free(_args); @@ -66,7 +66,6 @@ void* mscclppProxyServiceP2P(void* _args) { int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; // fifoTail indicates where CPU needs to read the head of the fifo. 
- int fifoTail = 0; for (;;) { if (runCheckCounter-- == 0) { runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; @@ -74,7 +73,7 @@ void* mscclppProxyServiceP2P(void* _args) { if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; } // Poll to see if we are ready to send anything - readTrigger(&trigger, &fifo[fifoTail]); + readTrigger(&trigger, &fifo[*fifoTail]); if (trigger.value[0] == 0) continue; struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; @@ -94,8 +93,8 @@ void* mscclppProxyServiceP2P(void* _args) { } // Send completion: reset only the high 64 bits - *(volatile uint64_t *)(&fifo[fifoTail]) = 0; - fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; + *(volatile uint64_t *)(&fifo[*fifoTail]) = 0; + *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; } // Need a sync in case previous copies are not completed @@ -112,8 +111,9 @@ void* mscclppProxyServiceIb(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; struct mscclppIbContext *ibCtx = args->ibCtx; - volatile mscclppProxyRunState_t *run = args->run; - mscclppTrigger *fifo = args->triggerFifo; + volatile mscclppProxyRunState_t *run = &args->proxyState->run; + mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; + unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; free(_args); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) @@ -152,8 +152,6 @@ void* mscclppProxyServiceIb(void* _args) { #endif int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; - // fifoTail indicates where CPU needs to read the head of the fifo. 
- int fifoTail = 0; for (;;) { if (runCheckCounter-- == 0) { runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; @@ -161,7 +159,7 @@ void* mscclppProxyServiceIb(void* _args) { if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; } // Poll to see if we are ready to send anything - readTrigger(&trigger, &fifo[fifoTail]); + readTrigger(&trigger, &fifo[*fifoTail]); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; @@ -261,8 +259,8 @@ void* mscclppProxyServiceIb(void* _args) { } // Send completion: reset only the high 64 bits - *(volatile uint64_t *)(&fifo[fifoTail]) = 0; - fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; + *(volatile uint64_t *)(&fifo[*fifoTail]) = 0; + *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; #endif } *run = MSCCLPP_PROXY_RUN_STATE_IDLE; @@ -297,12 +295,11 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { MSCCLPPCHECK(mscclppCalloc(&args, 1)); args->comm = comm; args->ibCtx = is_p2p ? NULL : comm->ibContext[i]; - args->run = &comm->proxyState[i].run; - args->triggerFifo = comm->proxyState[i].cpuTriggerFifo; + args->proxyState = &comm->proxyState[i]; if (is_p2p) { CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); } - *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; + comm->proxyState[i].run = MSCCLPP_PROXY_RUN_STATE_RUNNING; pthread_create(&comm->proxyState[i].thread, NULL, mscclppProxyService, args); if (is_p2p) { mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service P2P - %02d", comm->cudaDev);