From 0440096954fe9e8152645b5198e848456776950b Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Mon, 6 Mar 2023 08:34:24 +0000 Subject: [PATCH 1/2] Add proxy run states --- src/include/proxy.h | 8 +++++++- src/proxy.cc | 32 +++++++++++++++----------------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/src/include/proxy.h b/src/include/proxy.h index 604399e5..225e9fae 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -5,9 +5,15 @@ #include "comm.h" #include +typedef enum { + MSCCLPP_PROXY_RUN_STATE_IDLE = 0, + MSCCLPP_PROXY_RUN_STATE_RUNNING, + MSCCLPP_PROXY_RUN_STATE_EXITING, +} mscclppProxyRunState_t; + struct mscclppProxyState { pthread_t *threads; - int *runs; + mscclppProxyRunState_t *runs; }; mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); diff --git a/src/proxy.cc b/src/proxy.cc index 0e473787..48127571 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -33,7 +33,7 @@ struct proxyArgs { struct mscclppComm* comm; struct mscclppIbContext* ibCtx; cudaStream_t stream; - volatile int* run; + volatile mscclppProxyRunState_t* run; int connIdx; }; @@ -41,8 +41,7 @@ struct proxyArgs { void* mscclppProxyServiceP2P(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; - // TODO(saemal): we perhaps need a finite state for run instead of just 0 and 1 - volatile int *run = args->run; + volatile mscclppProxyRunState_t *run = args->run; struct mscclppConn *conn = &comm->conns[args->connIdx]; cudaStream_t stream = args->stream; free(_args); @@ -60,7 +59,7 @@ void* mscclppProxyServiceP2P(void* _args) { cudaStreamCaptureStatus stat; cudaStreamIsCapturing(stream, &stat); - while (*run) { + while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { // Poll to see if we are ready to send anything trigger.value = *(volatile uint64_t *)conn->cpuTrigger; if (trigger.value == 0) continue; @@ -83,7 +82,7 @@ void* mscclppProxyServiceP2P(void* _args) { volatile uint64_t *tmp = (volatile uint64_t *)conn->cpuTrigger; *tmp = 0; } - *run = 1; + *run = MSCCLPP_PROXY_RUN_STATE_IDLE; PROXYCUDACHECK(cudaStreamDestroy(stream)); // WARN("Proxy exits: rank %d", rank); @@ -97,7 +96,7 @@ void* mscclppProxyServiceIb(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; struct mscclppIbContext *ibCtx = args->ibCtx; - volatile int *run = args->run; + volatile mscclppProxyRunState_t *run = args->run; struct mscclppConn *conn = &comm->conns[args->connIdx]; free(_args); uint64_t currentProxyFlagVlaue = *conn->cpuProxyFlag; @@ -117,7 +116,7 @@ void* mscclppProxyServiceIb(void* _args) { WARN("postRecv failed: errno %d", errno); } - while (*run) { + while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { // Try send if (sendState == SEND_STATE_INIT) { trigger.value = *(volatile uint64_t *)conn->cpuTrigger; @@ -165,7 +164,7 @@ void* mscclppProxyServiceIb(void* _args) { } } } - *run = 1; + *run = MSCCLPP_PROXY_RUN_STATE_IDLE; // WARN("Proxy exits: rank %d", rank); return NULL; } @@ -177,7 +176,7 @@ void* mscclppProxyServiceIb(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; struct mscclppIbContext *ibCtx = args->ibCtx; - volatile int *run = args->run; + volatile mscclppProxyRunState_t *run = args->run; struct mscclppConn *conn = &comm->conns[args->connIdx]; free(_args); @@ -187,7 +186,7 @@ void* mscclppProxyServiceIb(void* _args) { NumaBind(ibCtx->numaNode); - while (*run) { + while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { // Poll to see if we are ready to send anything trigger.value = *(volatile uint64_t *)conn->cpuTrigger; if (trigger.value == 0) continue; @@ -237,7 +236,7 @@ void* mscclppProxyServiceIb(void* _args) { volatile uint64_t *tmp = (volatile uint64_t *)conn->cpuTrigger; *tmp = 0; } - *run = 1; + *run = MSCCLPP_PROXY_RUN_STATE_IDLE; // WARN("Proxy exits: rank %d", rank); return NULL; } @@ -283,7 +282,7 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { args->ibCtx = comm->ibContext[i]; args->run = &comm->proxyState[i].runs[j]; args->connIdx = j; - *args->run = 1; + *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; pthread_create(&comm->proxyState[i].threads[j], NULL, mscclppProxyService, args); mscclppSetThreadName(comm->proxyState[i].threads[j], "MSCCLPP Service %2d - %4d", i, j); } @@ -306,7 +305,7 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { args->run = &proxyState->runs[j]; args->connIdx = j; CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); - *args->run = 1; + *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; pthread_create(&proxyState->threads[j], NULL, mscclppProxyService, args); mscclppSetThreadName(proxyState->threads[j], "MSCCLPP Service %2d - %4d", MSCCLPP_IB_MAX_DEVS + 1, j); } @@ -315,12 +314,11 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { static void _stopProxy(struct mscclppComm* comm, int devIdx, int connIdx) { volatile int *run = (volatile int *)&comm->proxyState[devIdx].runs[connIdx]; - if (*run == 0) return; - *run = 0; - while (*run == 0 && *comm->abortFlag == 0) { + if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) return; + *run = MSCCLPP_PROXY_RUN_STATE_EXITING; + while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) { usleep(1000); } - *run = 0; } mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { From ae7407146b778bbe8dbfaf264adc436c2e2687d4 Mon Sep 17 00:00:00 2001 From: "Saeed Maleki (saemal)" Date: Mon, 6 Mar 2023 08:03:44 -0800 Subject: [PATCH 2/2] removing unnecessary stat probe for cuda graph --- src/proxy.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/proxy.cc b/src/proxy.cc index 0e473787..df607541 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -57,9 +57,6 @@ void* mscclppProxyServiceP2P(void* _args) { PROXYCUDACHECK(cudaSetDevice(comm->cudaDev)); PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - cudaStreamCaptureStatus stat; - cudaStreamIsCapturing(stream, &stat); - while (*run) { // Poll to see if we are ready to send anything trigger.value = *(volatile uint64_t *)conn->cpuTrigger;