Merge pull request #13 from microsoft/chhwang/shared-fifo

Shared FIFO
This commit is contained in:
Saeed Maleki
2023-03-14 12:02:14 -07:00
committed by GitHub
14 changed files with 301 additions and 971 deletions

View File

@@ -108,7 +108,7 @@ LIBSRCS += $(addprefix src/bootstrap/,bootstrap.cc socket.cc)
LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS))
LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
INCEXPORTS := mscclpp.h mscclpp_net.h
INCEXPORTS := mscclpp.h
INCTARGETS := $(INCEXPORTS:%=$(BUILDDIR)/$(INCDIR)/%)
LIBNAME := libmscclpp.so

View File

@@ -8,7 +8,6 @@
#include "core.h"
#include "utils.h"
#include "bootstrap.h"
#include "net.h"
#include <unistd.h>
#include <sys/types.h>
@@ -170,7 +169,7 @@ out:
return NULL;
}
mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle, bool idFromEnv) {
mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle) {
struct mscclppSocket* listenSock;
struct bootstrapRootArgs* args;
pthread_t thread;
@@ -209,10 +208,10 @@ mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool
return mscclppInvalidArgument;
}
if (isRoot)
MSCCLPPCHECK(bootstrapCreateRoot(handle, false));
MSCCLPPCHECK(bootstrapCreateRoot(handle));
} else {
memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union mscclppSocketAddress));
MSCCLPPCHECK(bootstrapCreateRoot(handle, false));
MSCCLPPCHECK(bootstrapCreateRoot(handle));
}
// printf("addr = %s port = %d\n", inet_ntoa(handle->addr.sin.sin_addr), (int)ntohs(handle->addr.sin.sin_port));
// printf("addr = %s\n", inet_ntoa((*(struct sockaddr_in*)&handle->addr.sa).sin_addr));
@@ -248,7 +247,7 @@ mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscc
struct mscclppSocket* proxySocket;
mscclppSocketAddress nextAddr;
struct mscclppSocket sock, listenSockRoot;
struct extInfo info = { 0 };
struct extInfo info;
MSCCLPPCHECK(mscclppCalloc(&state, 1));
state->rank = rank;

View File

@@ -5,7 +5,7 @@
************************************************************************/
#include "core.h"
#include "mscclpp_net.h"
#include "debug.h"
#include <stdlib.h>
#include <stdarg.h>
#include <sys/syscall.h>

View File

@@ -186,8 +186,8 @@ mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct ms
qp_init_attr.send_cq = cq;
qp_init_attr.recv_cq = cq;
qp_init_attr.qp_type = IBV_QPT_RC;
qp_init_attr.cap.max_send_wr = MAXCONNECTIONS;
qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS;
qp_init_attr.cap.max_send_wr = MAXCONNECTIONS * MSCCLPP_PROXY_FIFO_SIZE;
qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS * MSCCLPP_PROXY_FIFO_SIZE;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = 0;
@@ -381,25 +381,23 @@ int mscclppIbQp::stageSendWithImm(struct mscclppIbMr *ibMr, const mscclppIbMrInf
int mscclppIbQp::postSend()
{
struct ibv_send_wr *bad_wr;
int ret = ibv_post_send(this->qp, this->wrs, &bad_wr);
if (ret != 0) {
return ret;
}
// std::memset(this->wrs, 0, sizeof(struct ibv_send_wr) * this->wrn);
// std::memset(this->sges, 0, sizeof(struct ibv_sge) * this->wrn);
this->wrn = 0;
return 0;
struct ibv_send_wr *bad_wr;
int ret = ibv_post_send(this->qp, this->wrs, &bad_wr);
if (ret != 0) {
return ret;
}
this->wrn = 0;
return 0;
}
int mscclppIbQp::postRecv(uint64_t wrId)
{
struct ibv_recv_wr wr, *bad_wr;
wr.wr_id = wrId;
wr.sg_list = nullptr;
wr.num_sge = 0;
wr.next = nullptr;
return ibv_post_recv(this->qp, &wr, &bad_wr);
struct ibv_recv_wr wr, *bad_wr;
wr.wr_id = wrId;
wr.sg_list = nullptr;
wr.num_sge = 0;
wr.next = nullptr;
return ibv_post_recv(this->qp, &wr, &bad_wr);
}
int mscclppIbQp::pollCq()

View File

@@ -19,7 +19,7 @@ struct mscclppBootstrapHandle {
static_assert(sizeof(struct mscclppBootstrapHandle) <= sizeof(mscclppUniqueId), "Bootstrap handle is too large to fit inside MSCCLPP unique ID");
mscclppResult_t bootstrapNetInit(const char* ip_port_pair = NULL);
mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle, bool idFromEnv);
mscclppResult_t bootstrapCreateRoot(struct mscclppBootstrapHandle* handle);
mscclppResult_t bootstrapGetUniqueId(struct mscclppBootstrapHandle* handle, bool isRoot = true, const char* ip_port_pair = NULL);
mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscclppComm* comm);
mscclppResult_t bootstrapAllGather(void* commState, void* allData, int size);

View File

@@ -7,24 +7,9 @@
#ifndef MSCCLPP_COMM_H_
#define MSCCLPP_COMM_H_
// #include "transport.h"
// #include "p2p.h"
// #include "collectives.h"
#include "proxy.h"
// #include "strongstream.h"
#include "ib.h"
// #if CUDART_VERSION < 9000
// struct cudaLaunchParams {
// void *func;
// dim3 gridDim;
// dim3 blockDim;
// void **args;
// size_t sharedMem;
// cudaStream_t stream;
// };
// #endif
// #define CACHE_LINE_SIZE 128
// #define MEM_ALIGN 4096
// #define CUDA_IPC_MIN 2097152UL
@@ -36,139 +21,12 @@
#define MAXCONNECTIONS 1024
// struct mscclppSendMem {
// union {
// struct {
// uint64_t head;
// char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
// void* ptrExchange;
// uint64_t redOpArgExchange[2];
// char pad2[CACHE_LINE_SIZE-sizeof(void*)-2*sizeof(uint64_t)];
// int offsFifo[MSCCLPP_STEPS];
// };
// char pad3[MEM_ALIGN];
// };
// };
// struct mscclppRecvMem {
// union {
// struct {
// uint64_t tail;
// char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
// int sizesFifo[MSCCLPP_STEPS];
// int offsFifo[MSCCLPP_STEPS];
// int flush; // For GDRCopy-based flush
// };
// char pad4[MEM_ALIGN];
// };
// };
// enum helperThreadState {ThreadStart, ThreadStop};
// #define MSCCLPP_IPC_POOL_SIZE (2*MSCCLPP_MAX_LOCAL_RANKS*MSCCLPP_MAX_OPS)
// struct mscclppGraphHelperResources {
// mscclppComm* comm;
// pthread_mutex_t threadLock;
// pthread_cond_t threadCond;
// enum helperThreadState threadState;
// void* ipcBases[MSCCLPP_IPC_POOL_SIZE];
// int ipcTail;
// int ipcHead;
// };
// struct mscclppUserRedOp {
// int freeNext; // -1=allocated, otherwise index of next free entry in array
// mscclppDataType_t datatype;
// mscclppDevRedOpFull opFull;
// };
// struct mscclppNodeRanks {
// int localRanks;
// int* localRankToRank;
// };
// struct mscclppDestructor {
// struct mscclppDestructor* next;
// void* obj;
// mscclppResult_t(*fn)(struct mscclppDestructor* me);
// };
// struct mscclppCommCallback {
// struct mscclppCommCallback* next;
// mscclppResult_t(*fn)(struct mscclppComm* comm, struct mscclppCommCallback* cb);
// };
// struct mscclppChannel {
// struct mscclppChannelPeer* peers;
// struct mscclppDevChannelPeer* devPeers;
// struct mscclppRing ring;
// int* devRingUserRanks;
// struct mscclppTree tree;
// struct mscclppTree collnetChain;
// struct mscclppDirect collnetDirect;
// int id; // index of this channel
// uint32_t workFifoSent; // last used work index+1
// uint64_t p2pOpCount;
// };
// struct mscclppWorkList {
// struct mscclppWorkList* next;
// struct mscclppWork work;
// };
// struct mscclppPointerList {
// struct mscclppPointerList* next;
// void *ptr;
// };
// struct mscclppKernelPlan {
// // A kernel plan is also a callback that reclaims itself. Hence this must
// // be the first member.
// struct mscclppCommCallback reclaimer;
// struct mscclppMemoryPool memPool_mscclppProxyOp; // memory to return to comm in cleanup
// struct mscclppComm* comm;
// struct mscclppKernelPlan* next;
// bool persistent; // aka captured in a graph
// bool kernelSpecialized;
// void *kernelFn;
// int channelUbound; // only channels c < channelUbound are present
// int channelCount; // number of channels present
// uint64_t channelMask; // which channels are present, channelCount == popcount(channelMask)
// bool hasProxyOps; // does any channel have a non-empty proxyOpQueue
// int threadPerBlock;
// // workHeap fields are null until uploadWorkFifo() or preparePersistentKernel()
// struct mscclppWork* workHead;
// int collOpCount; // zero based for this plan
// struct mscclppIntruQueue<struct mscclppPointerList, &mscclppPointerList::next> ipcMemQueue;
// struct Channel {
// int nWork;
// union {
// int nWorkElem; // used for coll and reg coll
// int p2pTailElem[2]; // used for p2p, indexed by mscclppWorkElemP2pType-1
// };
// size_t collBytes;
// struct mscclppIntruQueue<struct mscclppWorkList, &mscclppWorkList::next> workQueue;
// struct mscclppIntruQueue<struct mscclppProxyOp, &mscclppProxyOp::enqNext> proxyOpQueue;
// } channels[MAXCHANNELS];
// };
struct mscclppConn {
mscclppTransport_t transport;
int remoteRank;
int buffSize;
mscclppTrigger *cpuTriggerFifo;
// fifoTail indicates where CPU needs to read the head of the fifo. only accessible by CPU
// No atomicity is required for fifoTail as only a single CPU thread accesses it.
int fifoTail;
uint64_t *remoteProxyFlag;
uint64_t *cpuProxyFlag;
void *cpuTriggerFifoGdrDesc;
void *cpuProxyFlagGdrDesc;
struct mscclppDevConn *devConn;
struct mscclppIbContext *ibCtx;
@@ -182,231 +40,22 @@ struct mscclppConn {
};
struct mscclppComm {
// struct mscclppMemoryStack memPermanent, memScoped;
// // List of destructors to run when comm is destructed
// struct mscclppDestructor* destructorHead;
// struct mscclppChannel channels[MAXCHANNELS];
// struct mscclppPeerInfo* peerInfo;
// struct mscclppTopoSystem* topo;
struct mscclppConn conns[MAXCONNECTIONS];
int nConns;
// mscclppNet_t* mscclppNet;
// mscclppCollNet_t* mscclppCollNet;
void* bootstrap;
// // Bitmasks for mscclppTransportP2pSetup
// uint64_t* connectSend;
// uint64_t* connectRecv;
uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
int rank; // my rank in the communicator
int nRanks; // number of GPUs in communicator
int cudaDev; // my cuda device index
// int compCap; // compute capability of the GPU
// int64_t busId; // my PCI bus ID in int format
// cpu_set_t cpuAffinity; // CPU affinity of the GPU
// int node;
// int nNodes;
// int localRank;
// int localRanks;
// int maxLocalRanks;
// int* rankToNode;
// int* rankToLocalRank;
// int* localRankToRank;
// // localRanks and localRanktoRank for all nodes
// struct mscclppNodeRanks* nodeRanks;
// bool checkPointers;
// bool dmaBufSupport;
// // Counter for tracking CUDA launches (P2P and collectives included)
// uint64_t opCount;
// // Collective operation counter
// uint64_t collOpCount;
// // Channels for collectives
// int nChannels;
// // Channels (per peer) for p2p
// int p2pnChannels;
// int p2pnChannelsPerPeer;
// int p2pChannels[MAXCHANNELS];
// // Should this comm allocate LL buffers for network P2P connections?
// bool allocP2pNetLLBuffers;
// // Buffer sizes
// int buffSizes[MSCCLPP_NUM_PROTOCOLS];
// int p2pChunkSize;
// // Algorithm/Protocols thresholds
// ssize_t threadThresholds[MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS];
// float latencies[MSCCLPP_NUM_FUNCTIONS][MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS];
// float bandwidths[MSCCLPP_NUM_FUNCTIONS][MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS];
// int maxThreads[MSCCLPP_NUM_ALGORITHMS][MSCCLPP_NUM_PROTOCOLS];
// /* This attribute can indicate the states of communicators and return code of
// * asynchronous MSCCLPP operations. */
// mscclppResult_t asyncResult;
// Flag to ask MSCCLPP kernels to abort
volatile uint32_t *abortFlag;
// // Device side of the communicator (for cudaFree's)
// struct mscclppDevComm* devComm; // actually = &mscclppDevCommAndChannels::comm
// // Operation pool.
// int workFifoDepth; // size of workFifoHeap[], power of 2
// struct mscclppWork* workFifoHeap;
// struct mscclppWork* devWorkFifoHeap;
// void* workFifoHeapGdrHandle;
// // Work completion notificaion
// uint32_t* workFifoDone/*[MAXCHANNELS]*/; // in cudaHost memory
// uint32_t workFifoSent; // Monotonic (mod 1<<32) index of next unused fifo slot.
// uint32_t workFifoAckdMin; // Monotonic index of least unprocessed fifo slot over all channels.
// // Intra-process sync
// struct mscclppComm* intraComm0; // leader of intra-process comms (self possible)
// struct mscclppComm* intraNext; // next of intra-process comms, intraComm0 is head
// int intraRank;
// int intraRanks;
// uint32_t intraBarrierPhase;
// char intraPad1[64 - sizeof(uint64_t)];
// uint64_t intraBarrierCounter; // only used if this is intraComm0
// char intraPad2[64 - sizeof(uint64_t)];
// uint64_t intraBarrierGate; // only used if this is intraComm0
struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS];
// Last one is for P2P proxies.
struct mscclppProxyState proxyState[MSCCLPP_IB_MAX_DEVS + 1];
// // Whether this communicator uses collNet
// int collNetSupport;
// int intraHighestTransportType;
// size_t channelSize; // User requested work size (bytes) for channel partitions
// // Internal streams
// struct mscclppStrongStream deviceStream, hostStream;
// // pools backed by comm->memPermanent
// struct mscclppMemoryPool memPool_mscclppProxyOp;
// struct mscclppMemoryPool memPool_mscclppKernelPlan;
// struct mscclppMemoryPool memPool_mscclppPointerList;
// // Next comm in this thread's active mscclppGroup[Start|End](). Holds "0x1" when
// // this comm is not yet in a group.
// struct mscclppComm* groupNext;
// // Subset of those in groupNext list. Holds 0x1 if not needing preconnect.
// struct mscclppComm* preconnectNext;
// int persistentRefs; // number of persistent plan-lists capturing this comm
// struct mscclppTasks tasks;
// // user-created reduction ops
// int userRedOpCapacity, userRedOpFreeHead;
// mscclppUserRedOp *userRedOps;
// // Queue of things for the main thread to do
// struct mscclppIntruQueueMpsc<struct mscclppCommCallback, &mscclppCommCallback::next> callbackQueue;
// // List of kernel plans built form tasks.
// struct mscclppIntruQueue<struct mscclppKernelPlan, &mscclppKernelPlan::next> planQueue;
// // First of the unlaunched kernels in `planQueue`
// struct mscclppKernelPlan* unlaunchedPlansHead;
// // communicator mode
// int blocking;
// // initState is to more conveniently reclaim resources when errors happen.
// mscclppResult_t initState;
// // flag to indicate if mscclppCommFinalize() is called
// bool finalizeCalled;
// // shared structures for finalization
// int finalizeRankCnt;
struct mscclppProxyState *proxyState[MSCCLPP_PROXY_MAX_NUM];
};
// enum mscclppLaunchMode {
// mscclppLaunchModeInvalid=0,
// mscclppLaunchModeParallel,
// mscclppLaunchModeGroup
// };
// extern enum mscclppLaunchMode mscclppParamLaunchMode;
// void mscclppCommPushFree(struct mscclppComm* comm, void* buf);
// void mscclppCommPushCudaFree(struct mscclppComm* comm, void* buf);
// void mscclppCommPushCudaHostFree(struct mscclppComm* comm, void* buf);
// void mscclppCommPushCudaGdrFree(struct mscclppComm* comm, void* handle);
// inline mscclppResult_t mscclppCommPollCallbacks(struct mscclppComm* comm, bool waitSome) {
// mscclppResult_t result = mscclppSuccess;
// struct mscclppCommCallback* cb = mscclppIntruQueueMpscDequeueAll(&comm->callbackQueue, waitSome);
// while (cb != nullptr) {
// struct mscclppCommCallback* next = cb->next;
// mscclppResult_t res1 = cb->fn(comm, cb); // may reclaim memory of cb
// if (res1 != mscclppSuccess) result = res1;
// cb = next;
// }
// MSCCLPPCHECK(result);
// return mscclppSuccess;
// }
// inline void mscclppCommIntraBarrierIn(struct mscclppComm* comm, uint32_t x) {
// int phase = comm->intraBarrierPhase;
// if (comm->intraRanks == 1) {
// // Release everyone (just me).
// comm->intraBarrierGate = (uint64_t(x)<<32) | (phase^1);
// } else {
// struct mscclppComm* comm0 = comm->intraComm0;
// uint64_t count = __atomic_add_fetch(&comm0->intraBarrierCounter, (uint64_t(x)<<32) + 1, __ATOMIC_RELEASE);
// if (uint32_t(count) == uint32_t(comm->intraRanks)) {
// // Reset.
// __atomic_store_n(&comm0->intraBarrierCounter, 0, __ATOMIC_RELAXED);
// // Release everyone.
// __atomic_store_n(&comm0->intraBarrierGate, (count>>32<<32) | (phase^1), __ATOMIC_RELEASE);
// }
// }
// }
// // returns sum of x values contributed to mscclppCommIntraBarrierIn(comm, x)
// inline uint32_t mscclppCommIntraBarrierOut(struct mscclppComm* comm) {
// struct mscclppComm* comm0 = comm->intraComm0;
// comm->intraBarrierPhase ^= 1;
// uint32_t phase = comm->intraBarrierPhase;
// uint64_t gate = __atomic_load_n(&comm0->intraBarrierGate, __ATOMIC_RELAXED);
// if ((gate & 1) != phase) {
// uint64_t t0 = clockNano();
// do {
// // Spin vigorously for first 5us.
// if (clockNano()-t0 >= 5*1000) sched_yield();
// gate = __atomic_load_n(&comm0->intraBarrierGate, __ATOMIC_RELAXED);
// } while ((gate & 1) != phase);
// }
// if (comm->intraRanks != 1) __atomic_thread_fence(__ATOMIC_ACQUIRE);
// return gate>>32;
// }
// // Scrambles the bits of non-builtin values of mscclppRedOp_t according to the
// // communicator memory address. Used to catch bugs so that integer handles
// // associated with this communicator won't collide with handles of other
// // communicatrs. This function is its own inverse.
// static inline mscclppRedOp_t mscclppUserRedOpMangle(mscclppComm *comm, mscclppRedOp_t op) {
// // Preserve the built-in values.
// if(int(op) < int(mscclppNumOps))
// return op;
// uint64_t h = reinterpret_cast<uint64_t>(comm);
// h ^= h >> 32;
// h *= 0x9e3779b97f4a7c13u; // Knuth's 64-bit magical hash constant
// h >>= 32; // h is now an excellent 32-bit hash of the comm pointer
// h &= int(mscclppMaxRedOp); // mscclppMaxRedOp is a power of 2 minus 1
// int op1 = int(h) ^ int(op);
// // Since builtin values are preserved, we also have to preserve their preimage.
// return op1 < int(mscclppNumOps) ? op : mscclppRedOp_t(op1);
// }
// mscclppResult_t mscclppCommEnsureReady(mscclppComm_t comm);
// mscclppResult_t mscclppCommSetAsyncError(mscclppComm_t comm, mscclppResult_t nextState);
#endif

View File

@@ -7,7 +7,7 @@
#ifndef MSCCLPP_DEBUG_H_
#define MSCCLPP_DEBUG_H_
#include "mscclpp_net.h"
#include "mscclpp.h"
#include <stdio.h>
#include <chrono>
#include <type_traits>
@@ -19,6 +19,9 @@
// Conform to pthread and NVTX standard
#define MSCCLPP_THREAD_NAMELEN 16
typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel;
typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys;
extern int mscclppDebugLevel;
extern uint64_t mscclppDebugMask;
extern pthread_mutex_t mscclppDebugLock;

View File

@@ -22,18 +22,23 @@ typedef enum : uint64_t { mscclppData = 0x1,
mscclppFlag = 0x2,
mscclppSync = 0x4} mscclppTriggerType_t;
#define MSCCLPP_SIZE_BITS 30
#define MSCCLPP_OFFSET_BITS 31
#define MSCCLPP_BITS_SIZE 32
#define MSCCLPP_BITS_OFFSET 32
#define MSCCLPP_BITS_TYPE 3
#define MSCCLPP_BITS_CONNID 10
#define TRIGGER_VALUE(__TYPE__,__OFFSET__,__SIZE__) (((((__TYPE__) << MSCCLPP_OFFSET_BITS) + (__OFFSET__)) << MSCCLPP_SIZE_BITS) + __SIZE__ )
// the summation of number of bits must be 64 or less
union alignas(8) mscclppTrigger {
uint64_t value;
// the summation of number of bits must be 128 or less
union alignas(16) mscclppTrigger {
uint64_t value[2];
struct {
uint64_t dataSize : MSCCLPP_SIZE_BITS;
uint64_t dataOffset : MSCCLPP_OFFSET_BITS;
uint64_t type : 3;
// first 64 bits: value[0]
uint64_t dataSize : MSCCLPP_BITS_SIZE;
uint64_t dataOffset : MSCCLPP_BITS_OFFSET;
uint64_t : (64-MSCCLPP_BITS_SIZE-MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment
// second 64 bits: value[1]
uint64_t connId : MSCCLPP_BITS_CONNID;
uint64_t type : MSCCLPP_BITS_TYPE;
uint64_t : (64-MSCCLPP_BITS_CONNID-MSCCLPP_BITS_TYPE); // ensure 64-bit alignment
} fields;
};
@@ -64,6 +69,26 @@ union alignas(8) mscclppTrigger {
***************************************/
struct mscclppDevConn {
#ifdef __CUDACC__
__forceinline__ __device__ mscclppTrigger *getTrigger() {
unsigned int curFifoHead = atomicInc(this->triggerFifoHead, MSCCLPP_PROXY_FIFO_SIZE - 1);
return &this->trigger[curFifoHead];
}
__forceinline__ __device__ void setTrigger(mscclppTrigger *trig, uint64_t type, uint64_t dataOffset, uint64_t dataSize) {
asm volatile(
"st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(&trig->value),
"l"((dataOffset << (MSCCLPP_BITS_SIZE)) +
(dataSize)),
"l"((type << MSCCLPP_BITS_CONNID) + this->connId));
}
__forceinline__ __device__ void waitTrigger(mscclppTrigger *trig) {
// Check only the first 64 bits
while (*(volatile uint64_t *)trig->value != 0) {}
}
#endif // __CUDACC__
int tag;
void* localBuff;
@@ -80,9 +105,10 @@ struct mscclppDevConn {
// // localBuff[srcOffset..srcOffset+size-1] <- remoteBuff[dstOffset..dstOffset+size-1]
// virtual void pullRemoteBuff(size_t srcOffset, size_t dstOffset, size_t size);
int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. for parallel, access use atomic
unsigned int* triggerFifoHead; // indicates the tail of the fifo. only accessible by the gpu. for parallel, access use atomic
mscclppTrigger* trigger;
uint64_t* proxyFlag;
int connId;
};
typedef struct mscclppComm* mscclppComm_t;

View File

@@ -1,313 +0,0 @@
/*************************************************************************
* Copyright (c) 2017-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_NET_H_
#define MSCCLPP_NET_H_
#include "mscclpp.h"
#include <stdint.h>
#define MSCCLPP_NET_HANDLE_MAXSIZE 128
#define MSCCLPP_PTR_HOST 0x1
#define MSCCLPP_PTR_CUDA 0x2
#define MSCCLPP_PTR_DMABUF 0x4
// Maximum number of requests per comm object
#define MSCCLPP_NET_MAX_REQUESTS 8
typedef enum {MSCCLPP_LOG_NONE=0, MSCCLPP_LOG_VERSION=1, MSCCLPP_LOG_WARN=2, MSCCLPP_LOG_INFO=3, MSCCLPP_LOG_ABORT=4, MSCCLPP_LOG_TRACE=5} mscclppDebugLogLevel;
typedef enum {MSCCLPP_INIT=1, MSCCLPP_COLL=2, MSCCLPP_P2P=4, MSCCLPP_SHM=8, MSCCLPP_NET=16, MSCCLPP_GRAPH=32, MSCCLPP_TUNING=64, MSCCLPP_ENV=128, MSCCLPP_ALLOC=256, MSCCLPP_CALL=512, MSCCLPP_ALL=~0} mscclppDebugLogSubSys;
typedef void (*mscclppDebugLogger_t)(mscclppDebugLogLevel level, unsigned long flags, const char *file, int line, const char *fmt, ...);
typedef struct {
char* name; // Used mostly for logging.
char* pciPath; // Path to the PCI device in /sys.
uint64_t guid; // Unique identifier for the NIC chip. Important for
// cards with multiple PCI functions (Physical or virtual).
int ptrSupport; // [MSCCLPP_PTR_HOST|MSCCLPP_PTR_CUDA|MSCCLPP_PTR_DMABUF]
int speed; // Port speed in Mbps.
int port; // Port number.
float latency; // Network latency
int maxComms; // Maximum number of comms we can create
int maxRecvs; // Maximum number of grouped receives.
}mscclppNetProperties_v6_t;
typedef mscclppNetProperties_v6_t mscclppNetProperties_t;
typedef struct {
// Name of the network (mainly for logs)
const char* name;
// Initialize the network.
mscclppResult_t (*init)(mscclppDebugLogger_t logFunction);
// Return the number of adapters.
mscclppResult_t (*devices)(int* ndev);
// Get various device properties.
mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create a connection.
mscclppResult_t (*listen)(int dev, void* handle, void** listenComm);
// Connect to a handle and return a sending comm object for that peer.
// This call must not block for the connection to be established, and instead
// should return successfully with sendComm == NULL with the expectation that
// it will be called again until sendComm != NULL.
mscclppResult_t (*connect)(int dev, void* handle, void** sendComm);
// Finalize connection establishment after remote peer has called connect.
// This call must not block for the connection to be established, and instead
// should return successfully with recvComm == NULL with the expectation that
// it will be called again until recvComm != NULL.
mscclppResult_t (*accept)(void* listenComm, void** recvComm);
// Register/Deregister memory. Comm can be either a sendComm or a recvComm.
// Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA.
mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle);
/* DMA-BUF support */
mscclppResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
mscclppResult_t (*deregMr)(void* comm, void* mhandle);
// Asynchronous send to a peer.
// May return request == NULL if the call cannot be performed (or would block)
mscclppResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request);
// Asynchronous recv from a peer.
// May return request == NULL if the call cannot be performed (or would block)
mscclppResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request);
// Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is
// visible to the GPU
mscclppResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
mscclppResult_t (*test)(void* request, int* done, int* sizes);
// Close and free send/recv comm objects
mscclppResult_t (*closeSend)(void* sendComm);
mscclppResult_t (*closeRecv)(void* recvComm);
mscclppResult_t (*closeListen)(void* listenComm);
} mscclppNet_v6_t;
typedef mscclppNet_v6_t mscclppNet_t;
#define MSCCLPP_PLUGIN_SYMBOL mscclppNetPlugin_v6
typedef struct {
// Name of the collective network (mainly for logs)
const char* name;
// Initialize the collective network.
mscclppResult_t (*init)(mscclppDebugLogger_t logFunction);
// Return the number of adapters capable of doing collective operations.
// If ndev returns 0, all other functions might be set to NULL.
mscclppResult_t (*devices)(int* ndev);
// Get various device properties.
mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create connections.
mscclppResult_t (*listen)(int dev, void* handle, void** listenComm);
// Create a group for collective operations. handles have been created
// using listen() above. rank indicates caller's rank in the collective network.
mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm);
// Returns whether a reduction operation on a data type is supported.
// 1 for supported, 0 otherwise.
mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported);
// Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA.
mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle);
/* DMA-BUF support */
mscclppResult_t (*regMrDmaBuf)(void* collComm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
mscclppResult_t (*deregMr)(void* collComm, void* mhandle);
// Performs an asynchronous allreduce operation on the collective group.
// May return request == NULL if the call cannot be performed (or would block).
mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count,
mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request);
// Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is
// visible to the GPU
mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
mscclppResult_t (*test)(void* request, int* done, int* size);
// Close and free collective comm objects
mscclppResult_t (*closeColl)(void* collComm);
mscclppResult_t (*closeListen)(void* listenComm);
} mscclppCollNet_v6_t;
typedef mscclppCollNet_v6_t mscclppCollNet_t;
#define MSCCLPP_COLLNET_PLUGIN_SYMBOL mscclppCollNetPlugin_v6
// v5 struct for backwards compatibility
typedef struct {
// Name of the network (mainly for logs)
const char* name;
// Initialize the network.
mscclppResult_t (*init)(mscclppDebugLogger_t logFunction);
// Return the number of adapters.
mscclppResult_t (*devices)(int* ndev);
// Get various device properties.
mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create a connection.
mscclppResult_t (*listen)(int dev, void* handle, void** listenComm);
// Connect to a handle and return a sending comm object for that peer.
// This call must not block for the connection to be established, and instead
// should return successfully with sendComm == NULL with the expectation that
// it will be called again until sendComm != NULL.
mscclppResult_t (*connect)(int dev, void* handle, void** sendComm);
// Finalize connection establishment after remote peer has called connect.
// This call must not block for the connection to be established, and instead
// should return successfully with recvComm == NULL with the expectation that
// it will be called again until recvComm != NULL.
mscclppResult_t (*accept)(void* listenComm, void** recvComm);
// Register/Deregister memory. Comm can be either a sendComm or a recvComm.
// Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA.
mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle);
mscclppResult_t (*deregMr)(void* comm, void* mhandle);
// Asynchronous send to a peer.
// May return request == NULL if the call cannot be performed (or would block)
mscclppResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request);
// Asynchronous recv from a peer.
// May return request == NULL if the call cannot be performed (or would block)
mscclppResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request);
// Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is
// visible to the GPU
mscclppResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
mscclppResult_t (*test)(void* request, int* done, int* sizes);
// Close and free send/recv comm objects
mscclppResult_t (*closeSend)(void* sendComm);
mscclppResult_t (*closeRecv)(void* recvComm);
mscclppResult_t (*closeListen)(void* listenComm);
} mscclppNet_v5_t;
// v5 struct for backwards compatibility
// Table of function pointers implementing a collective-capable network
// (v5 layout). NOTE(review): member order and types look like a versioned
// plugin ABI — do not reorder, remove, or retype members; presumably
// mirrors the NCCL collnet plugin interface — confirm.
typedef struct {
// Name of the collective network (mainly for logs)
const char* name;
// Initialize the collective network.
mscclppResult_t (*init)(mscclppDebugLogger_t logFunction);
// Return the number of adapters capable of doing collective operations.
// If ndev returns 0, all other functions might be set to NULL.
mscclppResult_t (*devices)(int* ndev);
// Get various device properties.
// Note: this v5 collnet struct takes the v6 properties type here.
mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v6_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create connections.
mscclppResult_t (*listen)(int dev, void* handle, void** listenComm);
// Create a group for collective operations. handles have been created
// using listen() above. rank indicates caller's rank in the collective network.
mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm);
// Returns whether a reduction operation on a data type is supported.
// 1 for supported, 0 otherwise.
mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported);
// Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA.
mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle);
mscclppResult_t (*deregMr)(void* collComm, void* mhandle);
// Performs an asynchronous allreduce operation on the collective group.
// May return request == NULL if the call cannot be performed (or would block).
mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count,
mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request);
// Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is
// visible to the GPU
mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
mscclppResult_t (*test)(void* request, int* done, int* size);
// Close and free collective comm objects
mscclppResult_t (*closeColl)(void* collComm);
mscclppResult_t (*closeListen)(void* listenComm);
} mscclppCollNet_v5_t;
// v4 struct for backwards compatibility
// Adapter properties as reported by a v4 network implementation.
// NOTE(review): member order/types look like a versioned plugin ABI —
// do not reorder.
typedef struct {
char* name; // Used mostly for logging.
char* pciPath; // Path to the PCI device in /sys.
uint64_t guid; // Unique identifier for the NIC chip. Important for
// cards with multiple PCI functions (Physical or virtual).
int ptrSupport; // MSCCLPP_PTR_HOST or MSCCLPP_PTR_HOST|MSCCLPP_PTR_CUDA
int speed; // Port speed in Mbps.
int port; // Port number.
int maxComms; // Maximum number of comms we can create
} mscclppNetProperties_v4_t;
// v4 struct for backwards compatibility
// Table of function pointers implementing a point-to-point network
// (v4 layout). Unlike the v5 iflush visible above (which takes arrays of
// n buffers), the v4 calls operate on a single buffer per request.
// NOTE(review): member order/types look like a versioned plugin ABI —
// do not reorder, remove, or retype members.
typedef struct {
// Name of the network (mainly for logs)
const char* name;
// Initialize the network.
mscclppResult_t (*init)(mscclppDebugLogger_t logFunction);
// Return the number of adapters.
mscclppResult_t (*devices)(int* ndev);
// Get various device properties.
mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v4_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create a connection.
mscclppResult_t (*listen)(int dev, void* handle, void** listenComm);
// Connect to a handle and return a sending comm object for that peer.
mscclppResult_t (*connect)(int dev, void* handle, void** sendComm);
// Finalize connection establishment after remote peer has called connectHandle
mscclppResult_t (*accept)(void* listenComm, void** recvComm);
// Register/Deregister memory. Comm can be either a sendComm or a recvComm.
// Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA.
mscclppResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle);
mscclppResult_t (*deregMr)(void* comm, void* mhandle);
// Asynchronous send to a peer.
// May return request == NULL if the call cannot be performed (or would block)
mscclppResult_t (*isend)(void* sendComm, void* data, int size, void* mhandle, void** request);
// Asynchronous recv from a peer.
// May return request == NULL if the call cannot be performed (or would block)
mscclppResult_t (*irecv)(void* recvComm, void* data, int size, void* mhandle, void** request);
// Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is
// visible to the GPU
mscclppResult_t (*iflush)(void* recvComm, void* data, int size, void* mhandle, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
mscclppResult_t (*test)(void* request, int* done, int* size);
// Close and free send/recv comm objects
mscclppResult_t (*closeSend)(void* sendComm);
mscclppResult_t (*closeRecv)(void* recvComm);
mscclppResult_t (*closeListen)(void* listenComm);
} mscclppNet_v4_t;
// v4 struct for backwards compatibility
// Table of function pointers implementing a collective-capable network
// (v4 layout). Identical member set to mscclppCollNet_v5_t except that
// getProperties takes the v4 properties type. NOTE(review): member
// order/types look like a versioned plugin ABI — do not reorder, remove,
// or retype members.
typedef struct {
// Name of the collective network (mainly for logs)
const char* name;
// Initialize the collective network.
mscclppResult_t (*init)(mscclppDebugLogger_t logFunction);
// Return the number of adapters capable of doing collective operations.
// If ndev returns 0, all other functions might be set to NULL.
mscclppResult_t (*devices)(int* ndev);
// Get various device properties.
mscclppResult_t (*getProperties)(int dev, mscclppNetProperties_v4_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to MSCCLPP_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create connections.
mscclppResult_t (*listen)(int dev, void* handle, void** listenComm);
// Create a group for collective operations. handles have been created
// using listen() above. rank indicates caller's rank in the collective network.
mscclppResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm);
// Returns whether a reduction operation on a data type is supported.
// 1 for supported, 0 otherwise.
mscclppResult_t (*reduceSupport)(mscclppDataType_t dataType, mscclppRedOp_t redOp, int* supported);
// Register/Deregister memory. Type is either MSCCLPP_PTR_HOST or MSCCLPP_PTR_CUDA.
mscclppResult_t (*regMr)(void* collComm, void* data, int size, int type, void** mhandle);
mscclppResult_t (*deregMr)(void* collComm, void* mhandle);
// Performs an asynchronous allreduce operation on the collective group.
// May return request == NULL if the call cannot be performed (or would block).
mscclppResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, int count,
mscclppDataType_t dataType, mscclppRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request);
// Perform a flush/fence to make sure all data received with MSCCLPP_PTR_CUDA is
// visible to the GPU
mscclppResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
mscclppResult_t (*test)(void* request, int* done, int* size);
// Close and free collective comm objects
mscclppResult_t (*closeColl)(void* collComm);
mscclppResult_t (*closeListen)(void* listenComm);
} mscclppCollNet_v4_t;
#endif // end include guard

View File

@@ -1,46 +0,0 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_INT_NET_H_
#define MSCCLPP_INT_NET_H_
#include "mscclpp.h"
#include "mscclpp_net.h"
// #include "comm.h"
#include "checks.h"
typedef char mscclppNetHandle_t[MSCCLPP_NET_HANDLE_MAXSIZE];
mscclppResult_t mscclppNetPluginInit();
// mscclppResult_t mscclppNetInit(struct mscclppComm* comm);
// int mscclppNetVersion(struct mscclppComm* comm);
// // Translation to external API
// static const char* mscclppNetName(struct mscclppComm* comm) { return comm->mscclppNet->name; }
// static mscclppResult_t mscclppNetDevices(struct mscclppComm* comm, int* ndev) { MSCCLPPCHECK(comm->mscclppNet->devices(ndev)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetGetProperties(struct mscclppComm* comm, int dev, mscclppNetProperties_t* props) { MSCCLPPCHECK(comm->mscclppNet->getProperties(dev, props)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetListen(struct mscclppComm* comm, int dev, void* handle, void** listenComm) { MSCCLPPCHECK(comm->mscclppNet->listen(dev, handle, listenComm)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetConnect(struct mscclppComm* comm, int dev, void* handle, void** sendComm) { MSCCLPPCHECK(comm->mscclppNet->connect(dev, handle, sendComm)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetAccept(struct mscclppComm* comm, void* listenComm, void** recvComm) { MSCCLPPCHECK(comm->mscclppNet->accept(listenComm, recvComm)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetRegMr(struct mscclppComm* comm, void* netComm, void* data, int size, int type, void** mhandle) { MSCCLPPCHECK(comm->mscclppNet->regMr(netComm, data, size, type, mhandle)); return mscclppSuccess; }
// /* DMA-BUF support */
// static mscclppResult_t mscclppNetRegMrDmaBuf(struct mscclppComm* comm, void* netComm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle) { MSCCLPPCHECK(comm->mscclppNet->regMrDmaBuf(netComm, data, size, type, offset, fd, mhandle)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetDeregMr(struct mscclppComm* comm, void* netComm, void* mhandle) { MSCCLPPCHECK(comm->mscclppNet->deregMr(netComm, mhandle)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetIsend(struct mscclppComm* comm, void* sendComm, void* data, int size, int tag, void* mhandle, void** request) { MSCCLPPCHECK(comm->mscclppNet->isend(sendComm, data, size, tag, mhandle, request)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetIrecv(struct mscclppComm* comm, void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request) { MSCCLPPCHECK(comm->mscclppNet->irecv(recvComm, n, data, sizes, tags, mhandles, request)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetIflush(struct mscclppComm* comm, void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) { MSCCLPPCHECK(comm->mscclppNet->iflush(recvComm, n, data, sizes, mhandles, request)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetTest(struct mscclppComm* comm, void* request, int* done, int* sizes) { MSCCLPPCHECK(comm->mscclppNet->test(request, done, sizes)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetCloseSend(struct mscclppComm* comm, void* sendComm) { MSCCLPPCHECK(comm->mscclppNet->closeSend(sendComm)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetCloseRecv(struct mscclppComm* comm, void* recvComm) { MSCCLPPCHECK(comm->mscclppNet->closeRecv(recvComm)); return mscclppSuccess; }
// static mscclppResult_t mscclppNetCloseListen(struct mscclppComm* comm, void* listenComm) { MSCCLPPCHECK(comm->mscclppNet->closeListen(listenComm)); return mscclppSuccess; }
// // Test whether the current GPU support GPU Direct RDMA.
// mscclppResult_t mscclppGpuGdrSupport(struct mscclppComm* comm, int* gdrSupport);
// extern mscclppNet_t mscclppNetIb;
// extern mscclppNet_t mscclppNetSocket;
#endif

View File

@@ -5,6 +5,8 @@
#include "comm.h"
#include <pthread.h>
#define MSCCLPP_PROXY_MAX_NUM (MSCCLPP_IB_MAX_DEVS + 1) // One is for a P2P proxy.
typedef enum {
MSCCLPP_PROXY_RUN_STATE_IDLE = 0,
MSCCLPP_PROXY_RUN_STATE_RUNNING,
@@ -12,8 +14,16 @@ typedef enum {
} mscclppProxyRunState_t;
struct mscclppProxyState {
pthread_t *threads;
mscclppProxyRunState_t *runs;
pthread_t thread;
mscclppProxyRunState_t run;
mscclppTrigger *cpuTriggerFifo;
mscclppTrigger *gpuTriggerFifo;
// cpuTriggerFifoTail is the index of the next fifo entry the CPU proxy will read.
unsigned int cpuTriggerFifoTail;
unsigned int *gpuTriggerFifoHead;
void *cpuTriggerFifoGdrDesc;
// NULL for the P2P proxy.
struct mscclppIbContext *ibContext;
};
mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm);

View File

@@ -171,24 +171,16 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
WARN("devConnOut is the output of this function and needs to be allocated by the user");
return mscclppInvalidUsage;
}
struct mscclppConn *conn = &comm->conns[comm->nConns++];
struct mscclppConn *conn = &comm->conns[comm->nConns];
conn->transport = transportType;
conn->remoteRank = remoteRank;
conn->buffSize = buffSize;
conn->devConn = devConnOut;
conn->devConn->localBuff = localBuff;
conn->devConn->localFlag = localFlag;
conn->devConn->tag = tag;
// TODO(saemal): these two should be shared for all P2P-DMA connections made from each GPU. Same for each IB driver.
MSCCLPPCHECK(mscclppGdrCudaCalloc(&conn->cpuTriggerFifo, &conn->devConn->trigger, MSCCLPP_PROXY_FIFO_SIZE, &conn->cpuTriggerFifoGdrDesc));
MSCCLPPCHECK(mscclppCudaCalloc(&conn->devConn->triggerFifoHead, 1));
conn->ibCtx = NULL;
conn->ibQp = NULL;
int ibDevIdx = -1;
if (ibDev != NULL) {
// Check if an IB context exists
int ibDevIdx = -1;
int firstNullIdx = -1;
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
if (comm->ibContext[i] == NULL) {
@@ -210,6 +202,39 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, i
}
conn->ibCtx = comm->ibContext[ibDevIdx];
}
// Find a proxy state that uses the given IB device
struct mscclppProxyState *proxyState = NULL;
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
if (comm->proxyState[i] == NULL) {
// Cannot find, create a new one
MSCCLPPCHECK(mscclppCalloc(&proxyState, 1));
MSCCLPPCHECK(mscclppGdrCudaCalloc(&proxyState->cpuTriggerFifo, &proxyState->gpuTriggerFifo,
MSCCLPP_PROXY_FIFO_SIZE, &proxyState->cpuTriggerFifoGdrDesc));
MSCCLPPCHECK(mscclppCudaCalloc(&proxyState->gpuTriggerFifoHead, 1));
proxyState->ibContext = conn->ibCtx;
comm->proxyState[i] = proxyState;
break;
}
if (comm->proxyState[i]->ibContext == conn->ibCtx) {
// `conn->ibCtx == NULL` indicates the P2P proxy.
proxyState = comm->proxyState[i];
break;
}
}
if (proxyState == NULL) {
// Cannot reach
WARN("Unexpected error");
return mscclppInternalError;
}
conn->devConn = devConnOut;
conn->devConn->localBuff = localBuff;
conn->devConn->localFlag = localFlag;
conn->devConn->tag = tag;
conn->devConn->connId = comm->nConns;
conn->devConn->trigger = proxyState->gpuTriggerFifo;
conn->devConn->triggerFifoHead = proxyState->gpuTriggerFifoHead;
comm->nConns++;
return mscclppSuccess;
}

View File

@@ -5,12 +5,14 @@
#include "ib.h"
#include "checks.h"
#include <emmintrin.h>
#include <sys/syscall.h>
#include <numa.h>
#include <map>
#include <vector>
#include <thread>
#define MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD 100
// TODO(chhwang): verify if MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0 is useful, otherwise delete this option.
#define MSCCLPP_PROXY_FLAG_SET_BY_RDMA 1
@@ -33,25 +35,21 @@ static void NumaBind(int node)
struct proxyArgs {
struct mscclppComm* comm;
struct mscclppIbContext* ibCtx;
struct mscclppProxyState *proxyState;
cudaStream_t stream;
volatile mscclppProxyRunState_t* run;
int connIdx;
};
// TODO(saemal) We need to add a fifo for each DMA engine
static void readTrigger(mscclppTrigger *dst, mscclppTrigger *src) {
__m128i xmm0 = _mm_load_si128((__m128i *)src);
_mm_store_si128((__m128i *)dst, xmm0);
}
void* mscclppProxyServiceP2P(void* _args) {
struct proxyArgs *args = (struct proxyArgs *)_args;
struct mscclppComm *comm = args->comm;
volatile mscclppProxyRunState_t *run = args->run;
std::vector<struct mscclppConn *> conns;
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
// TODO(saemal): we need to create another transport type which doesn't need a proxy.
if (conn->transport == mscclppTransportP2P) {
conns.push_back(conn);
}
}
volatile mscclppProxyRunState_t *run = &args->proxyState->run;
mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
cudaStream_t stream = args->stream;
free(_args);
@@ -65,33 +63,37 @@ void* mscclppProxyServiceP2P(void* _args) {
PROXYCUDACHECK(cudaSetDevice(comm->cudaDev));
PROXYCUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) {
for (struct mscclppConn *conn : conns) {
// Poll to see if we are ready to send anything
trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]);
if (trigger.value == 0) continue;
// Iterate over what send is needed
if (trigger.fields.type & mscclppData){
void *srcBuff = (void *)((char *)conn->devConn->localBuff + trigger.fields.dataOffset);
void *dstBuff = (void *)((char *)conn->devConn->remoteBuff + trigger.fields.dataOffset);
PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, stream));
}
if (trigger.fields.type & mscclppFlag) {
PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->localFlag, sizeof(uint64_t), cudaMemcpyDeviceToDevice, stream));
}
// Wait for completion
if (trigger.fields.type & mscclppSync){
PROXYCUDACHECK(cudaStreamSynchronize(stream));
}
// Send completion
volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]);
*tmp = 0;
conn->fifoTail++;
if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE)
conn->fifoTail = 0;
int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
// fifoTail is the index of the next fifo entry the CPU proxy will read.
for (;;) {
if (runCheckCounter-- == 0) {
runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
// Check if we need to exit
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
}
// Poll to see if we are ready to send anything
readTrigger(&trigger, &fifo[*fifoTail]);
if (trigger.value[0] == 0) continue;
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
// Iterate over what send is needed
if (trigger.fields.type & mscclppData){
void *srcBuff = (void *)((char *)conn->devConn->localBuff + trigger.fields.dataOffset);
void *dstBuff = (void *)((char *)conn->devConn->remoteBuff + trigger.fields.dataOffset);
PROXYCUDACHECK(cudaMemcpyAsync(dstBuff, srcBuff, trigger.fields.dataSize, cudaMemcpyDeviceToDevice, stream));
}
if (trigger.fields.type & mscclppFlag) {
PROXYCUDACHECK(cudaMemcpyAsync(conn->remoteProxyFlag, conn->devConn->localFlag, sizeof(uint64_t), cudaMemcpyDeviceToDevice, stream));
}
// Wait for completion
if (trigger.fields.type & mscclppSync){
PROXYCUDACHECK(cudaStreamSynchronize(stream));
}
// Send completion: reset only the high 64 bits
*(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
*fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
}
// Need a sync in case previous copies are not completed
@@ -107,15 +109,10 @@ void* mscclppProxyServiceP2P(void* _args) {
void* mscclppProxyServiceIb(void* _args) {
struct proxyArgs *args = (struct proxyArgs *)_args;
struct mscclppComm *comm = args->comm;
struct mscclppIbContext *ibCtx = args->ibCtx;
volatile mscclppProxyRunState_t *run = args->run;
std::vector<struct mscclppConn *> conns;
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
if (conn->transport == mscclppTransportIB) {
conns.push_back(conn);
}
}
struct mscclppIbContext *ibCtx = args->proxyState->ibContext;
volatile mscclppProxyRunState_t *run = &args->proxyState->run;
mscclppTrigger *fifo = args->proxyState->cpuTriggerFifo;
unsigned int *fifoTail = &args->proxyState->cpuTriggerFifoTail;
free(_args);
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
@@ -123,8 +120,16 @@ void* mscclppProxyServiceIb(void* _args) {
SEND_STATE_INIT,
SEND_STATE_INPROGRESS
};
int sendState = SEND_STATE_INIT;
uint64_t currentProxyFlagVlaue = *conn->cpuProxyFlag;
int *sendState;
uint64_t *currentProxyFlagValue;
if (mscclppCalloc((void **)&sendState, comm->nConns) != mscclppSuccess) {
WARN("mscclppCalloc failed: errno %d", errno);
return NULL;
}
if (mscclppCalloc((void **)&currentProxyFlagValue, comm->nConns) != mscclppSuccess) {
WARN("mscclppCalloc failed: errno %d", errno);
return NULL;
}
#endif
int rank = comm->rank;
@@ -134,7 +139,10 @@ void* mscclppProxyServiceIb(void* _args) {
NumaBind(ibCtx->numaNode);
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
for (struct mscclppConn *conn : conns) {
for (int i = 0; i < (int)comm->nConns; ++i) {
sendState[i] = SEND_STATE_INIT;
struct mscclppConn *conn = &comm->conns[i];
currentProxyFlagValue[i] = *conn->cpuProxyFlag;
// Post recv
if (conn->ibQp->postRecv(0) != 0) {
WARN("postRecv failed: errno %d", errno);
@@ -142,28 +150,94 @@ void* mscclppProxyServiceIb(void* _args) {
}
#endif
while (*run == MSCCLPP_PROXY_RUN_STATE_RUNNING) {
for (struct mscclppConn *conn : conns) {
int runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
for (;;) {
if (runCheckCounter-- == 0) {
runCheckCounter = MSCCLPP_PROXY_RUN_STATE_CHECK_PERIOD;
// Check if we need to exit
if (*run != MSCCLPP_PROXY_RUN_STATE_RUNNING) break;
}
// Poll to see if we are ready to send anything
readTrigger(&trigger, &fifo[*fifoTail]);
#if (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 0)
// Try send
if (sendState == SEND_STATE_INIT) {
trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]);
if (trigger.value != 0) {
// Do send
conn->ibQp->stageSendWithImm(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize,
/*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/true, /*immData=*/0);
if (conn->ibQp->postSend() != 0) {
WARN("postSend failed: errno %d", errno);
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
// Try send
if (sendState[trigger.fields.connId] == SEND_STATE_INIT) {
if (trigger.value[0] != 0) {
// Do send
conn->ibQp->stageSendWithImm(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize,
/*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/true, /*immData=*/0);
int ret;
if ((ret = conn->ibQp->postSend()) != 0) {
// Return value is errno.
WARN("postSend failed: errno %d", ret);
}
sendState[trigger.fields.connId] = SEND_STATE_INPROGRESS;
}
}
// Poll completions
wcNum = conn->ibQp->pollCq();
if (wcNum < 0) {
WARN("rank %d pollCq failed: errno %d", rank, errno);
} else {
for (int i = 0; i < wcNum; ++i) {
struct ibv_wc *wc = &conn->ibQp->wcs[i];
if (wc->status != IBV_WC_SUCCESS) {
WARN("rank %d wc status %d", rank, wc->status);
continue;
}
if (wc->qp_num != conn->ibQp->qp->qp_num) {
WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num);
continue;
}
if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
// TODO(chhwang): cpu flush
*((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagValue[trigger.fields.connId];
// recv completion
if (conn->ibQp->postRecv(wc->wr_id) != 0) {
WARN("postRecv failed: errno %d", errno);
}
sendState = SEND_STATE_INPROGRESS;
// WARN("rank %d recv completion", rank);
} else if (wc->opcode == IBV_WC_RDMA_WRITE) {
// send completion
*(volatile uint64_t *)(&fifo[fifoTail]) = 0;
fifoTail = (fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
sendState[trigger.fields.connId] = SEND_STATE_INIT;
// WARN("rank %d send completion", rank);
}
}
}
#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1)
if (trigger.value[0] == 0) continue;
// Poll completions
wcNum = conn->ibQp->pollCq();
if (wcNum < 0) {
WARN("rank %d pollCq failed: errno %d", rank, errno);
} else {
struct mscclppConn *conn = &comm->conns[trigger.fields.connId];
if (trigger.fields.type & mscclppData) {
conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize,
/*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/false);
}
if (trigger.fields.type & mscclppFlag) {
// My local flag is copied to the peer's proxy flag
conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t),
/*wrId=*/0, /*offset=*/0, /*signaled=*/true);
}
int ret;
if ((ret = conn->ibQp->postSend()) != 0) {
// Return value is errno.
WARN("postSend failed: errno %d", ret);
}
// Wait for completion
if (trigger.fields.type & mscclppSync) {
bool waiting = true;
while (waiting) {
wcNum = conn->ibQp->pollCq();
if (wcNum < 0) {
WARN("rank %d pollCq failed: errno %d", rank, errno);
continue;
}
for (int i = 0; i < wcNum; ++i) {
struct ibv_wc *wc = &conn->ibQp->wcs[i];
if (wc->status != IBV_WC_SUCCESS) {
@@ -174,80 +248,19 @@ void* mscclppProxyServiceIb(void* _args) {
WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num);
continue;
}
if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
// TODO(chhwang): cpu flush
*((volatile uint64_t *)conn->cpuProxyFlag) = ++currentProxyFlagVlaue;
// recv completion
if (conn->ibQp->postRecv(wc->wr_id) != 0) {
WARN("postRecv failed: errno %d", errno);
}
// WARN("rank %d recv completion", rank);
} else if (wc->opcode == IBV_WC_RDMA_WRITE) {
if (wc->opcode == IBV_WC_RDMA_WRITE) {
// send completion
volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]);
*tmp = 0;
conn->fifoTail++;
if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE)
conn->fifoTail = 0;
sendState = SEND_STATE_INIT;
// WARN("rank %d send completion", rank);
waiting = false;
break;
}
}
}
#else // (MSCCLPP_PROXY_FLAG_SET_BY_RDMA == 1)
// Poll to see if we are ready to send anything
trigger.value = *(volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]);
if (trigger.value == 0) continue;
if (trigger.fields.type & mscclppData) {
conn->ibQp->stageSend(conn->ibBuffMr, &conn->ibBuffMrInfo, (uint32_t)trigger.fields.dataSize,
/*wrId=*/0, /*offset=*/trigger.fields.dataOffset, /*signaled=*/false);
}
if (trigger.fields.type & mscclppFlag) {
// My local flag is copied to the peer's proxy flag
conn->ibQp->stageSend(conn->ibLocalFlagMr, &conn->ibProxyFlagMrInfo, sizeof(uint64_t),
/*wrId=*/0, /*offset=*/0, /*signaled=*/true);
}
if (conn->ibQp->postSend() != 0) {
WARN("postSend failed: errno %d", errno);
}
// Wait for completion
if (trigger.fields.type & mscclppSync) {
bool waiting = true;
while (waiting) {
wcNum = conn->ibQp->pollCq();
if (wcNum < 0) {
WARN("rank %d pollCq failed: errno %d", rank, errno);
continue;
}
for (int i = 0; i < wcNum; ++i) {
struct ibv_wc *wc = &conn->ibQp->wcs[i];
if (wc->status != IBV_WC_SUCCESS) {
WARN("rank %d wc status %d", rank, wc->status);
continue;
}
if (wc->qp_num != conn->ibQp->qp->qp_num) {
WARN("rank %d got wc of unknown qp_num %d", rank, wc->qp_num);
continue;
}
if (wc->opcode == IBV_WC_RDMA_WRITE) {
// send completion
waiting = false;
break;
}
}
}
}
// Send completion
volatile uint64_t *tmp = (volatile uint64_t *)(&conn->cpuTriggerFifo[conn->fifoTail]);
*tmp = 0;
conn->fifoTail++;
if (conn->fifoTail == MSCCLPP_PROXY_FIFO_SIZE)
conn->fifoTail = 0;
#endif
}
// Send completion: reset only the high 64 bits
*(volatile uint64_t *)(&fifo[*fifoTail]) = 0;
*fifoTail = (*fifoTail + 1) % MSCCLPP_PROXY_FIFO_SIZE;
#endif
}
*run = MSCCLPP_PROXY_RUN_STATE_IDLE;
// WARN("Proxy exits: rank %d", rank);
@@ -256,9 +269,8 @@ void* mscclppProxyServiceIb(void* _args) {
void* mscclppProxyService(void* _args) {
struct proxyArgs *args = (struct proxyArgs *)_args;
struct mscclppIbContext *ibCtx = args->ibCtx;
void *ret;
if (ibCtx == NULL) {
if (args->proxyState->ibContext == NULL) {
ret = mscclppProxyServiceP2P(_args);
} else {
ret = mscclppProxyServiceIb(_args);
@@ -266,69 +278,43 @@ void* mscclppProxyService(void* _args) {
return ret;
}
// mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses) {
// comm->proxyState.listenSock = sock;
// comm->proxyState.peerAddresses = peerAddresses;
// return mscclppSuccess;
// }
mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) {
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
if (comm->ibContext[i] == NULL) continue;
if (comm->proxyState[i].threads == NULL) {
MSCCLPPCHECK(mscclppCalloc(&comm->proxyState[i].threads, 1));
}
if (comm->proxyState[i].runs == NULL) {
MSCCLPPCHECK(mscclppCalloc(&comm->proxyState[i].runs, 1));
}
// Create IB proxy threads
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState *proxyState = comm->proxyState[i];
if (proxyState == NULL) break;
bool is_p2p = (proxyState->ibContext == NULL);
struct proxyArgs *args;
MSCCLPPCHECK(mscclppCalloc(&args, 1));
args->comm = comm;
args->ibCtx = comm->ibContext[i];
args->run = comm->proxyState[i].runs;
*args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
pthread_create(comm->proxyState[i].threads, NULL, mscclppProxyService, args);
mscclppSetThreadName(comm->proxyState[i].threads[0], "MSCCLPP Service IB - %02d", i);
args->proxyState = proxyState;
if (is_p2p) {
CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking));
}
proxyState->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
pthread_create(&proxyState->thread, NULL, mscclppProxyService, args);
if (is_p2p) {
mscclppSetThreadName(proxyState->thread, "MSCCLPP Service P2P - %02d", comm->cudaDev);
} else {
mscclppSetThreadName(proxyState->thread, "MSCCLPP Service IB - %02d", i);
}
}
// P2P proxy
mscclppProxyState *proxyState = &comm->proxyState[MSCCLPP_IB_MAX_DEVS];
if (proxyState->threads == NULL) {
MSCCLPPCHECK(mscclppCalloc(&proxyState->threads, 1));
}
if (proxyState->runs == NULL) {
MSCCLPPCHECK(mscclppCalloc(&proxyState->runs, 1));
}
// Create P2P DMA proxy thread
struct proxyArgs *args;
MSCCLPPCHECK(mscclppCalloc(&args, 1));
args->comm = comm;
args->ibCtx = NULL;
args->run = proxyState->runs;
args->connIdx = -1; // unused
CUDACHECK(cudaStreamCreateWithFlags(&args->stream, cudaStreamNonBlocking));
*args->run = MSCCLPP_PROXY_RUN_STATE_RUNNING;
pthread_create(proxyState->threads, NULL, mscclppProxyService, args);
mscclppSetThreadName(proxyState->threads[0], "MSCCLPP Service P2P - %02d", comm->cudaDev);
return mscclppSuccess;
}
static void _stopProxy(struct mscclppComm* comm, int devIdx, int connIdx) {
volatile int *run = (volatile int *)&comm->proxyState[devIdx].runs[connIdx];
if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) return;
*run = MSCCLPP_PROXY_RUN_STATE_EXITING;
while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) {
usleep(1000);
}
}
mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) {
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
if (comm->ibContext[i] != NULL) {
_stopProxy(comm, i, 0);
for (int i = 0; i < MSCCLPP_PROXY_MAX_NUM; ++i) {
struct mscclppProxyState *proxyState = comm->proxyState[i];
if (proxyState == NULL) break;
volatile int *run = (volatile int *)&proxyState->run;
if (*run == MSCCLPP_PROXY_RUN_STATE_IDLE) {
continue;
}
*run = MSCCLPP_PROXY_RUN_STATE_EXITING;
while (*run == MSCCLPP_PROXY_RUN_STATE_EXITING && *comm->abortFlag == 0) {
usleep(1000);
}
}
// P2P proxies
_stopProxy(comm, MSCCLPP_IB_MAX_DEVS, 0);
return mscclppSuccess;
}

View File

@@ -51,14 +51,11 @@ __global__ void kernel(int rank, int world_size)
mscclppDevConn_t devConn = constDevConns[remoteRank];
volatile int *data = (volatile int *)devConn.localBuff;
volatile uint64_t *localFlag = devConn.localFlag;
#if (USE_DMA_FOR_P2P == 0)
volatile uint64_t *remoteFlag = devConn.remoteFlag;
#endif
volatile uint64_t *proxyFlag = devConn.proxyFlag;
int curFifoHead = *devConn.triggerFifoHead;
volatile uint64_t *trig = (volatile uint64_t *)&devConn.trigger[curFifoHead];
curFifoHead += 1;
if (curFifoHead == MSCCLPP_PROXY_FIFO_SIZE)
curFifoHead = 0;
*devConn.triggerFifoHead = curFifoHead;
mscclppTrigger *trig = devConn.getTrigger();
uint64_t baseFlag = *localFlag;
@@ -78,12 +75,10 @@ __global__ void kernel(int rank, int world_size)
#if (USE_DMA_FOR_P2P == 1)
// Wait until the proxy have sent my data and flag
while (*trig != 0) {}
devConn.waitTrigger(trig);
// Trigger sending data and flag
uint64_t dataOffset = rank * sizeof(int);
uint64_t dataSize = sizeof(int);
*trig = TRIGGER_VALUE(mscclppFlag | mscclppData, dataOffset, dataSize);
devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {}
@@ -91,13 +86,11 @@ __global__ void kernel(int rank, int world_size)
#else // USE_DMA_FOR_P2P == 0
if (devConn.remoteBuff == NULL) { // IB
// Trigger sending data and flag
uint64_t dataOffset = rank * sizeof(int);
uint64_t dataSize = sizeof(int);
*trig = TRIGGER_VALUE(mscclppSync | mscclppFlag | mscclppData, dataOffset, dataSize);
// Wait until the proxy have sent my data and flag
while (*trig != 0) {}
devConn.waitTrigger(trig);
// Trigger sending data and flag
devConn.setTrigger(trig, mscclppFlag | mscclppData, rank * sizeof(int), sizeof(int));
// Wait for receiving data from remote rank
while (*proxyFlag == baseFlag) {}
@@ -259,7 +252,7 @@ int main(int argc, const char *argv[])
CUDACHECK(cudaEventCreate(&ev_end));
// warm up
int warmupiter = 10;
// int warmupiter = 10;
// for (int i = 0; i < warmupiter; ++i) {
// kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
// }
@@ -268,14 +261,14 @@ int main(int argc, const char *argv[])
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
int cudagraphiter = 10;
int cudagraphiter = 100;
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
int cudagraphwarmup = 20;
int cudagraphwarmup = 10;
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
}