Merge branch 'saemal/cleanup' into chhwang/p2p-simple

This commit is contained in:
Changho Hwang
2023-02-22 06:54:51 +00:00
7 changed files with 258 additions and 400 deletions

View File

@@ -6,7 +6,7 @@ MSCCLPP_MINOR := 1
DEBUG ?= 0
VERBOSE ?= 1
TRACE ?= 0
USE_MPI_FOR_TESTS ?= 0
USE_MPI_FOR_TESTS ?= 1
######## CUDA
CUDA_HOME ?= /usr/local/cuda
@@ -116,7 +116,7 @@ LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR)
LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR)
TESTSDIR := tests
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu)
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test_mpi.cu)
TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS))
TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS))

View File

@@ -7,4 +7,4 @@ mpirun -allow-run-as-root \
-bind-to numa \
-x MSCCLPP_DEBUG_SUBSYS=ALL \
-x MSCCLPP_SOCKET_IFNAME=eth0 \
./build/bin/tests/p2p_test 172.17.0.4:50000
./build/bin/tests/p2p_test_mpi 172.17.0.4:50000

View File

@@ -64,9 +64,9 @@ MSCCLPP_API(mscclppResult_t, mscclppCommInitRank, mscclppComm_t* comm, int nrank
mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, const char* ip_port_pair){
mscclppResult_t res = mscclppSuccess;
mscclppComm_t _comm = NULL;
uint64_t hash = getHostHash();
uint64_t *hashes;
std::map<uint64_t, int> hashToNode;
// uint64_t hash = getHostHash();
// uint64_t *hashes;
// std::map<uint64_t, int> hashToNode;
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm, 1), res, fail);
_comm->rank = rank;
@@ -80,36 +80,36 @@ mscclppResult_t mscclppCommInitRank(mscclppComm_t* comm, int nranks, int rank, c
MSCCLPPCHECKGOTO(mscclppCudaHostCalloc((uint32_t **)&_comm->abortFlag, 1), res, fail);
MSCCLPPCHECK(bootstrapInit(&handle, _comm));
_comm->maxLocalRanks = 8;
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToNode, nranks), res, fail);
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToLocalRank, nranks), res, fail);
MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->localRankToRank, _comm->maxLocalRanks), res, fail);
// _comm->maxLocalRanks = 8;
// MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToNode, nranks), res, fail);
// MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->rankToLocalRank, nranks), res, fail);
// MSCCLPPCHECKGOTO(mscclppCalloc(&_comm->localRankToRank, _comm->maxLocalRanks), res, fail);
MSCCLPPCHECKGOTO(mscclppCalloc(&hashes, nranks), res, fail);
hashes[rank] = hash;
MSCCLPPCHECK(bootstrapAllGather(_comm->bootstrap, hashes, sizeof(uint64_t)));
// MSCCLPPCHECKGOTO(mscclppCalloc(&hashes, nranks), res, fail);
// hashes[rank] = hash;
// MSCCLPPCHECK(bootstrapAllGather(_comm->bootstrap, hashes, sizeof(uint64_t)));
for (int i = 0; i < nranks; ++i) {
auto it = hashToNode.find(hashes[i]);
if (it == hashToNode.end()) {
_comm->nNodes++;
hashToNode[hashes[i]] = _comm->nNodes - 1;
_comm->rankToNode[i] = _comm->nNodes - 1;
} else {
_comm->rankToNode[i] = it->second;
}
if (hashes[i] == hash) {
_comm->rankToLocalRank[i] = _comm->localRanks++;
_comm->localRankToRank[_comm->rankToLocalRank[i]] = i;
}
}
if (_comm->localRanks > _comm->maxLocalRanks) {
WARN("Too many ranks on the same host: %d", _comm->localRanks);
res = mscclppInvalidUsage;
goto fail;
}
_comm->node = _comm->rankToNode[rank];
_comm->localRank = _comm->rankToLocalRank[rank];
// for (int i = 0; i < nranks; ++i) {
// auto it = hashToNode.find(hashes[i]);
// if (it == hashToNode.end()) {
// _comm->nNodes++;
// hashToNode[hashes[i]] = _comm->nNodes - 1;
// _comm->rankToNode[i] = _comm->nNodes - 1;
// } else {
// _comm->rankToNode[i] = it->second;
// }
// if (hashes[i] == hash) {
// _comm->rankToLocalRank[i] = _comm->localRanks++;
// _comm->localRankToRank[_comm->rankToLocalRank[i]] = i;
// }
// }
// if (_comm->localRanks > _comm->maxLocalRanks) {
// WARN("Too many ranks on the same host: %d", _comm->localRanks);
// res = mscclppInvalidUsage;
// goto fail;
// }
// _comm->node = _comm->rankToNode[rank];
// _comm->localRank = _comm->rankToLocalRank[rank];
*comm = _comm;
return res;
@@ -141,21 +141,31 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, int remoteRank,
void *buff, size_t buffSize, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev);
mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, void *buff, size_t buffSize,
int *flag, int tag, mscclppTransport_t transportType, const char *ibDev/*=NULL*/)
MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, mscclppDevConn* devConnOut, int remoteRank,
void* localBuff, size_t buffSize, int* localFlag, int tag, mscclppTransport_t transportType, const char *ibDev);
mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, int remoteRank, void* localBuff, size_t buffSize,
int* localFlag, int tag, mscclppTransport_t transportType, const char *ibDev/*=NULL*/)
{
if (comm->nConns == MAXCONNECTIONS) {
WARN("Too many connections made");
return mscclppInternalError;
}
if (devConnOut == NULL) {
WARN("devConnOut is the output of this function and needs to be allocated by the user");
return mscclppInvalidUsage;
}
struct mscclppConn *conn = &comm->conns[comm->nConns++];
conn->transport = transportType;
conn->remoteRank = remoteRank;
conn->tag = tag;
conn->buff = buff;
conn->buffSize = buffSize;
conn->flag = flag;
conn->devConn = devConnOut;
conn->devConn->localBuff = localBuff;
conn->devConn->localFlag = localFlag;
conn->devConn->tag = tag;
MSCCLPPCHECK(mscclppCudaHostCalloc(&conn->devConn->trigger, 1));
conn->ibCtx = NULL;
conn->ibQp = NULL;
if (ibDev != NULL) {
// Check if an IB context exists
int ibDevIdx = -1;
@@ -196,70 +206,95 @@ struct connInfo {
mscclppIbMrInfo infoRemoteFlagMr;
};
mscclppResult_t mscclppP2pConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/){
if (connInfo == NULL || conn == NULL){
WARN("connInfo or connection cannot be null");
return mscclppInternalError;
}
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleBuff, conn->devConn->localBuff));
CUDACHECK(cudaIpcGetMemHandle(&connInfo->handleFlag, conn->devConn->localFlag));
return mscclppSuccess;
}
mscclppResult_t mscclppP2pConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/){
if (connInfo == NULL || conn == NULL){
WARN("ipcHandles or connection cannot be null");
return mscclppInternalError;
}
CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->devConn->remoteBuff, connInfo->handleBuff, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(cudaIpcOpenMemHandle((void**)&conn->devConn->remoteFlag, connInfo->handleFlag, cudaIpcMemLazyEnablePeerAccess));
return mscclppSuccess;
}
mscclppResult_t mscclppIbConnectionSetupStart(struct connInfo* connInfo /*output*/, struct mscclppConn* conn /*input*/){
if (connInfo == NULL || conn == NULL){
WARN("connInfo or connection cannot be null");
return mscclppInternalError;
}
struct mscclppDevConn *devConn = conn->devConn;
devConn->remoteBuff = NULL;
MSCCLPPCHECK(mscclppCudaCalloc(&devConn->remoteFlag, 1));
struct mscclppIbContext *ibCtx = conn->ibCtx;
if (conn->ibQp == NULL) {
MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp));
}
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localBuff, conn->buffSize, &conn->ibBuffMr));
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localFlag, sizeof(int), &conn->ibLocalFlagMr));
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->remoteFlag, sizeof(int), &conn->ibRemoteFlagMr));
connInfo->infoQp = conn->ibQp->info;
connInfo->infoBuffMr = conn->ibBuffMr->info;
connInfo->infoLocalFlagMr = conn->ibLocalFlagMr->info;
connInfo->infoRemoteFlagMr = conn->ibRemoteFlagMr->info;
return mscclppSuccess;
}
mscclppResult_t mscclppIbConnectionSetupEnd(struct connInfo* connInfo /*input*/, struct mscclppConn* conn /*output*/){
if (connInfo == NULL || conn == NULL){
WARN("ipcHandles or connection cannot be null");
return mscclppInternalError;
}
if (conn->ibQp->rtr(&connInfo->infoQp) != 0) {
WARN("Failed to transition QP to RTR");
return mscclppInvalidUsage;
}
if (conn->ibQp->rts() != 0) {
WARN("Failed to transition QP to RTS");
return mscclppInvalidUsage;
}
conn->ibBuffMrInfo = connInfo->infoBuffMr;
conn->ibLocalFlagMrInfo = connInfo->infoLocalFlagMr;
conn->ibRemoteFlagMrInfo = connInfo->infoRemoteFlagMr;
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppConnectionSetup, mscclppComm_t comm);
mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
{
// Allocate connection info to be shared with GPU
MSCCLPPCHECK(mscclppCudaHostCalloc(&comm->devConns, comm->nConns));
// Send info to peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
struct mscclppDevConn *devConn = &comm->devConns[i];
conn->devConn = devConn;
devConn->tag = conn->tag;
devConn->localBuff = conn->buff;
devConn->localFlag = conn->flag;
MSCCLPPCHECK(mscclppCudaHostCalloc(&devConn->trigger, 1));
struct connInfo cInfo;
if (conn->transport == mscclppTransportP2P) {
CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleBuff, devConn->localBuff));
CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleFlag, devConn->localFlag));
MSCCLPPCHECK(mscclppP2pConnectionSetupStart(&cInfo, conn));
} else if (conn->transport == mscclppTransportIB) {
devConn->remoteBuff = NULL;
MSCCLPPCHECK(mscclppCudaCalloc(&devConn->remoteFlag, 1));
struct mscclppIbContext *ibCtx = conn->ibCtx;
if (conn->ibQp == NULL) {
MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp));
}
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localBuff, conn->buffSize, &conn->ibBuffMr));
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->localFlag, sizeof(int), &conn->ibLocalFlagMr));
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, devConn->remoteFlag, sizeof(int), &conn->ibRemoteFlagMr));
cInfo.infoQp = conn->ibQp->info;
cInfo.infoBuffMr = conn->ibBuffMr->info;
cInfo.infoLocalFlagMr = conn->ibLocalFlagMr->info;
cInfo.infoRemoteFlagMr = conn->ibRemoteFlagMr->info;
MSCCLPPCHECK(mscclppIbConnectionSetupStart(&cInfo, conn));
}
MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->remoteRank, conn->tag, &cInfo, sizeof(cInfo)));
MSCCLPPCHECK(bootstrapSend(comm->bootstrap, conn->remoteRank, conn->devConn->tag, &cInfo, sizeof(cInfo)));
}
// Recv info from peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
struct mscclppDevConn *devConn = &comm->devConns[i];
struct connInfo cInfo;
MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->remoteRank, conn->tag, &cInfo, sizeof(cInfo)));
MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, conn->remoteRank, conn->devConn->tag, &cInfo, sizeof(cInfo)));
if (conn->transport == mscclppTransportP2P) {
CUDACHECK(cudaIpcOpenMemHandle(&devConn->remoteBuff, cInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(cudaIpcOpenMemHandle((void **)&devConn->remoteFlag, cInfo.handleFlag, cudaIpcMemLazyEnablePeerAccess));
MSCCLPPCHECK(mscclppP2pConnectionSetupEnd(&cInfo, conn));
} else if (conn->transport == mscclppTransportIB) {
if (conn->ibQp->rtr(&cInfo.infoQp) != 0) {
WARN("Failed to transition QP to RTR");
return mscclppInvalidUsage;
}
if (conn->ibQp->rts() != 0) {
WARN("Failed to transition QP to RTS");
return mscclppInvalidUsage;
}
conn->ibBuffMrInfo = cInfo.infoBuffMr;
conn->ibLocalFlagMrInfo = cInfo.infoLocalFlagMr;
conn->ibRemoteFlagMrInfo = cInfo.infoRemoteFlagMr;
MSCCLPPCHECK(mscclppIbConnectionSetupEnd(&cInfo, conn));
}
}
return mscclppSuccess;
}
@@ -276,24 +311,3 @@ mscclppResult_t mscclppProxyStop(mscclppComm_t comm)
MSCCLPPCHECK(mscclppProxyDestroy(comm));
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppGetLocalRank, mscclppComm_t comm, int *localRank);
mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank)
{
*localRank = comm->localRank;
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppGetNodeFromRank, mscclppComm_t comm, int rank, int *node);
mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node)
{
*node = comm->rankToNode[rank];
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppGetDevConns, mscclppComm_t comm, mscclppDevConn_t* devConns);
mscclppResult_t mscclppGetDevConns(mscclppComm_t comm, mscclppDevConn_t* devConns)
{
*devConns = comm->devConns;
return mscclppSuccess;
}

View File

@@ -161,10 +161,7 @@
struct mscclppConn {
mscclppTransport_t transport;
int remoteRank;
int tag;
void* buff;
int buffSize;
int* flag;
struct mscclppDevConn *devConn;
struct mscclppIbContext *ibCtx;
struct mscclppIbQp *ibQp;
@@ -186,7 +183,6 @@ struct mscclppComm {
// struct mscclppTopoSystem* topo;
struct mscclppConn conns[MAXCONNECTIONS];
struct mscclppDevConn *devConns;
int nConns;
// mscclppNet_t* mscclppNet;
@@ -205,14 +201,14 @@ struct mscclppComm {
// int64_t busId; // my PCI bus ID in int format
// cpu_set_t cpuAffinity; // CPU affinity of the GPU
int node;
int nNodes;
int localRank;
int localRanks;
int maxLocalRanks;
int* rankToNode;
int* rankToLocalRank;
int* localRankToRank;
// int node;
// int nNodes;
// int localRank;
// int localRanks;
// int maxLocalRanks;
// int* rankToNode;
// int* rankToLocalRank;
// int* localRankToRank;
// // localRanks and localRanktoRank for all nodes
// struct mscclppNodeRanks* nodeRanks;

View File

@@ -46,7 +46,7 @@ struct mscclppDevConn {
};
typedef struct mscclppComm* mscclppComm_t;
typedef struct mscclppDevConn* mscclppDevConn_t;
typedef struct mscclppDevConn mscclppDevConn_t;
#define MSCCLPP_UNIQUE_ID_BYTES 128
typedef struct { char internal[MSCCLPP_UNIQUE_ID_BYTES]; } mscclppUniqueId;
@@ -113,8 +113,8 @@ mscclppResult_t mscclppBootStrapAllGather(mscclppComm_t comm, void* data, int si
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm);
mscclppResult_t mscclppConnect(mscclppComm_t comm, int remoteRank, void *buff, size_t buffSize, int *flag,
int tag, mscclppTransport_t transportType, const char *ibDev=NULL);
mscclppResult_t mscclppConnect(mscclppComm_t comm, mscclppDevConn* devConnOut, int remoteRank, void* localBuff, size_t buffSize,
int* localFlag, int tag, mscclppTransport_t transportType, const char *ibDev=NULL);
mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm);
@@ -122,10 +122,6 @@ mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm);
mscclppResult_t mscclppProxyStop(mscclppComm_t comm);
mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank);
mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node);
mscclppResult_t mscclppGetDevConns(mscclppComm_t comm, mscclppDevConn_t* devConns);
#ifdef __cplusplus
} // end extern "C"
#endif

View File

@@ -1,207 +0,0 @@
#include "mscclpp.h"
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string>
#define RANKS_PER_NODE 8
#define TEST_CONN_TYPE 1 // 0: P2P(for local)+IB(for remote), 1: IB-Only
// Check CUDA RT calls
#define CUDACHECK(cmd) do { \
cudaError_t err = cmd; \
if( err != cudaSuccess ) { \
printf("%s:%d Cuda failure '%s'", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while(false)
__global__ void kernel(mscclppDevConn_t devConns, int rank, int world_size)
{
int warpId = threadIdx.x / 32;
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
mscclppDevConn_t devConn = &devConns[(remoteRank < rank) ? remoteRank : remoteRank - 1];
volatile int *data = (volatile int *)devConn->localBuff;
volatile int *localFlag = devConn->localFlag;
volatile int *remoteFlag = devConn->remoteFlag;
volatile uint64_t *trig = (volatile uint64_t *)devConn->trigger;
if (threadIdx.x == 0) {
// Set my data and flag
*(data + rank) = rank + 1;
__threadfence_system();
*localFlag = 1;
}
__syncthreads();
// Each warp receives data from different ranks
if (threadIdx.x % 32 == 0) {
if (devConn->remoteBuff == NULL) { // IB
// Trigger sending data and flag
uint64_t dataOffset = rank * sizeof(int);
uint64_t dataSize = sizeof(int);
*trig = (dataOffset << 32) + dataSize;
// Wait until the proxy have sent my data and flag
while (*trig != 0) {}
// Wait for receiving data from remote rank
while (*remoteFlag != 1) {}
} else { // P2P
// Directly read data
volatile int *remoteData = (volatile int *)devConn->remoteBuff;
// Wait until the remote data is set
while (*remoteFlag != 1) {}
// Read remote data
data[remoteRank] = remoteData[remoteRank];
}
}
}
int rankToLocalRank(int rank)
{
return rank % RANKS_PER_NODE;
}
int rankToNode(int rank)
{
return rank / RANKS_PER_NODE;
}
void print_usage(const char *prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
printf("usage: %s IP:PORT [rank nranks]\n", prog);
#else
printf("usage: %s IP:PORT rank nranks\n", prog);
#endif
}
int main(int argc, const char *argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc != 2 && argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank;
int world_size;
if (argc == 4) {
rank = atoi(argv[2]);
world_size = atoi(argv[3]);
} else {
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
}
#else
if (argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank = atoi(argv[2]);
int world_size = atoi(argv[3]);
#endif
int localRank = rankToLocalRank(rank);
int thisNode = rankToNode(rank);
mscclppComm_t comm;
mscclppResult_t res = mscclppCommInitRank(&comm, world_size, rank, ip_port);
if (res != mscclppSuccess) {
printf("mscclppCommInitRank failed\n");
return -1;
}
CUDACHECK(cudaSetDevice(localRank));
int *data_d;
int *flag_d;
CUDACHECK(cudaMalloc(&data_d, sizeof(int) * world_size));
CUDACHECK(cudaMalloc(&flag_d, sizeof(int)));
CUDACHECK(cudaMemset(data_d, 0, sizeof(int) * world_size));
CUDACHECK(cudaMemset(flag_d, 0, sizeof(int)));
std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
mscclppTransport_t transportType = mscclppTransportIB;
const char *ibDev = ibDevStr.c_str();
#if (TEST_CONN_TYPE == 0) // P2P+IB
if (rankToNode(r) == thisNode) {
transportType = mscclppTransportP2P;
ibDev = NULL;
}
#endif
// Connect with all other ranks
res = mscclppConnect(comm, r, data_d, sizeof(int) * world_size, flag_d, 0, transportType, ibDev);
if (res != mscclppSuccess) {
printf("mscclppConnect failed\n");
return -1;
}
}
res = mscclppConnectionSetup(comm);
if (res != mscclppSuccess) {
printf("mscclppConnectionSetup failed\n");
return -1;
}
res = mscclppProxyLaunch(comm);
if (res != mscclppSuccess) {
printf("mscclppProxyLaunch failed\n");
return -1;
}
mscclppDevConn_t devConns;
mscclppGetDevConns(comm, &devConns);
kernel<<<1, 32 * (world_size - 1)>>>(devConns, rank, world_size);
CUDACHECK(cudaDeviceSynchronize());
res = mscclppProxyStop(comm);
if (res != mscclppSuccess) {
printf("mscclppProxyStop failed\n");
return -1;
}
// Read results from GPU
int *buf = (int *)calloc(world_size, sizeof(int));
if (buf == nullptr) {
printf("calloc failed\n");
return -1;
}
CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost));
bool failed = false;
for (int i = 0; i < world_size; ++i) {
if (buf[i] != i + 1) {
printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1);
failed = true;
}
}
if (failed) {
return -1;
}
res = mscclppCommDestroy(comm);
if (res != mscclppSuccess) {
printf("mscclppDestroy failed\n");
return -1;
}
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc == 2) {
MPI_Finalize();
}
#endif
printf("Succeeded! %d\n", rank);
return 0;
}

View File

@@ -1,113 +1,169 @@
#include "mscclpp.h"
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string>
#define RANKS_PER_NODE 8
#define TEST_CONN_TYPE 1 // 0: P2P(for local)+IB(for remote), 1: IB-Only
#define MSCCLPPCHECK(call) do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
// Check CUDA RT calls
#define CUDACHECK(cmd) do { \
cudaError_t err = cmd; \
if( err != cudaSuccess ) { \
printf("Cuda failure '%s'", cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
#define CUDACHECK(cmd) do { \
cudaError_t err = cmd; \
if( err != cudaSuccess ) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while(false)
__global__ void kernel(mscclppDevConn_t devConns, int rank, int world_size)
__constant__ mscclppDevConn_t constDevConns[8];
__global__ void kernel(int rank, int world_size)
{
int tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid == 0) {
// Set my data
volatile int *data = (volatile int *)devConns[rank].localBuff;
volatile int *flag = (volatile int *)devConns[rank].localFlag;
data[rank] = rank;
int warpId = threadIdx.x / 32;
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
mscclppDevConn_t devConn = constDevConns[remoteRank];
volatile int *data = (volatile int *)devConn.localBuff;
volatile int *localFlag = devConn.localFlag;
volatile int *remoteFlag = devConn.remoteFlag;
volatile uint64_t *trig = (volatile uint64_t *)devConn.trigger;
// Inform that the data is set
*flag = 1;
if (threadIdx.x == 0) {
// Set my data and flag
*(data + rank) = rank + 1;
__threadfence_system();
*localFlag = 1;
}
__syncthreads();
for (int i = 0; i < (world_size - 1) * 2; ++i) {
mscclppDevConn_t devConn = &devConns[i];
int tag = devConn->tag;
int rankRecv = tag / world_size;
int rankSend = tag % world_size;
// Each warp receives data from different ranks
if (threadIdx.x % 32 == 0) {
if (devConn.remoteBuff == NULL) { // IB
// Trigger sending data and flag
uint64_t dataOffset = rank * sizeof(int);
uint64_t dataSize = sizeof(int);
*trig = (dataOffset << 32) + dataSize;
if (rankRecv != rank) continue;
// Wait until the proxy have sent my data and flag
while (*trig != 0) {}
volatile int *remoteData = (volatile int *)devConn->remoteBuff;
volatile int *remoteFlag = (volatile int *)devConn->remoteFlag;
// Wait for receiving data from remote rank
while (*remoteFlag != 1) {}
} else { // P2P
// Directly read data
volatile int *remoteData = (volatile int *)devConn.remoteBuff;
// Wait until the remote data is set
while (*remoteFlag != 1) {}
// Read remote data
data[rankSend] = remoteData[rankSend];
data[remoteRank] = remoteData[remoteRank];
}
}
}
int rankToLocalRank(int rank)
{
return rank % RANKS_PER_NODE;
}
int rankToNode(int rank)
{
return rank / RANKS_PER_NODE;
}
void print_usage(const char *prog)
{
printf("usage: %s IP:PORT\n", prog);
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
printf("usage: %s IP:PORT [rank nranks]\n", prog);
#else
printf("usage: %s IP:PORT rank nranks\n", prog);
#endif
}
int main(int argc, const char *argv[])
{
if (argc != 2) {
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc != 2 && argc != 4) {
print_usage(argv[0]);
return -1;
}
MPI_Init(NULL, NULL);
const char *ip_port = argv[1];
int rank;
int world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
if (argc == 4) {
rank = atoi(argv[2]);
world_size = atoi(argv[3]);
} else {
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
}
#else
if (argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank = atoi(argv[2]);
int world_size = atoi(argv[3]);
#endif
int localRank = rankToLocalRank(rank);
int thisNode = rankToNode(rank);
CUDACHECK(cudaSetDevice(localRank));
mscclppComm_t comm;
const char *ip_port = argv[1];
mscclppCommInitRank(&comm, world_size, rank, ip_port);
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
int *data_d;
int *flag_d;
CUDACHECK(cudaMalloc(&data_d, sizeof(int) * world_size));
size_t data_size = sizeof(int) * world_size;
CUDACHECK(cudaMalloc(&data_d, data_size));
CUDACHECK(cudaMalloc(&flag_d, sizeof(int)));
printf("-------- buf: %p, flag: %p\n", data_d, flag_d);
CUDACHECK(cudaMemset(data_d, 0, data_size));
CUDACHECK(cudaMemset(flag_d, 0, sizeof(int)));
mscclppResult_t res;
std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
// Read from all other ranks
mscclppDevConn_t devConns[8];
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
int tag = rank * world_size + r;
res = mscclppConnect(comm, rank, r, data_d, flag_d, tag, mscclppTransportP2P);
if (res != mscclppSuccess) {
printf("mscclppConnect failed\n");
return -1;
mscclppTransport_t transportType = mscclppTransportIB;
const char *ibDev = ibDevStr.c_str();
#if (TEST_CONN_TYPE == 0) // P2P+IB
if (rankToNode(r) == thisNode) {
transportType = mscclppTransportP2P;
ibDev = NULL;
}
}
// Let others read from me
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
int tag = r * world_size + rank;
res = mscclppConnect(comm, r, rank, data_d, flag_d, tag, mscclppTransportP2P);
if (res != mscclppSuccess) {
printf("mscclppConnect failed\n");
return -1;
}
}
res = mscclppConnectionSetup(comm);
if (res != mscclppSuccess) {
printf("mscclppConnectionSetup failed\n");
return -1;
#endif
// Connect with all other ranks
MSCCLPPCHECK(mscclppConnect(comm, &devConns[r], r, data_d, data_size, flag_d, 0, transportType, ibDev));
}
mscclppDevConn_t devConns;
mscclppGetDevConns(comm, &devConns);
MSCCLPPCHECK(mscclppConnectionSetup(comm));
kernel<<<1, 1>>>(devConns, rank, world_size);
MSCCLPPCHECK(mscclppProxyLaunch(comm));
CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * world_size));
kernel<<<1, 32 * (world_size - 1)>>>(rank, world_size);
CUDACHECK(cudaDeviceSynchronize());
MSCCLPPCHECK(mscclppProxyStop(comm));
// Read results from GPU
int *buf = (int *)calloc(world_size, sizeof(int));
if (buf == nullptr) {
printf("calloc failed\n");
@@ -115,21 +171,24 @@ int main(int argc, const char *argv[])
}
CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost));
bool failed = false;
for (int i = 0; i < world_size; ++i) {
if (buf[i] != i) {
printf("wrong data: %d, expected %d\n", buf[i], i);
return -1;
if (buf[i] != i + 1) {
printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1);
failed = true;
}
}
res = mscclppCommDestroy(comm);
if (res != mscclppSuccess) {
printf("mscclppDestroy failed\n");
if (failed) {
return -1;
}
MPI_Finalize();
MSCCLPPCHECK(mscclppCommDestroy(comm));
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc == 2) {
MPI_Finalize();
}
#endif
printf("Succeeded! %d\n", rank);
return 0;
}