mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-27 08:24:22 +00:00
Store fifo tail in proxy state
This commit is contained in:
@@ -16,6 +16,8 @@ struct mscclppProxyState {
|
||||
mscclppProxyRunState_t run;
|
||||
mscclppTrigger *cpuTriggerFifo;
|
||||
mscclppTrigger *gpuTriggerFifo;
|
||||
// cpuTriggerFifoTail is the index of the next fifo entry the CPU proxy reads; it advances modulo MSCCLPP_PROXY_FIFO_SIZE.
|
||||
unsigned int cpuTriggerFifoTail;
|
||||
unsigned int *gpuTriggerFifoHead;
|
||||
void *cpuTriggerFifoGdrDesc;
|
||||
};
|
||||
|
||||
33
src/proxy.cc
33
src/proxy.cc
@@ -37,8 +37,7 @@ struct proxyArgs {
|
||||
struct mscclppComm* comm;
|
||||
struct mscclppIbContext* ibCtx;
|
||||
cudaStream_t stream;
|
||||
volatile mscclppProxyRunState_t* run;
|
||||
mscclppTrigger *triggerFifo;
|
||||
struct mscclppProxyState *proxyState;
|
||||
};
|
||||
|
||||
static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) {
|
||||
@@ -49,8 +48,9 @@ static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) {
|
||||
void* mscclppProxyServiceP2P(void* _args) {
|
||||
struct proxyArgs *args = (struct proxyArgs *)_args;
|
||||
struct mscclppComm *comm = args->comm;
|
||||
volatile mscclppProxyRunState_t *run = args->run;
|
||||
mscclppTrigger *fifo = args->triggerFifo;
|
||||
volatile mscclppProxyRunState_t *run = &args->proxyState->run;
|
||||
mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
|
||||
unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
|
||||
cudaStream_t stream = args->stream;
|
||||
free(_args);
|
||||
|
||||
@@ -66,7 +66,6 @@ void* mscclppProxyServiceP2P(void* _args) {
|
||||
|
||||
int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
|
||||
// fifoTail indicates where CPU needs to read the head of the fifo.
|
||||
int fifoTail = 0;
|
||||
for (;;) {
|
||||
if (runCheckCounter-- == 0) {
|
||||
runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
|
||||
@@ -74,7 +73,7 @@ void* mscclppProxyServiceP2P(void* _args) {
|
||||
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
|
||||
}
|
||||
// Poll to see if we are ready to send anything
|
||||
readTrigger(&trigger, &fifo[fifoTail]);
|
||||
readTrigger(&trigger, &fifo[*fifoTail]);
|
||||
if (trigger.value[0] == 0) continue;
|
||||
|
||||
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
|
||||
@@ -94,8 +93,8 @@ void* mscclppProxyServiceP2P(void* _args) {
|
||||
}
|
||||
|
||||
// Send completion: reset only the first 64-bit word of the fifo entry (the word polled as value[0])
|
||||
*(volatile uint64_t *)(&fifo[fifoTail]) = 0;
|
||||
fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
|
||||
*(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
|
||||
*fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
|
||||
}
|
||||
|
||||
// Need a sync in case previous copies are not completed
|
||||
@@ -112,8 +111,9 @@ void* mscclppProxyServiceIb(void* _args) {
|
||||
struct proxyArgs *args = (struct proxyArgs *)_args;
|
||||
struct mscclppComm *comm = args->comm;
|
||||
struct mscclppIbContext *ibCtx = args->ibCtx;
|
||||
volatile mscclppProxyRunState_t *run = args->run;
|
||||
mscclppTrigger *fifo = args->triggerFifo;
|
||||
volatile mscclppProxyRunState_t *run = &args->proxyState->run;
|
||||
mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
|
||||
unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
|
||||
free(_args);
|
||||
|
||||
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
|
||||
@@ -152,8 +152,6 @@ void* mscclppProxyServiceIb(void* _args) {
|
||||
#endif
|
||||
|
||||
int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
|
||||
// fifoTail indicates where CPU needs to read the head of the fifo.
|
||||
int fifoTail = 0;
|
||||
for (;;) {
|
||||
if (runCheckCounter-- == 0) {
|
||||
runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
|
||||
@@ -161,7 +159,7 @@ void* mscclppProxyServiceIb(void* _args) {
|
||||
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
|
||||
}
|
||||
// Poll to see if we are ready to send anything
|
||||
readTrigger(&trigger, &fifo[fifoTail]);
|
||||
readTrigger(&trigger, &fifo[*fifoTail]);
|
||||
|
||||
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
|
||||
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
|
||||
@@ -261,8 +259,8 @@ void* mscclppProxyServiceIb(void* _args) {
|
||||
}
|
||||
|
||||
// Send completion: reset only the first 64-bit word of the fifo entry
|
||||
*(volatile uint64_t *)(&fifo[fifoTail]) = 0;
|
||||
fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
|
||||
*(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
|
||||
*fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
|
||||
#endif
|
||||
}
|
||||
*run = MSCCLPP_PROXY_RUN_STATE_IDLE;
|
||||
@@ -297,12 +295,11 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) {
|
||||
MSCCLPPCHECK(mscclppCalloc(&args, 1));
|
||||
args->comm = comm;
|
||||
args->ibCtx = is_p2p ? NULL : comm->ibContext[i];
|
||||
args->run = &comm->proxyState[i].run;
|
||||
args->triggerFifo = comm->proxyState[i].cpuTriggerFifo;
|
||||
args->proxyState = &comm->proxyState[i];
|
||||
if (is_p2p) {
|
||||
CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking));
|
||||
}
|
||||
*args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
|
||||
comm->proxyState[i].run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
|
||||
pthread_create(&comm->proxyState[i].thread, NULL, mscclppProxyService, args);
|
||||
if (is_p2p) {
|
||||
mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service P2P - %02d", comm->cudaDev);
|
||||
|
||||
Reference in New Issue
Block a user