diff --git a/Makefile b/Makefile index 3369dc10..573ab85f 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ MSCCLPP_MINOR := 1 DEBUG ?= 0 VERBOSE ?= 1 TRACE ?= 0 -USE_MPI_FOR_TESTS ?= 0 +USE_MPI_FOR_TESTS ?= 1 ######## CUDA CUDA_HOME ?= /usr/local/cuda @@ -116,7 +116,7 @@ LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR) LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR) TESTSDIR := tests -TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu) +TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test_mpi.cu) TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS)) TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS)) diff --git a/scripts/tests.sh b/scripts/tests.sh index f97e3eda..f721f01a 100755 --- a/scripts/tests.sh +++ b/scripts/tests.sh @@ -7,4 +7,4 @@ mpirun -allow-run-as-root \ -bind-to numa \ -x MSCCLPP_DEBUG_SUBSYS=ALL \ -x MSCCLPP_SOCKET_IFNAME=eth0 \ - ./build/bin/tests/p2p_test 172.17.0.4:50000 + ./build/bin/tests/p2p_test_mpi 172.17.0.4:50000 diff --git a/src/bootstrap/init.cc b/src/bootstrap/init.cc index 95cf0551..0a649c1c 100644 --- a/src/bootstrap/init.cc +++ b/src/bootstrap/init.cc @@ -64,9 +64,9 @@ MSCCLPP_API(mscclppResult_t, mscclppCommInitRank, mscclppComm_t* comm, int nrank mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair){ mscclppResult_t res = mscclppSuccess; mscclppComm_t _comm = NULL; - uint64_t hash = getHostHash(); - uint64_t *hashes; - std::map hashToNode; + // uint64_t hash = getHostHash(); + // uint64_t *hashes; + // std::map hashToNode; MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail); _comm->rank = rank; @@ -80,36 +80,36 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, c MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t **)&_comm->abortFlag, 1), res, fail); MSCCLPPCHECK(bootstrapInit(&handle, _comm)); - _comm->maxLocalRanks = 8; - MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToNode, nranks), res, fail); - MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToLocalRank, nranks), res, fail); - MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->localRankToRank, _comm->maxLocalRanks), res, fail); + // _comm->maxLocalRanks = 8; + // MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToNode, nranks), res, fail); + // MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToLocalRank, nranks), res, fail); + // MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->localRankToRank, _comm->maxLocalRanks), res, fail); - MSCCLPPCHECKGOTO(mscclppCalloc(&hashes, nranks), res, fail); - hashes[rank] = hash; - MSCCLPPCHECK(bootstrapAllGather(_comm->bootstrap, hashes, sizeof(uint64_t))); + // MSCCLPPCHECKGOTO(mscclppCalloc(&hashes, nranks), res, fail); + // hashes[rank] = hash; + // MSCCLPPCHECK(bootstrapAllGather(_comm->bootstrap, hashes, sizeof(uint64_t))); - for (int i = 0; i < nranks; ++i) { - auto it = hashToNode.find(hashes[i]); - if (it == hashToNode.end()) { - _comm->nNodes++; - hashToNode[hashes[i]] = _comm->nNodes - 1; - _comm->rankToNode[i] = _comm->nNodes - 1; - } else { - _comm->rankToNode[i] = it->second; - } - if (hashes[i] == hash) { - _comm->rankToLocalRank[i] = _comm->localRanks++; - _comm->localRankToRank[_comm->rankToLocalRank[i]] = i; - } - } - if (_comm->localRanks > _comm->maxLocalRanks) { - WARN("Too many ranks on the same host: %d", _comm->localRanks); - res = mscclppInvalidUsage; - goto fail; - } - _comm->node = _comm->rankToNode[rank]; - _comm->localRank = _comm->rankToLocalRank[rank]; + // for (int i = 0; i < nranks; ++i) { + // auto it = hashToNode.find(hashes[i]); + // if (it == hashToNode.end()) { + // _comm->nNodes++; + // hashToNode[hashes[i]] = _comm->nNodes - 1; + // _comm->rankToNode[i] = _comm->nNodes - 1; + // } else { + // _comm->rankToNode[i] = it->second; + // } + // if (hashes[i] == hash) { + // _comm->rankToLocalRank[i] = _comm->localRanks++; + // _comm->localRankToRank[_comm->rankToLocalRank[i]] = i; + // } + // } + // if (_comm->localRanks > _comm->maxLocalRanks) { + // WARN("Too many ranks on the same host: %d", _comm->localRanks); + // res = mscclppInvalidUsage; + // goto fail; + // } + // _comm->node = _comm->rankToNode[rank]; + // _comm->localRank = _comm->rankToLocalRank[rank]; *comm = _comm; return res; @@ -141,21 +141,31 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){ return mscclppSuccess; } -MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, int remoteRank, - void *buff, size_t buffSize, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev); -mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, void *buff, size_t buffSize, - int *flag, int tag, mscclppTransport_t transportType, const char *ibDev/*=NULL*/) +MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, mscclppDevConn* devConnOut, int remoteRank, + void* localBuff, size_t buffSize, int* localFlag, int tag, mscclppTransport_t transportType, const char *ibDev); +mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, int remoteRank, void* localBuff, size_t buffSize, + int* localFlag, int tag, mscclppTransport_t transportType, const char *ibDev/*=NULL*/) { + if (comm->nConns == MAXCONNECTIONS) { + WARN("Too many connections made"); + return mscclppInternalError; + } + if (devConnOut == NULL) { + WARN("devConnOut is the output of this function and needs to be allocated by the user"); + return mscclppInvalidUsage; + } struct mscclppConn *conn = &comm->conns[comm->nConns++]; conn->transport = transportType; conn->remoteRank = remoteRank; - conn->tag = tag; - conn->buff = buff; conn->buffSize = buffSize; - conn->flag = flag; + conn->devConn = devConnOut; + conn->devConn->localBuff = localBuff; + conn->devConn->localFlag = localFlag; + conn->devConn->tag = tag; + MSCCLPPCHECK(mscclppCudaHostCalloc(&conn->devConn->trigger, 1)); + conn->ibCtx = NULL; conn->ibQp = NULL; - if (ibDev != NULL) { // Check if an IB context exists int ibDevIdx = -1; @@ -196,70 +206,95 @@ struct connInfo { mscclppIbMrInfo infoRemoteFlagMr; }; +mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/){ + if (connInfo == NULL || conn == NULL){ + WARN("connInfo or connection cannot be null"); + return mscclppInternalError; + } + CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleBuff, conn->devConn->localBuff)); + CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleFlag, conn->devConn->localFlag)); + return mscclppSuccess; +} + +mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/){ + if (connInfo == NULL || conn == NULL){ + WARN("ipcHandles or connection cannot be null"); + return mscclppInternalError; + } + CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->devConn->remoteBuff, connInfo->handleBuff, cudaIpcMemLazyEnablePeerAccess)); + CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->devConn->remoteFlag, connInfo->handleFlag, cudaIpcMemLazyEnablePeerAccess)); + return mscclppSuccess; +} + +mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/){ + if (connInfo == NULL || conn == NULL){ + WARN("connInfo or connection cannot be null"); + return mscclppInternalError; + } + struct mscclppDevConn *devConn = conn->devConn; + devConn->remoteBuff = NULL; + MSCCLPPCHECK(mscclppCudaCalloc(&devConn->remoteFlag, 1)); + + struct mscclppIbContext *ibCtx = conn->ibCtx; + if (conn->ibQp == NULL) { + MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp)); + } + MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localBuff, conn->buffSize, &conn->ibBuffMr)); + MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localFlag, sizeof(int), &conn->ibLocalFlagMr)); + MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->remoteFlag, sizeof(int), &conn->ibRemoteFlagMr)); + connInfo->infoQp = conn->ibQp->info; + connInfo->infoBuffMr = conn->ibBuffMr->info; + connInfo->infoLocalFlagMr = conn->ibLocalFlagMr->info; + connInfo->infoRemoteFlagMr = conn->ibRemoteFlagMr->info; + return mscclppSuccess; +} + +mscclppResult_t mscclppIbConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/){ + if (connInfo == NULL || conn == NULL){ + WARN("ipcHandles or connection cannot be null"); + return mscclppInternalError; + } + if (conn->ibQp->rtr(&connInfo->infoQp) != 0) { + WARN("Failed to transition QP to RTR"); + return mscclppInvalidUsage; + } + if (conn->ibQp->rts() != 0) { + WARN("Failed to transition QP to RTS"); + return mscclppInvalidUsage; + } + conn->ibBuffMrInfo = connInfo->infoBuffMr; + conn->ibLocalFlagMrInfo = connInfo->infoLocalFlagMr; + conn->ibRemoteFlagMrInfo = connInfo->infoRemoteFlagMr; + return mscclppSuccess; +} + MSCCLPP_API(mscclppResult_t, mscclppConnectionSetup, mscclppComm_t comm); mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm) { - // Allocate connection info to be shared with GPU - MSCCLPPCHECK(mscclppCudaHostCalloc(&comm->devConns, comm->nConns)); - // Send info to peers for (int i = 0; i < comm->nConns; ++i) { struct mscclppConn *conn = &comm->conns[i]; - struct mscclppDevConn *devConn = &comm->devConns[i]; - conn->devConn = devConn; - devConn->tag = conn->tag; - devConn->localBuff = conn->buff; - devConn->localFlag = conn->flag; - MSCCLPPCHECK(mscclppCudaHostCalloc(&devConn->trigger, 1)); struct connInfo cInfo; if (conn->transport == mscclppTransportP2P) { - CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleBuff, devConn->localBuff)); - CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleFlag, devConn->localFlag)); + MSCCLPPCHECK(mscclppP2pConnectionSetupStart(&cInfo, conn)); } else if (conn->transport == mscclppTransportIB) { - devConn->remoteBuff = NULL; - MSCCLPPCHECK(mscclppCudaCalloc(&devConn->remoteFlag, 1)); - - struct mscclppIbContext *ibCtx = conn->ibCtx; - if (conn->ibQp == NULL) { - MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp)); - } - MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localBuff, conn->buffSize, &conn->ibBuffMr)); - MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localFlag, sizeof(int), &conn->ibLocalFlagMr)); - MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->remoteFlag, sizeof(int), &conn->ibRemoteFlagMr)); - cInfo.infoQp = conn->ibQp->info; - cInfo.infoBuffMr = conn->ibBuffMr->info; - cInfo.infoLocalFlagMr = conn->ibLocalFlagMr->info; - cInfo.infoRemoteFlagMr = conn->ibRemoteFlagMr->info; + MSCCLPPCHECK(mscclppIbConnectionSetupStart(&cInfo, conn)); } - MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->remoteRank, conn->tag, &cInfo, sizeof(cInfo))); + MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->remoteRank, conn->devConn->tag, &cInfo, sizeof(cInfo))); } // Recv info from peers for (int i = 0; i < comm->nConns; ++i) { struct mscclppConn *conn = &comm->conns[i]; - struct mscclppDevConn *devConn = &comm->devConns[i]; - struct connInfo cInfo; - MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->remoteRank, conn->tag, &cInfo, sizeof(cInfo))); + MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->remoteRank, conn->devConn->tag, &cInfo, sizeof(cInfo))); if (conn->transport == mscclppTransportP2P) { - CUDACHECK(cudaIpcOpenMemHandle(&devConn->remoteBuff, cInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess)); - CUDACHECK(cudaIpcOpenMemHandle((void **)&devConn->remoteFlag, cInfo.handleFlag, cudaIpcMemLazyEnablePeerAccess)); + MSCCLPPCHECK(mscclppP2pConnectionSetupEnd(&cInfo, conn)); } else if (conn->transport == mscclppTransportIB) { - if (conn->ibQp->rtr(&cInfo.infoQp) != 0) { - WARN("Failed to transition QP to RTR"); - return mscclppInvalidUsage; - } - if (conn->ibQp->rts() != 0) { - WARN("Failed to transition QP to RTS"); - return mscclppInvalidUsage; - } - conn->ibBuffMrInfo = cInfo.infoBuffMr; - conn->ibLocalFlagMrInfo = cInfo.infoLocalFlagMr; - conn->ibRemoteFlagMrInfo = cInfo.infoRemoteFlagMr; + MSCCLPPCHECK(mscclppIbConnectionSetupEnd(&cInfo, conn)); } } - return mscclppSuccess; } @@ -276,24 +311,3 @@ mscclppResult_t mscclppProxyStop(mscclppComm_t comm) MSCCLPPCHECK(mscclppProxyDestroy(comm)); return mscclppSuccess; } - -MSCCLPP_API(mscclppResult_t, mscclppGetLocalRank, mscclppComm_t comm, int *localRank); -mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank) -{ - *localRank = comm->localRank; - return mscclppSuccess; -} - -MSCCLPP_API(mscclppResult_t, mscclppGetNodeFromRank, mscclppComm_t comm, int rank, int *node); -mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node) -{ - *node = comm->rankToNode[rank]; - return mscclppSuccess; -} - -MSCCLPP_API(mscclppResult_t, mscclppGetDevConns, mscclppComm_t comm, mscclppDevConn_t* devConns); -mscclppResult_t mscclppGetDevConns(mscclppComm_t comm, mscclppDevConn_t* devConns) -{ - *devConns = comm->devConns; - return mscclppSuccess; -} diff --git a/src/include/comm.h b/src/include/comm.h index df5270f9..af9622c8 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -161,10 +161,7 @@ struct mscclppConn { mscclppTransport_t transport; int remoteRank; - int tag; - void* buff; int buffSize; - int* flag; struct mscclppDevConn *devConn; struct mscclppIbContext *ibCtx; struct mscclppIbQp *ibQp; @@ -186,7 +183,6 @@ struct mscclppComm { // struct mscclppTopoSystem* topo; struct mscclppConn conns[MAXCONNECTIONS]; - struct mscclppDevConn *devConns; int nConns; // mscclppNet_t* mscclppNet; @@ -205,14 +201,14 @@ struct mscclppComm { // int64_t busId; // my PCI bus ID in int format // cpu_set_t cpuAffinity; // CPU affinity of the GPU - int node; - int nNodes; - int localRank; - int localRanks; - int maxLocalRanks; - int* rankToNode; - int* rankToLocalRank; - int* localRankToRank; + // int node; + // int nNodes; + // int localRank; + // int localRanks; + // int maxLocalRanks; + // int* rankToNode; + // int* rankToLocalRank; + // int* localRankToRank; // // localRanks and localRanktoRank for all nodes // struct mscclppNodeRanks* nodeRanks; diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index a7d2fbf8..e8a85494 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -46,7 +46,7 @@ struct mscclppDevConn { }; typedef struct mscclppComm* mscclppComm_t; -typedef struct mscclppDevConn* mscclppDevConn_t; +typedef struct mscclppDevConn mscclppDevConn_t; #define MSCCLPP_UNIQUE_ID_BYTES 128 typedef struct { char internal[MSCCLPP_UNIQUE_ID_BYTES]; } mscclppUniqueId; @@ -113,8 +113,8 @@ mscclppResult_t mscclppBootStrapAllGather(mscclppComm_t comm, void* data, int si mscclppResult_t mscclppCommDestroy(mscclppComm_t comm); -mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, void *buff, size_t buffSize, int *flag, - int tag, mscclppTransport_t transportType, const char *ibDev=NULL); +mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, int remoteRank, void* localBuff, size_t buffSize, + int* localFlag, int tag, mscclppTransport_t transportType, const char *ibDev=NULL); mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm); @@ -122,10 +122,6 @@ mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm); mscclppResult_t mscclppProxyStop(mscclppComm_t comm); -mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank); -mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node); -mscclppResult_t mscclppGetDevConns(mscclppComm_t comm, mscclppDevConn_t* devConns); - #ifdef __cplusplus } // end extern "C" #endif diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu deleted file mode 100644 index 80620a99..00000000 --- a/tests/p2p_test.cu +++ /dev/null @@ -1,207 +0,0 @@ -#include "mscclpp.h" -#ifdef MSCCLPP_USE_MPI_FOR_TESTS -#include "mpi.h" -#endif // MSCCLPP_USE_MPI_FOR_TESTS -#include -#include -#include -#include - -#define RANKS_PER_NODE 8 -#define TEST_CONN_TYPE 1 // 0: P2P(for local)+IB(for remote), 1: IB-Only - -// Check CUDA RT calls -#define CUDACHECK(cmd) do { \ - cudaError_t err = cmd; \ - if( err != cudaSuccess ) { \ - printf("%s:%d Cuda failure '%s'", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ -} while(false) - -__global__ void kernel(mscclppDevConn_t devConns, int rank, int world_size) -{ - int warpId = threadIdx.x / 32; - int remoteRank = (warpId < rank) ? warpId : warpId + 1; - mscclppDevConn_t devConn = &devConns[(remoteRank < rank) ? remoteRank : remoteRank - 1]; - volatile int *data = (volatile int *)devConn->localBuff; - volatile int *localFlag = devConn->localFlag; - volatile int *remoteFlag = devConn->remoteFlag; - volatile uint64_t *trig = (volatile uint64_t *)devConn->trigger; - - if (threadIdx.x == 0) { - // Set my data and flag - *(data + rank) = rank + 1; - __threadfence_system(); - *localFlag = 1; - } - __syncthreads(); - - // Each warp receives data from different ranks - if (threadIdx.x % 32 == 0) { - if (devConn->remoteBuff == NULL) { // IB - // Trigger sending data and flag - uint64_t dataOffset = rank * sizeof(int); - uint64_t dataSize = sizeof(int); - *trig = (dataOffset << 32) + dataSize; - - // Wait until the proxy have sent my data and flag - while (*trig != 0) {} - - // Wait for receiving data from remote rank - while (*remoteFlag != 1) {} - } else { // P2P - // Directly read data - volatile int *remoteData = (volatile int *)devConn->remoteBuff; - - // Wait until the remote data is set - while (*remoteFlag != 1) {} - - // Read remote data - data[remoteRank] = remoteData[remoteRank]; - } - } -} - -int rankToLocalRank(int rank) -{ - return rank % RANKS_PER_NODE; -} - -int rankToNode(int rank) -{ - return rank / RANKS_PER_NODE; -} - -void print_usage(const char *prog) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - printf("usage: %s IP:PORT [rank nranks]\n", prog); -#else - printf("usage: %s IP:PORT rank nranks\n", prog); -#endif -} - -int main(int argc, const char *argv[]) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc != 2 && argc != 4) { - print_usage(argv[0]); - return -1; - } - const char *ip_port = argv[1]; - int rank; - int world_size; - if (argc == 4) { - rank = atoi(argv[2]); - world_size = atoi(argv[3]); - } else { - MPI_Init(NULL, NULL); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - } -#else - if (argc != 4) { - print_usage(argv[0]); - return -1; - } - const char *ip_port = argv[1]; - int rank = atoi(argv[2]); - int world_size = atoi(argv[3]); -#endif - int localRank = rankToLocalRank(rank); - int thisNode = rankToNode(rank); - - mscclppComm_t comm; - mscclppResult_t res = mscclppCommInitRank(&comm, world_size, rank, ip_port); - if (res != mscclppSuccess) { - printf("mscclppCommInitRank failed\n"); - return -1; - } - - CUDACHECK(cudaSetDevice(localRank)); - - int *data_d; - int *flag_d; - CUDACHECK(cudaMalloc(&data_d, sizeof(int) * world_size)); - CUDACHECK(cudaMalloc(&flag_d, sizeof(int))); - CUDACHECK(cudaMemset(data_d, 0, sizeof(int) * world_size)); - CUDACHECK(cudaMemset(flag_d, 0, sizeof(int))); - - std::string ibDevStr = "mlx5_ib" + std::to_string(localRank); - - for (int r = 0; r < world_size; ++r) { - if (r == rank) continue; - mscclppTransport_t transportType = mscclppTransportIB; - const char *ibDev = ibDevStr.c_str(); -#if (TEST_CONN_TYPE == 0) // P2P+IB - if (rankToNode(r) == thisNode) { - transportType = mscclppTransportP2P; - ibDev = NULL; - } -#endif - // Connect with all other ranks - res = mscclppConnect(comm, r, data_d, sizeof(int) * world_size, flag_d, 0, transportType, ibDev); - if (res != mscclppSuccess) { - printf("mscclppConnect failed\n"); - return -1; - } - } - - res = mscclppConnectionSetup(comm); - if (res != mscclppSuccess) { - printf("mscclppConnectionSetup failed\n"); - return -1; - } - - res = mscclppProxyLaunch(comm); - if (res != mscclppSuccess) { - printf("mscclppProxyLaunch failed\n"); - return -1; - } - - mscclppDevConn_t devConns; - mscclppGetDevConns(comm, &devConns); - - kernel<<<1, 32 * (world_size - 1)>>>(devConns, rank, world_size); - CUDACHECK(cudaDeviceSynchronize()); - - res = mscclppProxyStop(comm); - if (res != mscclppSuccess) { - printf("mscclppProxyStop failed\n"); - return -1; - } - - // Read results from GPU - int *buf = (int *)calloc(world_size, sizeof(int)); - if (buf == nullptr) { - printf("calloc failed\n"); - return -1; - } - CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost)); - - bool failed = false; - for (int i = 0; i < world_size; ++i) { - if (buf[i] != i + 1) { - printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1); - failed = true; - } - } - if (failed) { - return -1; - } - - res = mscclppCommDestroy(comm); - if (res != mscclppSuccess) { - printf("mscclppDestroy failed\n"); - return -1; - } - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc == 2) { - MPI_Finalize(); - } -#endif - printf("Succeeded! %d\n", rank); - return 0; -} diff --git a/tests/p2p_test_mpi.cu b/tests/p2p_test_mpi.cu index 975436a7..6fd14e95 100644 --- a/tests/p2p_test_mpi.cu +++ b/tests/p2p_test_mpi.cu @@ -1,113 +1,169 @@ #include "mscclpp.h" +#ifdef MSCCLPP_USE_MPI_FOR_TESTS #include "mpi.h" +#endif // MSCCLPP_USE_MPI_FOR_TESTS #include #include #include +#include + +#define RANKS_PER_NODE 8 +#define TEST_CONN_TYPE 1 // 0: P2P(for local)+IB(for remote), 1: IB-Only + +#define MSCCLPPCHECK(call) do { \ + mscclppResult_t res = call; \ + if (res != mscclppSuccess && res != mscclppInProgress) { \ + /* Print the back trace*/ \ + printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \ + return res; \ + } \ +} while (0); // Check CUDA RT calls -#define CUDACHECK(cmd) do { \ - cudaError_t err = cmd; \ - if( err != cudaSuccess ) { \ - printf("Cuda failure '%s'", cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ +#define CUDACHECK(cmd) do { \ + cudaError_t err = cmd; \ + if( err != cudaSuccess ) { \ + printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ + exit(EXIT_FAILURE); \ + } \ } while(false) -__global__ void kernel(mscclppDevConn_t devConns, int rank, int world_size) +__constant__ mscclppDevConn_t constDevConns[8]; + +__global__ void kernel(int rank, int world_size) { - int tid = blockIdx.x * blockDim.x + threadIdx.x; - if (tid == 0) { - // Set my data - volatile int *data = (volatile int *)devConns[rank].localBuff; - volatile int *flag = (volatile int *)devConns[rank].localFlag; - data[rank] = rank; + int warpId = threadIdx.x / 32; + int remoteRank = (warpId < rank) ? warpId : warpId + 1; + mscclppDevConn_t devConn = constDevConns[remoteRank]; + volatile int *data = (volatile int *)devConn.localBuff; + volatile int *localFlag = devConn.localFlag; + volatile int *remoteFlag = devConn.remoteFlag; + volatile uint64_t *trig = (volatile uint64_t *)devConn.trigger; - // Inform that the data is set - *flag = 1; + if (threadIdx.x == 0) { + // Set my data and flag + *(data + rank) = rank + 1; + __threadfence_system(); + *localFlag = 1; + } + __syncthreads(); - for (int i = 0; i < (world_size - 1) * 2; ++i) { - mscclppDevConn_t devConn = &devConns[i]; - int tag = devConn->tag; - int rankRecv = tag / world_size; - int rankSend = tag % world_size; + // Each warp receives data from different ranks + if (threadIdx.x % 32 == 0) { + if (devConn.remoteBuff == NULL) { // IB + // Trigger sending data and flag + uint64_t dataOffset = rank * sizeof(int); + uint64_t dataSize = sizeof(int); + *trig = (dataOffset << 32) + dataSize; - if (rankRecv != rank) continue; + // Wait until the proxy have sent my data and flag + while (*trig != 0) {} - volatile int *remoteData = (volatile int *)devConn->remoteBuff; - volatile int *remoteFlag = (volatile int *)devConn->remoteFlag; + // Wait for receiving data from remote rank + while (*remoteFlag != 1) {} + } else { // P2P + // Directly read data + volatile int *remoteData = (volatile int *)devConn.remoteBuff; // Wait until the remote data is set while (*remoteFlag != 1) {} // Read remote data - data[rankSend] = remoteData[rankSend]; + data[remoteRank] = remoteData[remoteRank]; } } } +int rankToLocalRank(int rank) +{ + return rank % RANKS_PER_NODE; +} + +int rankToNode(int rank) +{ + return rank / RANKS_PER_NODE; +} + void print_usage(const char *prog) { - printf("usage: %s IP:PORT\n", prog); +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + printf("usage: %s IP:PORT [rank nranks]\n", prog); +#else + printf("usage: %s IP:PORT rank nranks\n", prog); +#endif } int main(int argc, const char *argv[]) { - if (argc != 2) { +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + if (argc != 2 && argc != 4) { print_usage(argv[0]); return -1; } - - MPI_Init(NULL, NULL); - + const char *ip_port = argv[1]; int rank; int world_size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); + if (argc == 4) { + rank = atoi(argv[2]); + world_size = atoi(argv[3]); + } else { + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + } +#else + if (argc != 4) { + print_usage(argv[0]); + return -1; + } + const char *ip_port = argv[1]; + int rank = atoi(argv[2]); + int world_size = atoi(argv[3]); +#endif + int localRank = rankToLocalRank(rank); + int thisNode = rankToNode(rank); + CUDACHECK(cudaSetDevice(localRank)); mscclppComm_t comm; - const char *ip_port = argv[1]; - mscclppCommInitRank(&comm, world_size, rank, ip_port); + MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port)); int *data_d; int *flag_d; - CUDACHECK(cudaMalloc(&data_d, sizeof(int) * world_size)); + size_t data_size = sizeof(int) * world_size; + CUDACHECK(cudaMalloc(&data_d, data_size)); CUDACHECK(cudaMalloc(&flag_d, sizeof(int))); - printf("-------- buf: %p, flag: %p\n", data_d, flag_d); + CUDACHECK(cudaMemset(data_d, 0, data_size)); + CUDACHECK(cudaMemset(flag_d, 0, sizeof(int))); - mscclppResult_t res; + std::string ibDevStr = "mlx5_ib" + std::to_string(localRank); - // Read from all other ranks + mscclppDevConn_t devConns[8]; for (int r = 0; r < world_size; ++r) { if (r == rank) continue; - int tag = rank * world_size + r; - res = mscclppConnect(comm, rank, r, data_d, flag_d, tag, mscclppTransportP2P); - if (res != mscclppSuccess) { - printf("mscclppConnect failed\n"); - return -1; + mscclppTransport_t transportType = mscclppTransportIB; + const char *ibDev = ibDevStr.c_str(); +#if (TEST_CONN_TYPE == 0) // P2P+IB + if (rankToNode(r) == thisNode) { + transportType = mscclppTransportP2P; + ibDev = NULL; } - } - // Let others read from me - for (int r = 0; r < world_size; ++r) { - if (r == rank) continue; - int tag = r * world_size + rank; - res = mscclppConnect(comm, r, rank, data_d, flag_d, tag, mscclppTransportP2P); - if (res != mscclppSuccess) { - printf("mscclppConnect failed\n"); - return -1; - } - } - res = mscclppConnectionSetup(comm); - if (res != mscclppSuccess) { - printf("mscclppConnectionSetup failed\n"); - return -1; +#endif + // Connect with all other ranks + MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, data_d, data_size, flag_d, 0, transportType, ibDev)); } - mscclppDevConn_t devConns; - mscclppGetDevConns(comm, &devConns); + MSCCLPPCHECK(mscclppConnectionSetup(comm)); - kernel<<<1, 1>>>(devConns, rank, world_size); + MSCCLPPCHECK(mscclppProxyLaunch(comm)); + + CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * world_size)); + + kernel<<<1, 32 * (world_size - 1)>>>(rank, world_size); CUDACHECK(cudaDeviceSynchronize()); + MSCCLPPCHECK(mscclppProxyStop(comm)); + + // Read results from GPU int *buf = (int *)calloc(world_size, sizeof(int)); if (buf == nullptr) { printf("calloc failed\n"); @@ -115,21 +171,24 @@ int main(int argc, const char *argv[]) } CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost)); + bool failed = false; for (int i = 0; i < world_size; ++i) { - if (buf[i] != i) { - printf("wrong data: %d, expected %d\n", buf[i], i); - return -1; + if (buf[i] != i + 1) { + printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1); + failed = true; } } - - res = mscclppCommDestroy(comm); - if (res != mscclppSuccess) { - printf("mscclppDestroy failed\n"); + if (failed) { return -1; } - MPI_Finalize(); + MSCCLPPCHECK(mscclppCommDestroy(comm)); +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + if (argc == 2) { + MPI_Finalize(); + } +#endif printf("Succeeded! %d\n", rank); return 0; }