Merge pull request #17 from microsoft/saemal/fix-trigger

New trigger FIFO
This commit is contained in:
Saeed Maleki
2023-03-20 16:05:56 -07:00
committed by GitHub
9 changed files with 495 additions and 147 deletions

View File

@@ -116,7 +116,7 @@ LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR)
LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR)
TESTSDIR := tests
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu allgather_test.cu)
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu allgather_test.cu allgather_test2.cu)
TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS))
TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS))

View File

@@ -55,6 +55,7 @@ struct mscclppComm {
volatile uint32_t *abortFlag;
struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS];
cudaStream_t stream; // DMA engine stream for P2P
struct mscclppProxyState *proxyState[MSCCLPP_PROXY_MAX_NUM];
};

View File

@@ -42,7 +42,38 @@ union alignas(16) mscclppTrigger {
} fields;
};
/**************************************
// Request handle returned by getTrigger(): the FIFO head index reserved for
// this caller. Passed to waitTrigger() to wait until the host proxy has
// consumed that slot.
typedef uint64_t mscclppRequest_t;
// Pointer to one trigger entry inside the FIFO.
typedef mscclppTrigger* mscclppTrigger_t;
// Concurrent trigger FIFO shared between the GPU (producer) and the host
// proxy thread (consumer). Multiple device threads may push at the same
// time: each reserves a distinct slot via an atomic increment of
// *triggerFifoHead, fills it with setTrigger(), and may later wait for the
// host-advanced *triggerFifoTail to pass its slot.
struct mscclppConcurrentFifo {
#ifdef __CUDACC__
// Reserve the next free FIFO slot. Spins while the FIFO is full, i.e. while
// the reserved head index is MSCCLPP_PROXY_FIFO_SIZE or more ahead of the
// host-written tail. On return, *trig points at the reserved entry and the
// returned value is the request handle for waitTrigger().
__forceinline__ __device__ mscclppRequest_t getTrigger(mscclppTrigger_t* trig) {
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->triggerFifoHead,1);
// Busy-wait until the consumer has freed our (wrapped) slot.
while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->triggerFifoTail));
*trig = &this->triggerFifo[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE];
return curFifoHead;
}
// Publish a work element into a slot obtained from getTrigger(). The whole
// 128-bit trigger is written with one vectorized volatile store (PTX
// st.volatile.global.v2.u64) so the host proxy never observes a partially
// written entry; type/offset/size/connId are packed into the two 64-bit
// words using the MSCCLPP_BITS_* field layout.
__forceinline__ __device__ void setTrigger(mscclppTrigger_t trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) {
asm volatile(
"st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value),
"l"((dataOffset << (MSCCLPP_BITS_SIZE)) +
(dataSize)),
"l"((type << MSCCLPP_BITS_CONNID) + this->connId));
}
// Spin until the host proxy has consumed the slot identified by req, i.e.
// the tail has advanced strictly past the reserved head index. After this,
// the buffer/flag referenced by the trigger may be reused.
__forceinline__ __device__ void waitTrigger(mscclppRequest_t req) {
while (*(volatile uint64_t *)triggerFifoTail <= req);
}
#endif // __CUDACC__
// Ring buffer of MSCCLPP_PROXY_FIFO_SIZE trigger entries (device mapping).
mscclppTrigger* triggerFifo;
uint64_t* triggerFifoTail; // read by both device and host. written only by host
uint64_t* triggerFifoHead; // read by both device and host. written only by device
// Connection id baked into each trigger so the proxy can route the work.
int connId;
};
/***************************************************************************************************************
* A mscclppDevConn provides a zero-copy connection between a sender and a receiver that are
* connected via P2P NVLink or IB.
* The communication API is one-sided meaning that not both side of a connection are involved
@@ -72,45 +103,8 @@ union alignas(16) mscclppTrigger {
* (note that an atomicInc is used to enable concurrent calls to getTrigger). setTrigger writes the work element into the fifo
* so that the CPU proxy can consume it.
*
***************************************/
**************************************************************************************************************/
struct mscclppDevConn {
#ifdef __CUDACC__
__forceinline__ __device__ mscclppTrigger *getTrigger() {
unsigned int curFifoHead = atomicInc(this->triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1);
return &this->triggerFifo[curFifoHead];
}
__forceinline__ __device__ mscclppTrigger *acquireTrigger() {
unsigned int *cnt = this->triggerFifoCounter;
unsigned int old = atomicAdd(cnt, 1);
while (old >= MSCCLPP_PROXY_FIFO_SIZE) {
atomicSub(cnt, 1);
while (*(volatile unsigned int *)cnt >= MSCCLPP_PROXY_FIFO_SIZE) {}
old = atomicAdd(cnt, 1);
}
// Up to MSCCLPP_PROXY_FIFO_SIZE threads can enter here at the same time
return getTrigger();
}
__forceinline__ __device__ void releaseTrigger() {
atomicSub(this->triggerFifoCounter, 1);
}
__forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) {
asm volatile(
"st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value),
"l"((dataOffset << (MSCCLPP_BITS_SIZE)) +
(dataSize)),
"l"((type << MSCCLPP_BITS_CONNID) + this->connId));
}
__forceinline__ __device__ void waitTrigger(mscclppTrigger *trig) {
// Check only the first 64 bits
while (*(volatile uint64_t *)trig->value != 0) {}
}
#endif // __CUDACC__
int tag;
void* localBuff;
@@ -118,12 +112,10 @@ struct mscclppDevConn {
void* remoteBuff;
uint64_t* remoteFlag;
uint64_t* proxyFlag; // this is only written by the proxy thread
unsigned int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. for parallel, access use atomic
mscclppTrigger* triggerFifo;
unsigned int* triggerFifoCounter;
uint64_t* proxyFlag;
int connId;
// multiple threads can access the fifo concurrently
struct mscclppConcurrentFifo fifo;
};
typedef struct mscclppComm* mscclppComm_t;

View File

@@ -13,18 +13,25 @@ typedef enum {
MSCCLPP_PROXY_RUN_STATE_EXITING,
} mscclppProxyRunState_t;
// Host/device view pair for a single buffer allocated with
// mscclppGdrCudaCalloc: hostPtr and devPtr address the same underlying
// memory (GPUDirect-mapped), so the host proxy and device code can both
// access it without explicit copies.
template <typename T>
struct mscclppGDRState {
T* hostPtr; // host-side mapping of the buffer
T* devPtr;  // device-side mapping of the same buffer
void* desc; // opaque mapping descriptor, kept for cleanup/unmapping
};
struct mscclppProxyState {
mscclppTransport_t transportType;
pthread_t thread;
mscclppProxyRunState_t run;
mscclppTrigger *cpuTriggerFifo;
mscclppTrigger *gpuTriggerFifo;
// cpuTriggerFifoTail indicates where CPU needs to read the head of the fifo.
unsigned int cpuTriggerFifoTail;
unsigned int *gpuTriggerFifoHead;
unsigned int *gpuTriggerFifoCounter;
void *cpuTriggerFifoGdrDesc;
// NULL for the P2P proxy.
struct mscclppIbContext *ibContext;
// fifo allocation that is accessible on both host and device
mscclppGDRState<mscclppTrigger> triggerFifo;
mscclppGDRState<uint64_t> fifoHead;
mscclppGDRState<uint64_t> fifoTail;
struct mscclppIbContext *ibContext; // For IB connection only
cudaStream_t stream; // for P2P DMA engine only
};
mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm);

View File

@@ -87,6 +87,7 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, c
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail);
_comm->rank = rank;
_comm->nRanks = nranks;
// We assume that the user has set the device to the intended one already
CUDACHECK(cudaGetDevice(&_comm->cudaDev));
MSCCLPPCHECK(bootstrapNetInit(ip_port_pair));
@@ -179,7 +180,7 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
conn->ibCtx = NULL;
conn->ibQp = NULL;
int ibDevIdx = -1;
if (ibDev != NULL) {
if (transportType == mscclppTransportIB) {
// Check if an IB context exists
int firstNullIdx = -1;
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
@@ -192,6 +193,8 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
break;
}
}
// If not, create a new one
if (ibDevIdx == -1) {
// Create a new context.
ibDevIdx = firstNullIdx;
@@ -200,41 +203,78 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
return mscclppInternalError;
}
}
// Set the ib context for this conn
conn->ibCtx = comm->ibContext[ibDevIdx];
} else if (transportType == mscclppTransportP2P){
// Check if a DMA context/stream exists
if (comm->stream == NULL){
CUDACHECK(cudaStreamCreateWithFlags(&comm->stream, cudaStreamNonBlocking));
}
} else if (transportType == mscclppTransportSHM){
WARN("Shared memory interconnection is not implemented yet!");
return mscclppInternalError;
} else {
WARN("Unexpected connection type!");
return mscclppInvalidUsage;
}
// Find a proxy state that uses the given IB device
// Find/create a proxy state for the given connection
struct mscclppProxyState *proxyState = NULL;
// First see if there is a matching context
// If not, find the first empty proxy
int firstEmptyProxyIndex = -1;
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
if (comm->proxyState[i] == NULL) {
// Cannot find, create a new one
MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo,
MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc));
MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1));
MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoCounter, 1));
struct mscclppProxyState *curProxy = comm->proxyState[i];
if (curProxy && (curProxy->transportType == transportType)){
if ((transportType == mscclppTransportIB && curProxy->ibContext == conn->ibCtx) || (transportType == mscclppTransportP2P)){
proxyState = curProxy;
break; // we found the matching context
}
}
if (curProxy == NULL && firstEmptyProxyIndex == -1){
firstEmptyProxyIndex = i;
}
}
if (proxyState == NULL && firstEmptyProxyIndex == -1){
WARN("Too many proxies have been allocated!");
return mscclppInvalidUsage;
}
// If we couldn't find a matching context, create one
if (proxyState == NULL){
MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->triggerFifo.hostPtr, &proxyState->triggerFifo.devPtr,
MSCCLPP_PROXY_FIFO_SIZE, &proxyState->triggerFifo.desc));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->fifoHead.hostPtr, &proxyState->fifoHead.devPtr,
1, &proxyState->fifoHead.desc));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->fifoTail.hostPtr, &proxyState->fifoTail.devPtr,
1, &proxyState->fifoTail.desc));
if (transportType == mscclppTransportIB){
proxyState->ibContext = conn->ibCtx;
comm->proxyState[i] = proxyState;
break;
}
if (comm->proxyState[i]->ibContext == conn->ibCtx) {
// `conn->ibCtx == NULL` indicates the P2P proxy.
proxyState = comm->proxyState[i];
break;
proxyState->stream = NULL;
} else if (transportType == mscclppTransportP2P){
proxyState->ibContext = NULL;
proxyState->stream = comm->stream;
}
proxyState->transportType = transportType;
comm->proxyState[firstEmptyProxyIndex] = proxyState;
}
if (proxyState == NULL) {
// Cannot reach
WARN("Unexpected error");
WARN("Proxy allocation failed!");
return mscclppInternalError;
}
conn->devConn = devConnOut;
conn->devConn->localBuff = localBuff;
conn->devConn->localFlag = localFlag;
conn->devConn->tag = tag;
conn->devConn->connId = comm->nConns;
conn->devConn->triggerFifo = proxyState->gpuTriggerFifo;
conn->devConn->triggerFifoHead = proxyState->gpuTriggerFifoHead;
conn->devConn->triggerFifoCounter = proxyState->gpuTriggerFifoCounter;
conn->devConn->fifo.connId = comm->nConns;
conn->devConn->fifo.triggerFifo = proxyState->triggerFifo.devPtr;
conn->devConn->fifo.triggerFifoHead = proxyState->fifoHead.devPtr;
conn->devConn->fifo.triggerFifoTail = proxyState->fifoTail.devPtr;
comm->nConns++;
return mscclppSuccess;
@@ -257,9 +297,9 @@ mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*outpu
}
struct mscclppDevConn *devConn = conn->devConn;
MSCCLPPCHECK(mscclppCudaCalloc(&devConn->proxyFlag, 1));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->proxyFlag));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleBuff, devConn->localBuff));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleFlag, devConn->localFlag));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleProxyFlag, devConn->proxyFlag));
return mscclppSuccess;
}

View File

@@ -48,9 +48,11 @@ void* mscclppProxyServiceP2P(void* _args) {
struct proxyArgs *args = (struct proxyArgs *)_args;
struct mscclppComm *comm = args->comm;
volatile mscclppProxyRunState_t *run = &args->proxyState->run;
mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
cudaStream_t stream = args->stream;
mscclppTrigger *fifo = args->proxyState->triggerFifo.hostPtr;
volatile uint64_t *fifoTail = args->proxyState->fifoTail.hostPtr;
volatile uint64_t *fifoHead = args->proxyState->fifoHead.hostPtr;
cudaStream_t stream = args->proxyState->stream;
free(_args);
// int rank = comm->rank;
@@ -60,9 +62,7 @@ void* mscclppProxyServiceP2P(void* _args) {
// TODO(saemal): either ask user or detect it automatically
NumaBind((comm->cudaDev / 2) ^ 1);
PROXYCUDACHECK(cudaSetDevice(comm->cudaDev));
PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
uint64_t cachedFifoTail = *fifoTail;
int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
// fifoTail indicates the next FIFO entry for the CPU to read.
for (;;) {
@@ -72,9 +72,11 @@ void* mscclppProxyServiceP2P(void* _args) {
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
}
// Poll to see if we are ready to send anything
readTrigger(&trigger, &fifo[*fifoTail]);
if (trigger.value[0] == 0) continue;
if (cachedFifoTail == *fifoHead) continue; // no need trigger
readTrigger(&trigger, &fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]);
if (trigger.value[0] == 0) continue; // there is one in progreess
// there is a trigger value ready to be consumed
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
// Iterate over what send is needed
@@ -92,15 +94,15 @@ void* mscclppProxyServiceP2P(void* _args) {
}
// Send completion: reset only the high 64 bits
*(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
*fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
*(volatile uint64_t *)(&fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]) = 0;
cachedFifoTail++;
*fifoTail = cachedFifoTail;
}
// Need a sync in case previous copies are not completed
PROXYCUDACHECK(cudaStreamSynchronize(stream));
*run = MSCCLPP_PROXY_RUN_STATE_IDLE;
PROXYCUDACHECK(cudaStreamDestroy(stream));
// WARN("Proxy exits: rank %d", rank);
return NULL;
@@ -111,8 +113,9 @@ void* mscclppProxyServiceIb(void* _args) {
struct mscclppComm *comm = args->comm;
struct mscclppIbContext *ibCtx = args->proxyState->ibContext;
volatile mscclppProxyRunState_t *run = &args->proxyState->run;
mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
mscclppTrigger *fifo = args->proxyState->triggerFifo.hostPtr;
volatile uint64_t *fifoTail = args->proxyState->fifoTail.hostPtr;
volatile uint64_t *fifoHead = args->proxyState->fifoHead.hostPtr;
free(_args);
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
@@ -150,6 +153,7 @@ void* mscclppProxyServiceIb(void* _args) {
}
#endif
uint64_t cachedFifoTail = *fifoTail;
int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
for (;;) {
if (runCheckCounter-- == 0) {
@@ -157,8 +161,6 @@ void* mscclppProxyServiceIb(void* _args) {
// Check if we need to exit
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
}
// Poll to see if we are ready to send anything
readTrigger(&trigger, &fifo[*fifoTail]);
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
@@ -210,7 +212,11 @@ void* mscclppProxyServiceIb(void* _args) {
}
}
#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1)
if (trigger.value[0] == 0) continue;
// Poll to see if we are ready to send anything
if (cachedFifoTail == *fifoHead) continue; // no need trigger
readTrigger(&trigger, &fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]);
if (trigger.value[0] == 0) continue; // there is one in progreess
// there is a trigger value ready to be consumed
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
@@ -258,10 +264,14 @@ void* mscclppProxyServiceIb(void* _args) {
}
// Send completion: reset only the high 64 bits
*(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
*fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
*(volatile uint64_t *)(&fifo[cachedFifoTail % MSCCLPP_PROXY_FIFO_SIZE]) = 0;
cachedFifoTail++;
*fifoTail = cachedFifoTail;
#endif
}
//TODO(saemal): we need to wait for completion of wc here too
*run = MSCCLPP_PROXY_RUN_STATE_IDLE;
// WARN("Proxy exits: rank %d", rank);
return NULL;
@@ -282,20 +292,17 @@ mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) {
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState *proxyState = comm->proxyState[i];
if (proxyState == NULL) break;
bool is_p2p = (proxyState->ibContext == NULL);
struct proxyArgs *args;
MSCCLPPCHECK(mscclppCalloc(&args, 1));
args->comm = comm;
args->proxyState = proxyState;
if (is_p2p) {
CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking));
}
proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
pthread_create(&proxyState->thread, NULL, mscclppProxyService, args);
if (is_p2p) {
if (proxyState->transportType == mscclppTransportP2P) {
mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev);
} else {
} else if (proxyState->transportType == mscclppTransportIB) {
mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i);
}
}

View File

@@ -8,7 +8,6 @@
#include <string>
#define RANKS_PER_NODE 8
#define USE_DMA_FOR_P2P 1
#define MSCCLPPCHECK(call) do { \
mscclppResult_t res = call; \
@@ -50,61 +49,48 @@ __global__ void kernel(int rank, int world_size, int nelemsPerGPU)
mscclppDevConn_t devConn = constDevConns[remoteRank];
// volatile int *data = (volatile int *)devConn.localBuff;
volatile uint64_t *localFlag = devConn.localFlag;
#if (USE_DMA_FOR_P2P == 0)
volatile uint64_t *remoteFlag = devConn.remoteFlag;
#endif
volatile uint64_t *proxyFlag = devConn.proxyFlag;
uint64_t baseFlag = *localFlag;
__syncthreads();
if (threadIdx.x == 0) {
// Do we need a sys fence?
// __threadfence_system();
*localFlag = baseFlag + 1;
}
// Thread-safely obtain the head trigger
mscclppTrigger *trig = devConn.acquireTrigger();
// Each warp receives data from different ranks
#if (USE_DMA_FOR_P2P == 1)
#if 0
// get a thread-local trigger and a request for waiting on it
mscclppTrigger_t trig;
mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
// Trigger sending data and flag
devConn.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
// Trigger sending data, flag and synchronize after
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
// we cannot reuse buffer and flag until the request is completed
// Wait until the proxy have sent my data and flag
devConn.waitTrigger(trig);
// Inform other threads that the tail trigger just became idle
devConn.releaseTrigger();
// Wait on the request to make sure it is safe to reuse buffer and flag
devConn.fifo.waitTrigger(req);
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {}
while (*proxyFlag == baseFlag);
#else
for (int i = 1; i < world_size; i++){
__syncthreads();
if (remoteRank != ((rank+i) % world_size)) continue;
// get a thread-local trigger and a request for waiting on it
mscclppTrigger_t trig;
mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
#else // USE_DMA_FOR_P2P == 0
// Trigger sending data, flag and synchronize after
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
if (devConn.remoteBuff == NULL) { // IB
// Wait until the proxy have sent my data and flag
devConn.waitTrigger(trig);
// Trigger sending data and flag
devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {}
} else { // P2P
// Directly read data
volatile int *remoteData = (volatile int *)devConn.remoteBuff;
// Wait until the remote data is set
while (*remoteFlag == baseFlag) {}
// Read remote data
data[remoteRank] = remoteData[remoteRank];
// Wait on the request to make sure it is safe to reuse buffer and flag
devConn.fifo.waitTrigger(req);
}
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag);
#endif
}
int rankToLocalRank(int rank)
@@ -185,14 +171,14 @@ int main(int argc, const char *argv[])
int ibNum = cudaNumToIbNum(cudaNum);
CUDACHECK(cudaSetDevice(cudaNum));
std::string ibDevStr = "mlx5_ib" + std::to_string(ibNum);
std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
int *data_d;
uint64_t *flag_d;
size_t data_size = 1024*1024*16;
size_t data_size = 1024*1024*1024;
int nelemsPerGPU = data_size / sizeof(int) / world_size;
CUDACHECK(cudaMalloc(&data_d, data_size));
CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
@@ -297,8 +283,9 @@ int main(int argc, const char *argv[])
float ms = (t1-t0)*1000.0;
// CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end));
double time_in_us = ms * 1000. / (float) cudagraphlaunch / (float) cudagraphiter;
printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, (double) (data_size) / 1024./1024./1024./(time_in_us/1e6));
printf("rank: %d, time: %f us/iter algBW %f GBps\n", rank, time_in_us, (double) (data_size) / 1e9 /(time_in_us/1e6));
MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
MSCCLPPCHECK(mscclppProxyStop(comm));
MSCCLPPCHECK(mscclppCommDestroy(comm));

311
tests/allgather_test2.cu Normal file
View File

@@ -0,0 +1,311 @@
#include "mscclpp.h"
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string>
#define RANKS_PER_NODE 8
#define MSCCLPPCHECK(call) do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
// Check CUDA RT calls
#define CUDACHECK(cmd) do { \
cudaError_t err = cmd; \
if( err != cudaSuccess ) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while(false)
// Measure current time in second.
// Return the current CLOCK_MONOTONIC time in seconds.
// Exits the process if the clock cannot be read.
static double getTime(void)
{
  struct timespec now;
  if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
    printf("clock_gettime failed\n");
    exit(EXIT_FAILURE);
  }
  return (double)now.tv_sec + now.tv_nsec / 1.0e9;
}
__constant__ mscclppDevConn_t constDevConns[16];
// Allgather kernel that splits each peer transfer between two connections:
// the first (world_size-1) warps drive the P2P connections
// (constDevConns[remoteRank]) and the remaining (world_size-1) warps drive
// the IB connections (constDevConns[remoteRank + world_size]). Only lane 0
// of each warp does any work. Expects a single block of
// 32 * 2 * (world_size-1) threads (see the launch in main()).
__global__ void kernel(int rank, int world_size, int nelemsPerGPU)
{
// Only one lane per warp participates.
// NOTE(review): after this early return, the __syncthreads() calls below are
// executed by a subset of the block's threads — confirm this is intended.
if (threadIdx.x % 32 != 0) return;
int warpId = threadIdx.x / 32;
bool isIB = false;
// Warps [world_size-1, 2*(world_size-1)) handle the IB half of the data.
if (warpId >= world_size-1) isIB = true;
if (isIB) warpId = warpId - (world_size-1);
// Skip our own rank when mapping warp -> remote peer.
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
mscclppDevConn_t devConn = constDevConns[remoteRank];
// IB connections live in the upper half of constDevConns.
if (isIB) devConn = constDevConns[remoteRank + world_size];
// volatile int *data = (volatile int *)devConn.localBuff;
volatile uint64_t *localFlag = devConn.localFlag;
volatile uint64_t *proxyFlag = devConn.proxyFlag;
uint64_t baseFlag = *localFlag;
__syncthreads();
// One thread bumps the local flag so peers can detect new data.
if (threadIdx.x == 0) {
// Do we need a sys fence?
// __threadfence_system();
*localFlag = baseFlag + 1;
}
// Each warp receives data from different ranks
#if 0
// Unordered variant: every warp pushes its trigger immediately.
// get a thread-local trigger and a request for waiting on it
mscclppTrigger_t trig;
mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
// Trigger sending data, flag and synchronize after
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU*sizeof(int));
// Wait on the request to make sure it is safe to reuse buffer and flag
devConn.fifo.waitTrigger(req);
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag);
#else
// Ordered variant: round i lets only the warp whose peer is (rank+i) push,
// so sends to different peers are staggered instead of issued all at once.
for (int i = 1; i < world_size; i++){
__syncthreads();
if (remoteRank != ((rank+i) % world_size)) continue;
// get a thread-local trigger and a request for waiting on it
mscclppTrigger_t trig;
mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
// Trigger sending data, flag and synchronize after
// ibPortion: the tail fraction (1/12) of this rank's slice goes over IB,
// the remaining 11/12 over P2P — presumably a bandwidth split; TODO confirm.
int ibPortion = nelemsPerGPU/12;// tail portion of the slice sent over IB
if (isIB)
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int) + (nelemsPerGPU - ibPortion)*sizeof(int), ibPortion*sizeof(int));
else
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * nelemsPerGPU * sizeof(int), (nelemsPerGPU-ibPortion)*sizeof(int));
// Wait on the request to make sure it is safe to reuse buffer and flag
devConn.fifo.waitTrigger(req);
}
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag);
#endif
}
// Map a global rank to its local rank within its node.
int rankToLocalRank(int rank)
{
  int node = rank / RANKS_PER_NODE;
  return rank - node * RANKS_PER_NODE;
}
// Map a global rank to the index of the node that hosts it.
int rankToNode(int rank)
{
  const int ranksPerNode = RANKS_PER_NODE;
  return rank / ranksPerNode;
}
// Translate a CUDA device ordinal (0..7) to its paired IB device number.
// The mapping is hard-coded — presumably matching this machine's GPU/NIC
// topology (TODO confirm). Exits with a message on out-of-range input.
int cudaNumToIbNum(int cudaNum)
{
  static const int ibNums[8] = {0, 4, 1, 5, 2, 6, 3, 7};
  if (cudaNum < 0 || cudaNum > 7) {
    printf("Invalid cudaNum: %d\n", cudaNum);
    exit(EXIT_FAILURE);
  }
  return ibNums[cudaNum];
}
// Print command-line usage for this test binary. With MPI enabled the
// rank/nranks arguments are optional (MPI supplies them); otherwise they
// are required.
void print_usage(const char *prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
  const char *fmt = "usage: %s IP:PORT [rank nranks]\n";
#else
  const char *fmt = "usage: %s IP:PORT rank nranks\n";
#endif
  printf(fmt, prog);
}
// End-to-end allgather test that splits each peer transfer between a P2P
// connection and an IB connection (see kernel()). It sets up
// 2*(world_size-1) connections, runs one correctness pass, then times the
// kernel via a captured CUDA graph. Rank/size come from argv, or from MPI
// when built with MSCCLPP_USE_MPI_FOR_TESTS and not given on the command
// line.
int main(int argc, const char *argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc != 2 && argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank;
int world_size;
if (argc == 4) {
// Explicit rank/nranks on the command line takes precedence over MPI.
rank = atoi(argv[2]);
world_size = atoi(argv[3]);
} else {
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
}
#else
if (argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank = atoi(argv[2]);
int world_size = atoi(argv[3]);
#endif
int localRank = rankToLocalRank(rank);
// NOTE(review): thisNode and ibNum are unused below (ibDevStr is built from
// localRank, not ibNum) — confirm whether the cudaNumToIbNum mapping was
// meant to pick the IB device here.
int thisNode = rankToNode(rank);
int cudaNum = localRank;
int ibNum = cudaNumToIbNum(cudaNum);
CUDACHECK(cudaSetDevice(cudaNum));
std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
int *data_d;
uint64_t *flag_d;
// 1.5 GiB total; each rank owns a contiguous 1/world_size slice of it.
size_t data_size = 1536*1024*1024;
int nelemsPerGPU = data_size / sizeof(int) / world_size;
CUDACHECK(cudaMalloc(&data_d, data_size));
CUDACHECK(cudaMalloc(&flag_d, sizeof(uint64_t)));
CUDACHECK(cudaMemset(data_d, 0, data_size));
CUDACHECK(cudaMemset(flag_d, 0, sizeof(uint64_t)));
// Host buffer: only this rank's slice holds the pattern (i+1); everything
// else is zero so the post-allgather check can detect missing data.
int* data_h = new int[nelemsPerGPU*world_size];
for (int i = 0; i < nelemsPerGPU*world_size; i++){
int val = i + 1;
if (i / nelemsPerGPU == rank){
data_h[i] = val;
} else {
data_h[i] = 0;
}
}
CUDACHECK(cudaMemcpy(data_d, data_h, data_size, cudaMemcpyHostToDevice));
// devConns[0..world_size) are P2P connections; devConns[world_size..) are
// IB connections to the same peers (assumes world_size <= 8 so both halves
// fit in the 16-entry arrays).
mscclppDevConn_t devConns[16];
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
mscclppTransport_t transportType;
const char* ibDev = NULL;
transportType = mscclppTransportP2P;
// Connect with all other ranks
MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, data_d, data_size, flag_d, 0, transportType, ibDev));
}
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
mscclppTransport_t transportType;
const char* ibDev = ibDevStr.c_str();
transportType = mscclppTransportIB;
// Connect with all other ranks
MSCCLPPCHECK(mscclppConnect(comm, &devConns[r+world_size], r, data_d, data_size, flag_d, 0, transportType, ibDev));
}
MSCCLPPCHECK(mscclppConnectionSetup(comm));
MSCCLPPCHECK(mscclppProxyLaunch(comm));
// Both connection sets are published to the kernel via constant memory.
CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * 2 * world_size));
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
CUDACHECK(cudaDeviceSynchronize());
// One warp per connection: 2*(world_size-1) warps in a single block.
kernel<<<1, 32 * 2*(world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
CUDACHECK(cudaDeviceSynchronize());
CUDACHECK(cudaMemcpy(data_h, data_d, data_size, cudaMemcpyDeviceToHost));
CUDACHECK(cudaDeviceSynchronize());
// Correctness check: after the allgather every element must equal i+1.
for (int i = 0; i < nelemsPerGPU*world_size; i++){
int val = i + 1;
if (data_h[i] != val){
printf("oh uh things went wrong! data_h[%d] (%d) != val (%d)\n", i, data_h[i], val);
break;
}
}
int tmp[16];
// Barrier across ranks before starting the perf measurement.
MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
// // Perf test
// cudaEvent_t ev_start;
// cudaEvent_t ev_end;
// CUDACHECK(cudaEventCreate(&ev_start));
// CUDACHECK(cudaEventCreate(&ev_end));
// warm up
// int warmupiter = 1000;
// for (int i = 0; i < warmupiter; ++i) {
//   kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
// }
// CUDACHECK(cudaDeviceSynchronize());
// MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
// cudaGraph Capture
// Capture cudagraphiter kernel launches into a single graph so the timed
// loop below measures kernel time without per-launch overhead.
// NOTE(review): the capture/instantiate/launch calls below are not wrapped
// in CUDACHECK — errors here would go unnoticed.
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
int cudagraphiter = 10;
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, 32 * 2*(world_size - 1), 0, stream>>>(rank, world_size, nelemsPerGPU);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
int cudagraphwarmup = 10;
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
}
CUDACHECK(cudaStreamSynchronize(stream));
// measure runtime
// CUDACHECK(cudaEventRecord(ev_start, stream));
double t0 = getTime();
int cudagraphlaunch = 10;
for (int i = 0; i < cudagraphlaunch; ++i) {
// kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
cudaGraphLaunch(instance, stream);
}
// CUDACHECK(cudaEventRecord(ev_end, stream));
CUDACHECK(cudaStreamSynchronize(stream));
double t1 = getTime();
float ms = (t1-t0)*1000.0;
// CUDACHECK(cudaEventElapsedTime(&ms, ev_start, ev_end));
// Per-iteration time: total wall time divided by graph launches and the
// kernel iterations captured inside each graph.
double time_in_us = ms * 1000. / (float) cudagraphlaunch / (float) cudagraphiter;
printf("rank: %d, time: %f us/iter algBW %f\n", rank, time_in_us, (double) (data_size) / 1024./1024./1024./(time_in_us/1e6));
// Final barrier, then orderly teardown of proxies and the communicator.
MSCCLPPCHECK(mscclppBootStrapAllGather(comm, tmp, sizeof(int)));
MSCCLPPCHECK(mscclppProxyStop(comm));
MSCCLPPCHECK(mscclppCommDestroy(comm));
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc == 2) {
MPI_Finalize();
}
#endif
printf("Succeeded! %d\n", rank);
return 0;
}

View File

@@ -55,7 +55,6 @@ __global__ void kernel(int rank, int world_size)
volatile uint64_t *remoteFlag = devConn.remoteFlag;
#endif
volatile uint64_t *proxyFlag = devConn.proxyFlag;
mscclppTrigger *trig = devConn.getTrigger();
uint64_t baseFlag = *localFlag;
@@ -71,14 +70,18 @@ __global__ void kernel(int rank, int world_size)
*localFlag = baseFlag + 1;
}
// get a thread-local trigger and a request for waiting on it
mscclppTrigger_t trig;
mscclppRequest_t req = devConn.fifo.getTrigger(&trig);
// Each warp receives data from different ranks
#if (USE_DMA_FOR_P2P == 1)
// Wait until the proxy have sent my data and flag
devConn.waitTrigger(trig);
// Trigger sending data, flag and synchronize after
devConn.fifo.setTrigger(trig, mscclppFlag | mscclppData | mscclppSync, rank * sizeof(int), sizeof(int));
// Trigger sending data and flag
devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));
// Wait on the request to make sure it is safe to reuse buffer and flag
devConn.fifo.waitTrigger(req);
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {}