diff --git a/Makefile b/Makefile index d23758ad..c4c233e1 100644 --- a/Makefile +++ b/Makefile @@ -108,7 +108,7 @@ LIBSRCS += $(addprefix src/bootstrap/,bootstrap.cc socket.cc) LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS)) LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) -INCEXPORTS := mscclpp.h mscclpp_net.h +INCEXPORTS := mscclpp.h INCTARGETS := $(INCEXPORTS:%=$(BUILDDIR)/$(INCDIR)/%) LIBNAME := libmscclpp.so diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index 7c48a78b..6b1cfa9c 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -8,7 +8,6 @@ #include "core.h" #include "utils.h" #include "bootstrap.h" -#include "net.h" #include #include @@ -170,7 +169,7 @@ out: return NULL; } -mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle, bool idFromEnv) { +mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle) { struct mscclppSocket* listenSock; struct bootstrapRootArgs* args; pthread_t thread; @@ -209,10 +208,10 @@ mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool return mscclppInvalidArgument; } if (isRoot) - MSCCLPPCHECK(bootstrapCreateRoot(handle, false)); + MSCCLPPCHECK(bootstrapCreateRoot(handle)); } else { memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union mscclppSocketAddress)); - MSCCLPPCHECK(bootstrapCreateRoot(handle, false)); + MSCCLPPCHECK(bootstrapCreateRoot(handle)); } // printf("addr = %s port = %d\n", inet_ntoa(handle->addr.sin.sin_addr), (int)ntohs(handle->addr.sin.sin_port)); // printf("addr = %s\n", inet_ntoa((*(struct sockaddr_in*)&handle->addr.sa).sin_addr)); @@ -248,7 +247,7 @@ mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscc struct mscclppSocket* proxySocket; mscclppSocketAddress nextAddr; struct mscclppSocket sock, listenSockRoot; - struct extInfo info = { 0 }; + struct extInfo info; MSCCLPPCHECK(mscclppCalloc(&state, 1)); state->rank = rank; diff --git a/src/debug.cc b/src/debug.cc index 3c0f663b..6a7235a4 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -5,7 +5,7 @@ ************************************************************************/ #include "core.h" -#include "mscclpp_net.h" +#include "debug.h" #include #include #include diff --git a/src/ib.cc b/src/ib.cc index 8423819d..51a5820c 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -186,8 +186,8 @@ mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct ms qp_init_attr.send_cq = cq; qp_init_attr.recv_cq = cq; qp_init_attr.qp_type = IBV_QPT_RC; - qp_init_attr.cap.max_send_wr = MAXCONNECTIONS; - qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS; + qp_init_attr.cap.max_send_wr = MAXCONNECTIONS * MSCCLPP_PROXY_FIFO_SIZE; + qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS * MSCCLPP_PROXY_FIFO_SIZE; qp_init_attr.cap.max_send_sge = 1; qp_init_attr.cap.max_recv_sge = 1; qp_init_attr.cap.max_inline_data = 0; @@ -381,25 +381,23 @@ int mscclppIbQp::stageSendWithImm(struct mscclppIbMr *ibMr, const mscclppIbMrInf int mscclppIbQp::postSend() { - struct ibv_send_wr *bad_wr; - int ret = ibv_post_send(this->qp, this->wrs, &bad_wr); - if (ret != 0) { - return ret; - } - // std::memset(this->wrs, 0, sizeof(struct ibv_send_wr) * this->wrn); - // std::memset(this->sges, 0, sizeof(struct ibv_sge) * this->wrn); - this->wrn = 0; - return 0; + struct ibv_send_wr *bad_wr; + int ret = ibv_post_send(this->qp, this->wrs, &bad_wr); + if (ret != 0) { + return ret; + } + this->wrn = 0; + return 0; } int mscclppIbQp::postRecv(uint64_t wrId) { - struct ibv_recv_wr wr, *bad_wr; - wr.wr_id = wrId; - wr.sg_list = nullptr; - wr.num_sge = 0; - wr.next = nullptr; - return ibv_post_recv(this->qp, &wr, &bad_wr); + struct ibv_recv_wr wr, *bad_wr; + wr.wr_id = wrId; + wr.sg_list = nullptr; + wr.num_sge = 0; + wr.next = nullptr; + return ibv_post_recv(this->qp, &wr, &bad_wr); } int mscclppIbQp::pollCq() diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index e7519085..90588e10 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -19,7 +19,7 @@ struct mscclppBootstrapHandle { static_assert(sizeof(struct mscclppBootstrapHandle) <= sizeof(mscclppUniqueId), "Bootstrap handle is too large to fit inside MSCCLPP unique ID"); mscclppResult_t bootstrapNetInit(const char* ip_port_pair = NULL); -mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle, bool idFromEnv); +mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle); mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot = true, const char* ip_port_pair = NULL); mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm); mscclppResult_t bootstrapAllGather(void* commState, void* allData, int size); diff --git a/src/include/comm.h b/src/include/comm.h index 8da70875..46c910d0 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -7,24 +7,9 @@ #ifndef MSCCLPP_COMM_H_ #define MSCCLPP_COMM_H_ -// #include "transport.h" -// #include "p2p.h" -// #include "collectives.h" #include "proxy.h" -// #include "strongstream.h" #include "ib.h" -// #if CUDART_VERSION < 9000 -// struct cudaLaunchParams { -// void *func; -// dim3 gridDim; -// dim3 blockDim; -// void **args; -// size_t sharedMem; -// cudaStream_t stream; -// }; -// #endif - // #define CACHE_LINE_SIZE 128 // #define MEM_ALIGN 4096 // #define CUDA_IPC_MIN 2097152UL @@ -36,139 +21,12 @@ #define MAXCONNECTIONS 1024 -// struct mscclppSendMem { -// union { -// struct { -// uint64_t head; -// char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)]; -// void* ptrExchange; -// uint64_t redOpArgExchange[2]; -// char pad2[CACHE_LINE_SIZE-sizeof(void*)-2*sizeof(uint64_t)]; -// int offsFifo[MSCCLPP_STEPS]; -// }; -// char pad3[MEM_ALIGN]; -// }; -// }; - -// struct mscclppRecvMem { -// union { -// struct { -// uint64_t tail; -// char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)]; -// int sizesFifo[MSCCLPP_STEPS]; -// int offsFifo[MSCCLPP_STEPS]; -// int flush; // For GDRCopy-based flush -// }; -// char pad4[MEM_ALIGN]; -// }; -// }; - -// enum helperThreadState {ThreadStart, ThreadStop}; - -// #define MSCCLPP_IPC_POOL_SIZE (2*MSCCLPP_MAX_LOCAL_RANKS*MSCCLPP_MAX_OPS) - -// struct mscclppGraphHelperResources { -// mscclppComm* comm; -// pthread_mutex_t threadLock; -// pthread_cond_t threadCond; -// enum helperThreadState threadState; -// void* ipcBases[MSCCLPP_IPC_POOL_SIZE]; -// int ipcTail; -// int ipcHead; -// }; - -// struct mscclppUserRedOp { -// int freeNext; // -1=allocated, otherwise index of next free entry in array -// mscclppDataType_t datatype; -// mscclppDevRedOpFull opFull; -// }; - -// struct mscclppNodeRanks { -// int localRanks; -// int* localRankToRank; -// }; - -// struct mscclppDestructor { -// struct mscclppDestructor* next; -// void* obj; -// mscclppResult_t(*fn)(struct mscclppDestructor* me); -// }; - -// struct mscclppCommCallback { -// struct mscclppCommCallback* next; -// mscclppResult_t(*fn)(struct mscclppComm* comm, struct mscclppCommCallback* cb); -// }; - -// struct mscclppChannel { -// struct mscclppChannelPeer* peers; -// struct mscclppDevChannelPeer* devPeers; -// struct mscclppRing ring; -// int* devRingUserRanks; -// struct mscclppTree tree; -// struct mscclppTree collnetChain; -// struct mscclppDirect collnetDirect; -// int id; // index of this channel -// uint32_t workFifoSent; // last used work index+1 -// uint64_t p2pOpCount; -// }; - -// struct mscclppWorkList { -// struct mscclppWorkList* next; -// struct mscclppWork work; -// }; - -// struct mscclppPointerList { -// struct mscclppPointerList* next; -// void *ptr; -// }; - -// struct mscclppKernelPlan { -// // A kernel plan is also a callback that reclaims itself. Hence this must -// // be the first member. -// struct mscclppCommCallback reclaimer; -// struct mscclppMemoryPool memPool_mscclppProxyOp; // memory to return to comm in cleanup - -// struct mscclppComm* comm; -// struct mscclppKernelPlan* next; - -// bool persistent; // aka captured in a graph -// bool kernelSpecialized; -// void *kernelFn; -// int channelUbound; // only channels c < channelUbound are present -// int channelCount; // number of channels present -// uint64_t channelMask; // which channels are present, channelCount == popcount(channelMask) -// bool hasProxyOps; // does any channel have a non-empty proxyOpQueue -// int threadPerBlock; -// // workHeap fields are null until uploadWorkFifo() or preparePersistentKernel() -// struct mscclppWork* workHead; - -// int collOpCount; // zero based for this plan - -// struct mscclppIntruQueue ipcMemQueue; - -// struct Channel { -// int nWork; -// union { -// int nWorkElem; // used for coll and reg coll -// int p2pTailElem[2]; // used for p2p, indexed by mscclppWorkElemP2pType-1 -// }; -// size_t collBytes; -// struct mscclppIntruQueue workQueue; -// struct mscclppIntruQueue proxyOpQueue; -// } channels[MAXCHANNELS]; -// }; - struct mscclppConn { mscclppTransport_t transport; int remoteRank; int buffSize; - mscclppTrigger *cpuTriggerFifo; - // fifoTail indicates where CPU needs to read the head of the fifo. only accessible by CPU - // No atomicity is required for fifoTail as only a single CPU thread accesses it. - int fifoTail; uint64_t *remoteProxyFlag; uint64_t *cpuProxyFlag; - void *cpuTriggerFifoGdrDesc; void *cpuProxyFlagGdrDesc; struct mscclppDevConn *devConn; struct mscclppIbContext *ibCtx; @@ -182,231 +40,22 @@ struct mscclppConn { }; struct mscclppComm { -// struct mscclppMemoryStack memPermanent, memScoped; -// // List of destructors to run when comm is destructed -// struct mscclppDestructor* destructorHead; - -// struct mscclppChannel channels[MAXCHANNELS]; -// struct mscclppPeerInfo* peerInfo; -// struct mscclppTopoSystem* topo; - struct mscclppConn conns[MAXCONNECTIONS]; int nConns; -// mscclppNet_t* mscclppNet; -// mscclppCollNet_t* mscclppCollNet; void* bootstrap; -// // Bitmasks for mscclppTransportP2pSetup -// uint64_t* connectSend; -// uint64_t* connectRecv; uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches. int rank; // my rank in the communicator int nRanks; // number of GPUs in communicator int cudaDev; // my cuda device index -// int compCap; // compute capability of the GPU -// int64_t busId; // my PCI bus ID in int format -// cpu_set_t cpuAffinity; // CPU affinity of the GPU - - // int node; - // int nNodes; - // int localRank; - // int localRanks; - // int maxLocalRanks; - // int* rankToNode; - // int* rankToLocalRank; - // int* localRankToRank; -// // localRanks and localRanktoRank for all nodes -// struct mscclppNodeRanks* nodeRanks; - -// bool checkPointers; -// bool dmaBufSupport; - -// // Counter for tracking CUDA launches (P2P and collectives included) -// uint64_t opCount; -// // Collective operation counter -// uint64_t collOpCount; - -// // Channels for collectives -// int nChannels; -// // Channels (per peer) for p2p -// int p2pnChannels; -// int p2pnChannelsPerPeer; -// int p2pChannels[MAXCHANNELS]; - -// // Should this comm allocate LL buffers for network P2P connections? -// bool allocP2pNetLLBuffers; - -// // Buffer sizes -// int buffSizes[MSCCLPP_NUM_PROTOCOLS]; -// int p2pChunkSize; - -// // Algorithm/Protocols thresholds -// ssize_t threadThresholds[MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; -// float latencies[MSCCLPP_NUM_FUNCTIONS][MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; -// float bandwidths[MSCCLPP_NUM_FUNCTIONS][MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; -// int maxThreads[MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS]; - -// /* This attribute can indicate the states of communicators and return code of -// * asynchronous MSCCLPP operations. */ -// mscclppResult_t asyncResult; // Flag to ask MSCCLPP kernels to abort volatile uint32_t *abortFlag; -// // Device side of the communicator (for cudaFree's) -// struct mscclppDevComm* devComm; // actually = &mscclppDevCommAndChannels::comm - -// // Operation pool. -// int workFifoDepth; // size of workFifoHeap[], power of 2 -// struct mscclppWork* workFifoHeap; -// struct mscclppWork* devWorkFifoHeap; -// void* workFifoHeapGdrHandle; - -// // Work completion notificaion -// uint32_t* workFifoDone/*[MAXCHANNELS]*/; // in cudaHost memory -// uint32_t workFifoSent; // Monotonic (mod 1<<32) index of next unused fifo slot. -// uint32_t workFifoAckdMin; // Monotonic index of least unprocessed fifo slot over all channels. - -// // Intra-process sync -// struct mscclppComm* intraComm0; // leader of intra-process comms (self possible) -// struct mscclppComm* intraNext; // next of intra-process comms, intraComm0 is head -// int intraRank; -// int intraRanks; -// uint32_t intraBarrierPhase; -// char intraPad1[64 - sizeof(uint64_t)]; -// uint64_t intraBarrierCounter; // only used if this is intraComm0 -// char intraPad2[64 - sizeof(uint64_t)]; -// uint64_t intraBarrierGate; // only used if this is intraComm0 - struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS]; - - // Last one is for P2P proxies. - struct mscclppProxyState proxyState[MSCCLPP_IB_MAX_DEVS + 1]; - -// // Whether this communicator uses collNet -// int collNetSupport; -// int intraHighestTransportType; - -// size_t channelSize; // User requested work size (bytes) for channel partitions - -// // Internal streams -// struct mscclppStrongStream deviceStream, hostStream; - -// // pools backed by comm->memPermanent -// struct mscclppMemoryPool memPool_mscclppProxyOp; -// struct mscclppMemoryPool memPool_mscclppKernelPlan; -// struct mscclppMemoryPool memPool_mscclppPointerList; -// // Next comm in this thread's active mscclppGroup[Start|End](). Holds "0x1" when -// // this comm is not yet in a group. -// struct mscclppComm* groupNext; -// // Subset of those in groupNext list. Holds 0x1 if not needing preconnect. -// struct mscclppComm* preconnectNext; -// int persistentRefs; // number of persistent plan-lists capturing this comm -// struct mscclppTasks tasks; - -// // user-created reduction ops -// int userRedOpCapacity, userRedOpFreeHead; -// mscclppUserRedOp *userRedOps; - -// // Queue of things for the main thread to do -// struct mscclppIntruQueueMpsc callbackQueue; - -// // List of kernel plans built form tasks. -// struct mscclppIntruQueue planQueue; -// // First of the unlaunched kernels in `planQueue` -// struct mscclppKernelPlan* unlaunchedPlansHead; - -// // communicator mode -// int blocking; -// // initState is to more conveniently reclaim resources when errors happen. -// mscclppResult_t initState; -// // flag to indicate if mscclppCommFinalize() is called -// bool finalizeCalled; -// // shared structures for finalization -// int finalizeRankCnt; + struct mscclppProxyState *proxyState[MSCCLPP_PROXY_MAX_NUM]; }; -// enum mscclppLaunchMode { -// mscclppLaunchModeInvalid=0, -// mscclppLaunchModeParallel, -// mscclppLaunchModeGroup -// }; -// extern enum mscclppLaunchMode mscclppParamLaunchMode; - -// void mscclppCommPushFree(struct mscclppComm* comm, void* buf); -// void mscclppCommPushCudaFree(struct mscclppComm* comm, void* buf); -// void mscclppCommPushCudaHostFree(struct mscclppComm* comm, void* buf); -// void mscclppCommPushCudaGdrFree(struct mscclppComm* comm, void* handle); - -// inline mscclppResult_t mscclppCommPollCallbacks(struct mscclppComm* comm, bool waitSome) { -// mscclppResult_t result = mscclppSuccess; -// struct mscclppCommCallback* cb = mscclppIntruQueueMpscDequeueAll(&comm->callbackQueue, waitSome); -// while (cb != nullptr) { -// struct mscclppCommCallback* next = cb->next; -// mscclppResult_t res1 = cb->fn(comm, cb); // may reclaim memory of cb -// if (res1 != mscclppSuccess) result = res1; -// cb = next; -// } -// MSCCLPPCHECK(result); -// return mscclppSuccess; -// } - -// inline void mscclppCommIntraBarrierIn(struct mscclppComm* comm, uint32_t x) { -// int phase = comm->intraBarrierPhase; -// if (comm->intraRanks == 1) { -// // Release everyone (just me). -// comm->intraBarrierGate = (uint64_t(x)<<32) | (phase^1); -// } else { -// struct mscclppComm* comm0 = comm->intraComm0; -// uint64_t count = __atomic_add_fetch(&comm0->intraBarrierCounter, (uint64_t(x)<<32) + 1, __ATOMIC_RELEASE); -// if (uint32_t(count) == uint32_t(comm->intraRanks)) { -// // Reset. -// __atomic_store_n(&comm0->intraBarrierCounter, 0, __ATOMIC_RELAXED); -// // Release everyone. -// __atomic_store_n(&comm0->intraBarrierGate, (count>>32<<32) | (phase^1), __ATOMIC_RELEASE); -// } -// } -// } - -// // returns sum of x values contributed to mscclppCommIntraBarrierIn(comm, x) -// inline uint32_t mscclppCommIntraBarrierOut(struct mscclppComm* comm) { -// struct mscclppComm* comm0 = comm->intraComm0; -// comm->intraBarrierPhase ^= 1; -// uint32_t phase = comm->intraBarrierPhase; -// uint64_t gate = __atomic_load_n(&comm0->intraBarrierGate, __ATOMIC_RELAXED); -// if ((gate & 1) != phase) { -// uint64_t t0 = clockNano(); -// do { -// // Spin vigorously for first 5us. -// if (clockNano()-t0 >= 5*1000) sched_yield(); -// gate = __atomic_load_n(&comm0->intraBarrierGate, __ATOMIC_RELAXED); -// } while ((gate & 1) != phase); -// } -// if (comm->intraRanks != 1) __atomic_thread_fence(__ATOMIC_ACQUIRE); -// return gate>>32; -// } - -// // Scrambles the bits of non-builtin values of mscclppRedOp_t according to the -// // communicator memory address. Used to catch bugs so that integer handles -// // associated with this communicator won't collide with handles of other -// // communicatrs. This function is its own inverse. -// static inline mscclppRedOp_t mscclppUserRedOpMangle(mscclppComm *comm, mscclppRedOp_t op) { -// // Preserve the built-in values. -// if(int(op) < int(mscclppNumOps)) -// return op; -// uint64_t h = reinterpret_cast(comm); -// h ^= h >> 32; -// h *= 0x9e3779b97f4a7c13u; // Knuth's 64-bit magical hash constant -// h >>= 32; // h is now an excellent 32-bit hash of the comm pointer -// h &= int(mscclppMaxRedOp); // mscclppMaxRedOp is a power of 2 minus 1 -// int op1 = int(h) ^ int(op); -// // Since builtin values are preserved, we also have to preserve their preimage. -// return op1 < int(mscclppNumOps) ? op : mscclppRedOp_t(op1); -// } - -// mscclppResult_t mscclppCommEnsureReady(mscclppComm_t comm); -// mscclppResult_t mscclppCommSetAsyncError(mscclppComm_t comm, mscclppResult_t nextState); - #endif diff --git a/src/include/debug.h b/src/include/debug.h index 6c06748c..326ff42e 100644 --- a/src/include/debug.h +++ b/src/include/debug.h @@ -7,7 +7,7 @@ #ifndef MSCCLPP_DEBUG_H_ #define MSCCLPP_DEBUG_H_ -#include "mscclpp_net.h" +#include "mscclpp.h" #include #include #include @@ -19,6 +19,9 @@ // Conform to pthread and NVTX standard #define MSCCLPP_THREAD_NAMELEN 16 +typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel; +typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys; + extern int mscclppDebugLevel; extern uint64_t mscclppDebugMask; extern pthread_mutex_t mscclppDebugLock; diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index a47593eb..85a6df24 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -22,18 +22,23 @@ typedef enum : uint64_t { mscclppData = 0x1, mscclppFlag = 0x2, mscclppSync = 0x4} mscclppTriggerType_t; -#define MSCCLPP_SIZE_BITS 30 -#define MSCCLPP_OFFSET_BITS 31 +#define MSCCLPP_BITS_SIZE 32 +#define MSCCLPP_BITS_OFFSET 32 +#define MSCCLPP_BITS_TYPE 3 +#define MSCCLPP_BITS_CONNID 10 -#define TRIGGER_VALUE(__TYPE__,__OFFSET__,__SIZE__) (((((__TYPE__) << MSCCLPP_OFFSET_BITS) + (__OFFSET__)) << MSCCLPP_SIZE_BITS) + __SIZE__ ) - -// the summation of number of bits must be 64 or less -union alignas(8) mscclppTrigger { - uint64_t value; +// the summation of number of bits must be 128 or less +union alignas(16) mscclppTrigger { + uint64_t value[2]; struct { - uint64_t dataSize : MSCCLPP_SIZE_BITS; - uint64_t dataOffset : MSCCLPP_OFFSET_BITS; - uint64_t type : 3; + // first 64 bits: value[0] + uint64_t dataSize : MSCCLPP_BITS_SIZE; + uint64_t dataOffset : MSCCLPP_BITS_OFFSET; + uint64_t : (64-MSCCLPP_BITS_SIZE-MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment + // second 64 bits: value[1] + uint64_t connId : MSCCLPP_BITS_CONNID; + uint64_t type : MSCCLPP_BITS_TYPE; + uint64_t : (64-MSCCLPP_BITS_CONNID-MSCCLPP_BITS_TYPE); // ensure 64-bit alignment } fields; }; @@ -64,6 +69,26 @@ union alignas(8) mscclppTrigger { ***************************************/ struct mscclppDevConn { +#ifdef __CUDACC__ + __forceinline__ __device__ mscclppTrigger *getTrigger() { + unsigned int curFifoHead = atomicInc(this->triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1); + return &this->trigger[curFifoHead]; + } + + __forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) { + asm volatile( + "st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value), + "l"((dataOffset << (MSCCLPP_BITS_SIZE)) + + (dataSize)), + "l"((type << MSCCLPP_BITS_CONNID) + this->connId)); + } + + __forceinline__ __device__ void waitTrigger(mscclppTrigger *trig) { + // Check only the first 64 bits + while (*(volatile uint64_t *)trig->value != 0) {} + } +#endif // __CUDACC__ + int tag; void* localBuff; @@ -80,9 +105,10 @@ struct mscclppDevConn { // // localBuff[srcOffset..srcOffset+size-1] <- remoteBuff[dstOffset..dstOffset+size-1] // virtual void pullRemoteBuff(size_t srcOffset, size_t dstOffset, size_t size); - int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. for parallel, access use atomic + unsigned int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. for parallel, access use atomic mscclppTrigger* trigger; uint64_t* proxyFlag; + int connId; }; typedef struct mscclppComm* mscclppComm_t; diff --git a/src/include/mscclpp_net.h b/src/include/mscclpp_net.h deleted file mode 100644 index 7208ac4b..00000000 --- a/src/include/mscclpp_net.h +++ /dev/null @@ -1,313 +0,0 @@ -/************************************************************************* - * Copyright (c) 2017-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef MSCCLPP_NET_H_ -#define MSCCLPP_NET_H_ - -#include "mscclpp.h" -#include - -#define MSCCLPP_NET_HANDLE_MAXSIZE 128 - -#define MSCCLPP_PTR_HOST 0x1 -#define MSCCLPP_PTR_CUDA 0x2 -#define MSCCLPP_PTR_DMABUF 0x4 - -// Maximum number of requests per comm object -#define MSCCLPP_NET_MAX_REQUESTS 8 - -typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel; -typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys; - -typedef void (*mscclppDebugLogger_t)(mscclppDebugLogLevel level, unsigned long flags, const char *file, int line, const char *fmt, ...); - -typedef struct { - char* name; // Used mostly for logging. - char* pciPath; // Path to the PCI device in /sys. - uint64_t guid; // Unique identifier for the NIC chip. Important for - // cards with multiple PCI functions (Physical or virtual). - int ptrSupport; // [MSCCLPP_PTR_HOST|MSCCLPP_PTR_CUDA|MSCCLPP_PTR_DMABUF] - int speed; // Port speed in Mbps. - int port; // Port number. - float latency; // Network latency - int maxComms; // Maximum number of comms we can create - int maxRecvs; // Maximum number of grouped receives. -}mscclppNetProperties_v6_t; - -typedef mscclppNetProperties_v6_t mscclppNetProperties_t; - -typedef struct { - // Name of the network (mainly for logs) - const char* name; - // Initialize the network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create a connection. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Connect to a handle and return a sending comm object for that peer. - // This call must not block for the connection to be established, and instead - // should return successfully with sendComm == NULL with the expectation that - // it will be called again until sendComm != NULL. - mscclppResult_t (*connect)(int dev, void* handle, void** sendComm); - // Finalize connection establishment after remote peer has called connect. - // This call must not block for the connection to be established, and instead - // should return successfully with recvComm == NULL with the expectation that - // it will be called again until recvComm != NULL. - mscclppResult_t (*accept)(void* listenComm, void** recvComm); - // Register/Deregister memory. Comm can be either a sendComm or a recvComm. - // Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle); - /* DMA-BUF support */ - mscclppResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle); - mscclppResult_t (*deregMr)(void* comm, void* mhandle); - // Asynchronous send to a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request); - // Asynchronous recv from a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* sizes); - // Close and free send/recv comm objects - mscclppResult_t (*closeSend)(void* sendComm); - mscclppResult_t (*closeRecv)(void* recvComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppNet_v6_t; - -typedef mscclppNet_v6_t mscclppNet_t; - -#define MSCCLPP_PLUGIN_SYMBOL mscclppNetPlugin_v6 - -typedef struct { - // Name of the collective network (mainly for logs) - const char* name; - // Initialize the collective network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters capable of doing collective operations. - // If ndev returns 0, all other functions might be set to NULL. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create connections. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Create a group for collective operations. handles have been created - // using listen() above. rank indicates caller's rank in the collective network. - mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm); - // Returns whether a reduction operation on a data type is supported. - // 1 for supported, 0 otherwise. - mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported); - // Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle); - /* DMA-BUF support */ - mscclppResult_t (*regMrDmaBuf)(void* collComm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle); - mscclppResult_t (*deregMr)(void* collComm, void* mhandle); - // Performs an asynchronous allreduce operation on the collective group. - // May return request == NULL if the call cannot be performed (or would block). - mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count, - mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free collective comm objects - mscclppResult_t (*closeColl)(void* collComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppCollNet_v6_t; - -typedef mscclppCollNet_v6_t mscclppCollNet_t; - -#define MSCCLPP_COLLNET_PLUGIN_SYMBOL mscclppCollNetPlugin_v6 - -// v5 struct for backwards compatibility -typedef struct { - // Name of the network (mainly for logs) - const char* name; - // Initialize the network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create a connection. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Connect to a handle and return a sending comm object for that peer. - // This call must not block for the connection to be established, and instead - // should return successfully with sendComm == NULL with the expectation that - // it will be called again until sendComm != NULL. - mscclppResult_t (*connect)(int dev, void* handle, void** sendComm); - // Finalize connection establishment after remote peer has called connect. - // This call must not block for the connection to be established, and instead - // should return successfully with recvComm == NULL with the expectation that - // it will be called again until recvComm != NULL. - mscclppResult_t (*accept)(void* listenComm, void** recvComm); - // Register/Deregister memory. Comm can be either a sendComm or a recvComm. - // Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* comm, void* mhandle); - // Asynchronous send to a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request); - // Asynchronous recv from a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* sizes); - // Close and free send/recv comm objects - mscclppResult_t (*closeSend)(void* sendComm); - mscclppResult_t (*closeRecv)(void* recvComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppNet_v5_t; - -// v5 struct for backwards compatibility -typedef struct { - // Name of the collective network (mainly for logs) - const char* name; - // Initialize the collective network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters capable of doing collective operations. - // If ndev returns 0, all other functions might be set to NULL. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create connections. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Create a group for collective operations. handles have been created - // using listen() above. rank indicates caller's rank in the collective network. - mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm); - // Returns whether a reduction operation on a data type is supported. - // 1 for supported, 0 otherwise. - mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported); - // Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* collComm, void* mhandle); - // Performs an asynchronous allreduce operation on the collective group. - // May return request == NULL if the call cannot be performed (or would block). - mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count, - mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free collective comm objects - mscclppResult_t (*closeColl)(void* collComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppCollNet_v5_t; - -// v4 struct for backwards compatibility -typedef struct { - char* name; // Used mostly for logging. - char* pciPath; // Path to the PCI device in /sys. - uint64_t guid; // Unique identifier for the NIC chip. Important for - // cards with multiple PCI functions (Physical or virtual). - int ptrSupport; // MSCCLPP_PTR_HOST or MSCCLPP_PTR_HOST|MSCCLPP_PTR_CUDA - int speed; // Port speed in Mbps. - int port; // Port number. - int maxComms; // Maximum number of comms we can create -} mscclppNetProperties_v4_t; - -// v4 struct for backwards compatibility -typedef struct { - // Name of the network (mainly for logs) - const char* name; - // Initialize the network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v4_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create a connection. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Connect to a handle and return a sending comm object for that peer. - mscclppResult_t (*connect)(int dev, void* handle, void** sendComm); - // Finalize connection establishment after remote peer has called connectHandle - mscclppResult_t (*accept)(void* listenComm, void** recvComm); - // Register/Deregister memory. Comm can be either a sendComm or a recvComm. - // Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* comm, void* mhandle); - // Asynchronous send to a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*isend)(void* sendComm, void* data, int size, void* mhandle, void** request); - // Asynchronous recv from a peer. - // May return request == NULL if the call cannot be performed (or would block) - mscclppResult_t (*irecv)(void* recvComm, void* data, int size, void* mhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* recvComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free send/recv comm objects - mscclppResult_t (*closeSend)(void* sendComm); - mscclppResult_t (*closeRecv)(void* recvComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppNet_v4_t; - -// v4 struct for backwards compatibility -typedef struct { - // Name of the collective network (mainly for logs) - const char* name; - // Initialize the collective network. - mscclppResult_t (*init)(mscclppDebugLogger_t logFunction); - // Return the number of adapters capable of doing collective operations. - // If ndev returns 0, all other functions might be set to NULL. - mscclppResult_t (*devices)(int* ndev); - // Get various device properties. - mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v4_t* props); - // Create a receiving object and provide a handle to connect to it. The - // handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged - // between ranks to create connections. - mscclppResult_t (*listen)(int dev, void* handle, void** listenComm); - // Create a group for collective operations. handles have been created - // using listen() above. rank indicates caller's rank in the collective network. - mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm); - // Returns whether a reduction operation on a data type is supported. - // 1 for supported, 0 otherwise. - mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported); - // Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA. - mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle); - mscclppResult_t (*deregMr)(void* collComm, void* mhandle); - // Performs an asynchronous allreduce operation on the collective group. - // May return request == NULL if the call cannot be performed (or would block). - mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count, - mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request); - // Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is - // visible to the GPU - mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request); - // Test whether a request is complete. If size is not NULL, it returns the - // number of bytes sent/received. - mscclppResult_t (*test)(void* request, int* done, int* size); - // Close and free collective comm objects - mscclppResult_t (*closeColl)(void* collComm); - mscclppResult_t (*closeListen)(void* listenComm); -} mscclppCollNet_v4_t; - -#endif // end include guard diff --git a/src/include/net.h b/src/include/net.h deleted file mode 100644 index 82b8a423..00000000 --- a/src/include/net.h +++ /dev/null @@ -1,46 +0,0 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - -#ifndef MSCCLPP_INT_NET_H_ -#define MSCCLPP_INT_NET_H_ - -#include "mscclpp.h" -#include "mscclpp_net.h" -// #include "comm.h" -#include "checks.h" - -typedef char mscclppNetHandle_t[MSCCLPP_NET_HANDLE_MAXSIZE]; - -mscclppResult_t mscclppNetPluginInit(); -// mscclppResult_t mscclppNetInit(struct mscclppComm* comm); -// int mscclppNetVersion(struct mscclppComm* comm); - -// // Translation to external API -// static const char* mscclppNetName(struct mscclppComm* comm) { return comm->mscclppNet->name; } -// static mscclppResult_t mscclppNetDevices(struct mscclppComm* comm, int* ndev) { MSCCLPPCHECK(comm->mscclppNet->devices(ndev)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetGetProperties(struct mscclppComm* comm, int dev, mscclppNetProperties_t* props) { MSCCLPPCHECK(comm->mscclppNet->getProperties(dev, props)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetListen(struct mscclppComm* comm, int dev, void* handle, void** listenComm) { MSCCLPPCHECK(comm->mscclppNet->listen(dev, handle, listenComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetConnect(struct mscclppComm* comm, int dev, void* handle, void** sendComm) { MSCCLPPCHECK(comm->mscclppNet->connect(dev, handle, sendComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetAccept(struct mscclppComm* comm, void* listenComm, void** recvComm) { MSCCLPPCHECK(comm->mscclppNet->accept(listenComm, recvComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetRegMr(struct mscclppComm* comm, void* netComm, void* data, int size, int type, void** mhandle) { MSCCLPPCHECK(comm->mscclppNet->regMr(netComm, data, size, type, mhandle)); return mscclppSuccess; } -// /* DMA-BUF support */ -// static mscclppResult_t mscclppNetRegMrDmaBuf(struct mscclppComm* comm, void* netComm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle) { MSCCLPPCHECK(comm->mscclppNet->regMrDmaBuf(netComm, data, size, type, offset, fd, mhandle)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetDeregMr(struct mscclppComm* comm, void* netComm, void* mhandle) { MSCCLPPCHECK(comm->mscclppNet->deregMr(netComm, mhandle)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetIsend(struct mscclppComm* comm, void* sendComm, void* data, int size, int tag, void* mhandle, void** request) { MSCCLPPCHECK(comm->mscclppNet->isend(sendComm, data, size, tag, mhandle, request)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetIrecv(struct mscclppComm* comm, void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request) { MSCCLPPCHECK(comm->mscclppNet->irecv(recvComm, n, data, sizes, tags, mhandles, request)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetIflush(struct mscclppComm* comm, void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) { MSCCLPPCHECK(comm->mscclppNet->iflush(recvComm, n, data, sizes, mhandles, request)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetTest(struct mscclppComm* comm, void* request, int* done, int* sizes) { MSCCLPPCHECK(comm->mscclppNet->test(request, done, sizes)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetCloseSend(struct mscclppComm* comm, void* sendComm) { MSCCLPPCHECK(comm->mscclppNet->closeSend(sendComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetCloseRecv(struct mscclppComm* comm, void* recvComm) { MSCCLPPCHECK(comm->mscclppNet->closeRecv(recvComm)); return mscclppSuccess; } -// static mscclppResult_t mscclppNetCloseListen(struct mscclppComm* comm, void* listenComm) { MSCCLPPCHECK(comm->mscclppNet->closeListen(listenComm)); return mscclppSuccess; } - -// // Test whether the current GPU support GPU Direct RDMA. -// mscclppResult_t mscclppGpuGdrSupport(struct mscclppComm* comm, int* gdrSupport); - -// extern mscclppNet_t mscclppNetIb; -// extern mscclppNet_t mscclppNetSocket; - -#endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 225e9fae..61f9ea24 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -5,6 +5,8 @@ #include "comm.h" #include +#define MSCCLPP_PROXY_MAX_NUM (MSCCLPP_IB_MAX_DEVS + 1) // One is for a P2P proxy. + typedef enum { MSCCLPP_PROXY_RUN_STATE_IDLE = 0, MSCCLPP_PROXY_RUN_STATE_RUNNING, @@ -12,8 +14,16 @@ typedef enum { } mscclppProxyRunState_t; struct mscclppProxyState { - pthread_t *threads; - mscclppProxyRunState_t *runs; + pthread_t thread; + mscclppProxyRunState_t run; + mscclppTrigger *cpuTriggerFifo; + mscclppTrigger *gpuTriggerFifo; + // cpuTriggerFifoTail indicates where CPU needs to read the head of the fifo. + unsigned int cpuTriggerFifoTail; + unsigned int *gpuTriggerFifoHead; + void *cpuTriggerFifoGdrDesc; + // NULL for the P2P proxy. + struct mscclppIbContext *ibContext; }; mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); diff --git a/src/init.cc b/src/init.cc index 2cb063a6..5cb2cdcd 100644 --- a/src/init.cc +++ b/src/init.cc @@ -171,24 +171,16 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i WARN("devConnOut is the output of this function and needs to be allocated by the user"); return mscclppInvalidUsage; } - struct mscclppConn *conn = &comm->conns[comm->nConns++]; + struct mscclppConn *conn = &comm->conns[comm->nConns]; conn->transport = transportType; conn->remoteRank = remoteRank; conn->buffSize = buffSize; - conn->devConn = devConnOut; - conn->devConn->localBuff = localBuff; - conn->devConn->localFlag = localFlag; - conn->devConn->tag = tag; - - // TODO(saemal): these two should be shared for all P2P-DMA connections made from each GPU. Same for each IB driver. - MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuTriggerFifo, &conn->devConn->trigger, MSCCLPP_PROXY_FIFO_SIZE, &conn->cpuTriggerFifoGdrDesc)); - MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->triggerFifoHead, 1)); conn->ibCtx = NULL; conn->ibQp = NULL; + int ibDevIdx = -1; if (ibDev != NULL) { // Check if an IB context exists - int ibDevIdx = -1; int firstNullIdx = -1; for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { if (comm->ibContext[i] == NULL) { @@ -210,6 +202,39 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i } conn->ibCtx = comm->ibContext[ibDevIdx]; } + // Find a proxy state that uses the given IB device + struct mscclppProxyState *proxyState = NULL; + for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { + if (comm->proxyState[i] == NULL) { + // Cannot find, create a new one + MSCCLPPCHECK(mscclppCalloc(&proxyState, 1)); + MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo, + MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc)); + MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1)); + proxyState->ibContext = conn->ibCtx; + comm->proxyState[i] = proxyState; + break; + } + if (comm->proxyState[i]->ibContext == conn->ibCtx) { + // `conn->ibCtx == NULL` indicatess the P2P proxy. + proxyState = comm->proxyState[i]; + break; + } + } + if (proxyState == NULL) { + // Cannot reach + WARN("Unexpected error"); + return mscclppInternalError; + } + conn->devConn = devConnOut; + conn->devConn->localBuff = localBuff; + conn->devConn->localFlag = localFlag; + conn->devConn->tag = tag; + conn->devConn->connId = comm->nConns; + conn->devConn->trigger = proxyState->gpuTriggerFifo; + conn->devConn->triggerFifoHead = proxyState->gpuTriggerFifoHead; + + comm->nConns++; return mscclppSuccess; } diff --git a/src/proxy.cc b/src/proxy.cc index aaf6d6c5..8e4bdfed 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -5,12 +5,14 @@ #include "ib.h" #include "checks.h" +#include #include #include #include #include #include +#define MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD 100 // TODO(chhwang): verify if MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0 is useful, otherwise delete this option. #define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 1 @@ -33,25 +35,21 @@ static void NumaBind(int node) struct proxyArgs { struct mscclppComm* comm; - struct mscclppIbContext* ibCtx; + struct mscclppProxyState *proxyState; cudaStream_t stream; - volatile mscclppProxyRunState_t* run; - int connIdx; }; -// TODO(saemal) We need to add a fifo for each DMA engine +static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) { + __m128i xmm0 = _mm_load_si128((__m128i *)src); + _mm_store_si128((__m128i *)dst, xmm0); +} + void* mscclppProxyServiceP2P(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; - volatile mscclppProxyRunState_t *run = args->run; - std::vector conns; - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn *conn = &comm->conns[i]; - // TODO(saemal): we need to create another transport type which doesn't need a proxy. - if (conn->transport == mscclppTransportP2P) { - conns.push_back(conn); - } - } + volatile mscclppProxyRunState_t *run = &args->proxyState->run; + mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; + unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; cudaStream_t stream = args->stream; free(_args); @@ -65,33 +63,37 @@ void* mscclppProxyServiceP2P(void* _args) { PROXYCUDACHECK(cudaSetDevice(comm->cudaDev)); PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { - for (struct mscclppConn *conn : conns) { - // Poll to see if we are ready to send anything - trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - if (trigger.value == 0) continue; - - // Iterate over what send is needed - if (trigger.fields.type & mscclppData){ - void *srcBuff = (void *)((char *)conn->devConn->localBuff + trigger.fields.dataOffset); - void *dstBuff = (void *)((char *)conn->devConn->remoteBuff + trigger.fields.dataOffset); - PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, stream)); - } - if (trigger.fields.type & mscclppFlag) { - PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->localFlag, sizeof(uint64_t), cudaMemcpyDeviceToDevice, stream)); - } - // Wait for completion - if (trigger.fields.type & mscclppSync){ - PROXYCUDACHECK(cudaStreamSynchronize(stream)); - } - - // Send completion - volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - *tmp = 0; - conn->fifoTail++; - if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE) - conn->fifoTail = 0; + int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; + // fifoTail indicates where CPU needs to read the head of the fifo. + for (;;) { + if (runCheckCounter-- == 0) { + runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; + // Check if we need to exit + if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; } + // Poll to see if we are ready to send anything + readTrigger(&trigger, &fifo[*fifoTail]); + if (trigger.value[0] == 0) continue; + + struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; + + // Iterate over what send is needed + if (trigger.fields.type & mscclppData){ + void *srcBuff = (void *)((char *)conn->devConn->localBuff + trigger.fields.dataOffset); + void *dstBuff = (void *)((char *)conn->devConn->remoteBuff + trigger.fields.dataOffset); + PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, stream)); + } + if (trigger.fields.type & mscclppFlag) { + PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->localFlag, sizeof(uint64_t), cudaMemcpyDeviceToDevice, stream)); + } + // Wait for completion + if (trigger.fields.type & mscclppSync){ + PROXYCUDACHECK(cudaStreamSynchronize(stream)); + } + + // Send completion: reset only the high 64 bits + *(volatile uint64_t *)(&fifo[*fifoTail]) = 0; + *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; } // Need a sync in case previous copies are not completed @@ -107,15 +109,10 @@ void* mscclppProxyServiceP2P(void* _args) { void* mscclppProxyServiceIb(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; struct mscclppComm *comm = args->comm; - struct mscclppIbContext *ibCtx = args->ibCtx; - volatile mscclppProxyRunState_t *run = args->run; - std::vector conns; - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn *conn = &comm->conns[i]; - if (conn->transport == mscclppTransportIB) { - conns.push_back(conn); - } - } + struct mscclppIbContext *ibCtx = args->proxyState->ibContext; + volatile mscclppProxyRunState_t *run = &args->proxyState->run; + mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo; + unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail; free(_args); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) @@ -123,8 +120,16 @@ void* mscclppProxyServiceIb(void* _args) { SEND_STATE_INIT, SEND_STATE_INPROGRESS }; - int sendState = SEND_STATE_INIT; - uint64_t currentProxyFlagVlaue = *conn->cpuProxyFlag; + int *sendState; + uint64_t *currentProxyFlagValue; + if (mscclppCalloc((void **)&sendState, comm->nConns) != mscclppSuccess) { + WARN("mscclppCalloc failed: errno %d", errno); + return NULL; + } + if (mscclppCalloc((void **)¤tProxyFlagValue, comm->nConns) != mscclppSuccess) { + WARN("mscclppCalloc failed: errno %d", errno); + return NULL; + } #endif int rank = comm->rank; @@ -134,7 +139,10 @@ void* mscclppProxyServiceIb(void* _args) { NumaBind(ibCtx->numaNode); #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) - for (struct mscclppConn *conn : conns) { + for (int i = 0; i < (int)comm->nConns; ++i) { + sendState[i] = SEND_STATE_INIT; + struct mscclppConn *conn = &comm->conns[i]; + currentProxyFlagValue[i] = *conn->cpuProxyFlag; // Post recv if (conn->ibQp->postRecv(0) != 0) { WARN("postRecv failed: errno %d", errno); @@ -142,28 +150,94 @@ void* mscclppProxyServiceIb(void* _args) { } #endif - while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) { - for (struct mscclppConn *conn : conns) { + int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; + for (;;) { + if (runCheckCounter-- == 0) { + runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD; + // Check if we need to exit + if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break; + } + // Poll to see if we are ready to send anything + readTrigger(&trigger, &fifo[*fifoTail]); + #if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0) - // Try send - if (sendState == SEND_STATE_INIT) { - trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - if (trigger.value != 0) { - // Do send - conn->ibQp->stageSendWithImm(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, - /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/true, /*immData=*/0); - if (conn->ibQp->postSend() != 0) { - WARN("postSend failed: errno %d", errno); + struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; + // Try send + if (sendState[trigger.fields.connId] == SEND_STATE_INIT) { + if (trigger.value[0] != 0) { + // Do send + conn->ibQp->stageSendWithImm(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, + /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/true, /*immData=*/0); + int ret; + if ((ret = conn->ibQp->postSend()) != 0) { + // Return value is errno. + WARN("postSend failed: errno %d", ret); + } + sendState[trigger.fields.connId] = SEND_STATE_INPROGRESS; + } + } + + // Poll completions + wcNum = conn->ibQp->pollCq(); + if (wcNum < 0) { + WARN("rank %d pollCq failed: errno %d", rank, errno); + } else { + for (int i = 0; i < wcNum; ++i) { + struct ibv_wc *wc = &conn->ibQp->wcs[i]; + if (wc->status != IBV_WC_SUCCESS) { + WARN("rank %d wc status %d", rank, wc->status); + continue; + } + if (wc->qp_num != conn->ibQp->qp->qp_num) { + WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num); + continue; + } + if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { + // TODO(chhwang): cpu flush + *((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagValue[trigger.fields.connId]; + // recv completion + if (conn->ibQp->postRecv(wc->wr_id) != 0) { + WARN("postRecv failed: errno %d", errno); } - sendState = SEND_STATE_INPROGRESS; + // WARN("rank %d recv completion", rank); + } else if (wc->opcode == IBV_WC_RDMA_WRITE) { + // send completion + *(volatile uint64_t *)(&fifo[fifoTail]) = 0; + fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; + sendState[trigger.fields.connId] = SEND_STATE_INIT; + // WARN("rank %d send completion", rank); } } + } +#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1) + if (trigger.value[0] == 0) continue; - // Poll completions - wcNum = conn->ibQp->pollCq(); - if (wcNum < 0) { - WARN("rank %d pollCq failed: errno %d", rank, errno); - } else { + struct mscclppConn *conn = &comm->conns[trigger.fields.connId]; + + if (trigger.fields.type & mscclppData) { + conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, + /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/false); + } + if (trigger.fields.type & mscclppFlag) { + // My local flag is copied to the peer's proxy flag + conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t), + /*wrId=*/0, /*offset=*/0, /*signaled=*/true); + } + int ret; + if ((ret = conn->ibQp->postSend()) != 0) { + // Return value is errno. + WARN("postSend failed: errno %d", ret); + } + + // Wait for completion + if (trigger.fields.type & mscclppSync) { + bool waiting = true; + while (waiting) { + wcNum = conn->ibQp->pollCq(); + if (wcNum < 0) { + WARN("rank %d pollCq failed: errno %d", rank, errno); + continue; + } for (int i = 0; i < wcNum; ++i) { struct ibv_wc *wc = &conn->ibQp->wcs[i]; if (wc->status != IBV_WC_SUCCESS) { @@ -174,80 +248,19 @@ void* mscclppProxyServiceIb(void* _args) { WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num); continue; } - if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { - // TODO(chhwang): cpu flush - *((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagVlaue; - // recv completion - if (conn->ibQp->postRecv(wc->wr_id) != 0) { - WARN("postRecv failed: errno %d", errno); - } - // WARN("rank %d recv completion", rank); - } else if (wc->opcode == IBV_WC_RDMA_WRITE) { + if (wc->opcode == IBV_WC_RDMA_WRITE) { // send completion - volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - *tmp = 0; - conn->fifoTail++; - if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE) - conn->fifoTail = 0; - sendState = SEND_STATE_INIT; - // WARN("rank %d send completion", rank); + waiting = false; + break; } } } -#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1) - // Poll to see if we are ready to send anything - trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - if (trigger.value == 0) continue; - - if (trigger.fields.type & mscclppData) { - conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize, - /*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/false); - } - if (trigger.fields.type & mscclppFlag) { - // My local flag is copied to the peer's proxy flag - conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t), - /*wrId=*/0, /*offset=*/0, /*signaled=*/true); - } - if (conn->ibQp->postSend() != 0) { - WARN("postSend failed: errno %d", errno); - } - - // Wait for completion - if (trigger.fields.type & mscclppSync) { - bool waiting = true; - while (waiting) { - wcNum = conn->ibQp->pollCq(); - if (wcNum < 0) { - WARN("rank %d pollCq failed: errno %d", rank, errno); - continue; - } - for (int i = 0; i < wcNum; ++i) { - struct ibv_wc *wc = &conn->ibQp->wcs[i]; - if (wc->status != IBV_WC_SUCCESS) { - WARN("rank %d wc status %d", rank, wc->status); - continue; - } - if (wc->qp_num != conn->ibQp->qp->qp_num) { - WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num); - continue; - } - if (wc->opcode == IBV_WC_RDMA_WRITE) { - // send completion - waiting = false; - break; - } - } - } - } - - // Send completion - volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]); - *tmp = 0; - conn->fifoTail++; - if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE) - conn->fifoTail = 0; -#endif } + + // Send completion: reset only the high 64 bits + *(volatile uint64_t *)(&fifo[*fifoTail]) = 0; + *fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE; +#endif } *run = MSCCLPP_PROXY_RUN_STATE_IDLE; // WARN("Proxy exits: rank %d", rank); @@ -256,9 +269,8 @@ void* mscclppProxyServiceIb(void* _args) { void* mscclppProxyService(void* _args) { struct proxyArgs *args = (struct proxyArgs *)_args; - struct mscclppIbContext *ibCtx = args->ibCtx; void *ret; - if (ibCtx == NULL) { + if (args->proxyState->ibContext == NULL) { ret = mscclppProxyServiceP2P(_args); } else { ret = mscclppProxyServiceIb(_args); @@ -266,69 +278,43 @@ void* mscclppProxyService(void* _args) { return ret; } -// mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses) { -// comm->proxyState.listenSock = sock; -// comm->proxyState.peerAddresses = peerAddresses; -// return mscclppSuccess; -// } - mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { - if (comm->ibContext[i] == NULL) continue; - if (comm->proxyState[i].threads == NULL) { - MSCCLPPCHECK(mscclppCalloc(&comm->proxyState[i].threads, 1)); - } - if (comm->proxyState[i].runs == NULL) { - MSCCLPPCHECK(mscclppCalloc(&comm->proxyState[i].runs, 1)); - } - // Create IB proxy threads + for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { + struct mscclppProxyState *proxyState = comm->proxyState[i]; + if (proxyState == NULL) break; + bool is_p2p = (proxyState->ibContext == NULL); + struct proxyArgs *args; MSCCLPPCHECK(mscclppCalloc(&args, 1)); args->comm = comm; - args->ibCtx = comm->ibContext[i]; - args->run = comm->proxyState[i].runs; - *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; - pthread_create(comm->proxyState[i].threads, NULL, mscclppProxyService, args); - mscclppSetThreadName(comm->proxyState[i].threads[0], "MSCCLPP Service IB - %02d", i); + args->proxyState = proxyState; + if (is_p2p) { + CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); + } + proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; + pthread_create(&proxyState->thread, NULL, mscclppProxyService, args); + if (is_p2p) { + mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev); + } else { + mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i); + } } - // P2P proxy - mscclppProxyState *proxyState = &comm->proxyState[MSCCLPP_IB_MAX_DEVS]; - if (proxyState->threads == NULL) { - MSCCLPPCHECK(mscclppCalloc(&proxyState->threads, 1)); - } - if (proxyState->runs == NULL) { - MSCCLPPCHECK(mscclppCalloc(&proxyState->runs, 1)); - } - // Create P2P DMA proxy thread - struct proxyArgs *args; - MSCCLPPCHECK(mscclppCalloc(&args, 1)); - args->comm = comm; - args->ibCtx = NULL; - args->run = proxyState->runs; - args->connIdx = -1; // unused - CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking)); - *args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING; - pthread_create(proxyState->threads, NULL, mscclppProxyService, args); - mscclppSetThreadName(proxyState->threads[0], "MSCCLPP Service P2P - %02d", comm->cudaDev); return mscclppSuccess; } -static void _stopProxy(struct mscclppComm* comm, int devIdx, int connIdx) { - volatile int *run = (volatile int *)&comm->proxyState[devIdx].runs[connIdx]; - if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) return; - *run = MSCCLPP_PROXY_RUN_STATE_EXITING; - while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) { - usleep(1000); - } -} - mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { - for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { - if (comm->ibContext[i] != NULL) { - _stopProxy(comm, i, 0); + for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) { + struct mscclppProxyState *proxyState = comm->proxyState[i]; + if (proxyState == NULL) break; + + volatile int *run = (volatile int *)&proxyState->run; + if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) { + continue; + } + *run = MSCCLPP_PROXY_RUN_STATE_EXITING; + while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) { + usleep(1000); } } - // P2P proxies - _stopProxy(comm, MSCCLPP_IB_MAX_DEVS, 0); return mscclppSuccess; } diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu index f90deb44..04a412d6 100644 --- a/tests/p2p_test.cu +++ b/tests/p2p_test.cu @@ -51,14 +51,11 @@ __global__ void kernel(int rank, int world_size) mscclppDevConn_t devConn = constDevConns[remoteRank]; volatile int *data = (volatile int *)devConn.localBuff; volatile uint64_t *localFlag = devConn.localFlag; +#if (USE_DMA_FOR_P2P == 0) volatile uint64_t *remoteFlag = devConn.remoteFlag; +#endif volatile uint64_t *proxyFlag = devConn.proxyFlag; - int curFifoHead = *devConn.triggerFifoHead; - volatile uint64_t *trig = (volatile uint64_t *)&devConn.trigger[curFifoHead]; - curFifoHead += 1; - if (curFifoHead == MSCCLPP_PROXY_FIFO_SIZE) - curFifoHead = 0; - *devConn.triggerFifoHead = curFifoHead; + mscclppTrigger *trig = devConn.getTrigger(); uint64_t baseFlag = *localFlag; @@ -78,12 +75,10 @@ __global__ void kernel(int rank, int world_size) #if (USE_DMA_FOR_P2P == 1) // Wait until the proxy have sent my data and flag - while (*trig != 0) {} + devConn.waitTrigger(trig); // Trigger sending data and flag - uint64_t dataOffset = rank * sizeof(int); - uint64_t dataSize = sizeof(int); - *trig = TRIGGER_VALUE(mscclppFlag | mscclppData, dataOffset, dataSize); + devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} @@ -91,13 +86,11 @@ __global__ void kernel(int rank, int world_size) #else // USE_DMA_FOR_P2P == 0 if (devConn.remoteBuff == NULL) { // IB - // Trigger sending data and flag - uint64_t dataOffset = rank * sizeof(int); - uint64_t dataSize = sizeof(int); - *trig = TRIGGER_VALUE(mscclppSync | mscclppFlag | mscclppData, dataOffset, dataSize); - // Wait until the proxy have sent my data and flag - while (*trig != 0) {} + devConn.waitTrigger(trig); + + // Trigger sending data and flag + devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int)); // Wait for receiving data from remote rank while (*proxyFlag == baseFlag) {} @@ -259,7 +252,7 @@ int main(int argc, const char *argv[]) CUDACHECK(cudaEventCreate(&ev_end)); // warm up - int warmupiter = 10; + // int warmupiter = 10; // for (int i = 0; i < warmupiter; ++i) { // kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); // } @@ -268,14 +261,14 @@ int main(int argc, const char *argv[]) cudaGraph_t graph; cudaGraphExec_t instance; cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal); - int cudagraphiter = 10; + int cudagraphiter = 100; for (int i = 0; i < cudagraphiter; ++i) { kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size); } cudaStreamEndCapture(stream, &graph); cudaGraphInstantiate(&instance, graph, NULL, NULL, 0); - int cudagraphwarmup = 20; + int cudagraphwarmup = 10; for (int i = 0; i < cudagraphwarmup; ++i) { cudaGraphLaunch(instance, stream); }