IB all-to-all works

This commit is contained in:
Changho Hwang
2023-02-17 11:39:16 +00:00
parent 8e57fd9896
commit 33e20aceb9
15 changed files with 1000 additions and 1664 deletions

View File

@@ -6,6 +6,7 @@ MSCCLPP_MINOR := 1
DEBUG ?= 0
VERBOSE ?= 1
TRACE ?= 0
USE_MPI_FOR_TESTS ?= 0
######## CUDA
CUDA_HOME ?= /usr/local/cuda
@@ -81,9 +82,17 @@ CXXFLAGS += -DNVTX_DISABLE
endif
#### MPI (only for test code)
ifeq ($(USE_MPI_FOR_TESTS), 1)
MPI_HOME ?= /usr/local/mpi
MPI_INC := -I$(MPI_HOME)/include
MPI_LDFLAGS := -L$(MPI_HOME)/lib -lmpi
MPI_MACRO := -D MSCCLPP_USE_MPI_FOR_TESTS
else
MPI_HOME :=
MPI_INC :=
MPI_LDFLAGS :=
MPI_MACRO :=
endif
#### MSCCL++
BUILDDIR ?= $(abspath ./build)
@@ -92,8 +101,10 @@ LIBDIR := lib
OBJDIR := obj
BINDIR := bin
LDFLAGS := $(NVLDFLAGS) -libverbs
LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc)
LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc shmutils.cc)
LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc ib.cc)
LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS))
LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
@@ -105,7 +116,7 @@ LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR)
LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR)
TESTSDIR := tests
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc bootstrap_test_mpi.cc p2p_test_mpi.cu)
TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu)
TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS))
TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS))
@@ -140,17 +151,17 @@ $(LIBTARGET): $(LIBOBJTARGETS)
# Compile .cc tests
$(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cc
@mkdir -p $(@D)
$(CXX) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(CXXFLAGS) -c $<
$(CXX) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(CXXFLAGS) -c $< $(MPI_MACRO)
# Compile .cu tests
$(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cu
@mkdir -p $(@D)
$(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) -c $<
$(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) -c $< $(MPI_MACRO)
# Test bins
$(BUILDDIR)/$(BINDIR)/%: $(BUILDDIR)/$(OBJDIR)/%.o $(LIBTARGET)
@mkdir -p $(@D)
$(NVCC) -o $@ $< $(NVLDFLAGS) $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp
$(NVCC) -o $@ $< $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp
clean:
rm -rf $(BUILDDIR)

View File

@@ -5,7 +5,6 @@ mpirun -allow-run-as-root \
-tag-output \
-map-by ppr:8:node \
-bind-to numa \
-x MSCCLPP_DEBUG=INFO \
-x MSCCLPP_DEBUG_SUBSYS=ALL \
-x MSCCLPP_SOCKET_IFNAME=eth0 \
./build/bin/tests/p2p_test_mpi 172.17.0.4:50000
./build/bin/tests/p2p_test 172.17.0.4:50000

View File

@@ -315,7 +315,7 @@ mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscc
MSCCLPPCHECK(mscclppSocketListen(proxySocket));
MSCCLPPCHECK(mscclppSocketGetAddr(proxySocket, state->peerProxyAddresses+rank));
MSCCLPPCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union mscclppSocketAddress)));
MSCCLPPCHECK(mscclppProxyInit(comm, proxySocket, state->peerProxyAddresses));
// MSCCLPPCHECK(mscclppProxyInit(comm, proxySocket, state->peerProxyAddresses));
TRACE(MSCCLPP_INIT, "rank %d nranks %d - DONE", rank, nranks);

352
src/bootstrap/ib.cc Normal file
View File

@@ -0,0 +1,352 @@
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <malloc.h>
#include <vector>
#include <unistd.h>
#include "debug.h"
#include "alloc.h"
#include "comm.h"
#include "ib.h"
mscclppResult_t mscclppIbContextCreate(struct mscclppIbContext **ctx, const char *ibDevName)
{
struct mscclppIbContext *_ctx;
MSCCLPPCHECK(mscclppCalloc(&_ctx, 1));
MSCCLPPCHECK(mscclppCalloc(&_ctx->wcs, MSCCLPP_IB_CQ_POLL_NUM));
std::vector<int> ports;
int num;
struct ibv_device **devices = ibv_get_device_list(&num);
for (int i = 0; i < num; ++i) {
if (strncmp(devices[i]->name, ibDevName, IBV_SYSFS_NAME_MAX) == 0) {
_ctx->ctx = ibv_open_device(devices[i]);
break;
}
}
ibv_free_device_list(devices);
if (_ctx->ctx == nullptr) {
WARN("ibv_open_device failed (errno %d, device name %s)", errno, ibDevName);
goto fail;
}
// Check available ports
struct ibv_device_attr devAttr;
if (ibv_query_device(_ctx->ctx, &devAttr) != 0) {
WARN("ibv_query_device failed (errno %d, device name %s)", errno, ibDevName);
goto fail;
}
for (uint8_t i = 1; i <= devAttr.phys_port_cnt; ++i) {
struct ibv_port_attr portAttr;
if (ibv_query_port(_ctx->ctx, i, &portAttr) != 0) {
WARN("ibv_query_port failed (errno %d, port %d)", errno, i);
goto fail;
}
if (portAttr.state != IBV_PORT_ACTIVE) {
continue;
}
if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND &&
portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) {
continue;
}
ports.push_back((int)i);
}
if (ports.size() == 0) {
WARN("no active IB port found");
goto fail;
}
MSCCLPPCHECK(mscclppCalloc(&_ctx->ports, ports.size()));
_ctx->nPorts = (int)ports.size();
for (int i = 0; i < _ctx->nPorts; ++i) {
_ctx->ports[i] = ports[i];
}
_ctx->cq = ibv_create_cq(_ctx->ctx, MSCCLPP_IB_CQ_SIZE, NULL, NULL, 0);
if (_ctx->cq == NULL) {
WARN("ibv_create_cq failed (errno %d)", errno);
goto fail;
}
_ctx->pd = ibv_alloc_pd(_ctx->ctx);
if (_ctx->pd == NULL) {
WARN("ibv_alloc_pd failed (errno %d)", errno);
goto fail;
}
*ctx = _ctx;
return mscclppSuccess;
fail:
*ctx = NULL;
if (_ctx->ports != NULL) {
free(_ctx->ports);
}
free(_ctx->wcs);
free(_ctx);
return mscclppInternalError;
}
mscclppResult_t mscclppIbContextDestroy(struct mscclppIbContext *ctx)
{
for (int i = 0; i < ctx->nMrs; ++i) {
ibv_dereg_mr(ctx->mrs[i].mr);
}
for (int i = 0; i < ctx->nQps; ++i) {
ibv_destroy_qp(ctx->qps[i].qp);
}
if (ctx->pd != NULL) {
ibv_dealloc_pd(ctx->pd);
}
if (ctx->cq != NULL) {
ibv_destroy_cq(ctx->cq);
}
if (ctx->ctx != NULL) {
ibv_close_device(ctx->ctx);
}
free(ctx->mrs);
free(ctx->qps);
free(ctx->ports);
free(ctx->wcs);
free(ctx);
return mscclppSuccess;
}
mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct mscclppIbQp **ibQp, int port/*=-1*/)
{
if (port < 0) {
port = ctx->ports[0];
} else {
bool found = false;
for (int i = 0; i < ctx->nPorts; ++i) {
if (ctx->ports[i] == port) {
found = true;
break;
}
}
if (!found) {
WARN("invalid IB port: %d", port);
return mscclppInternalError;
}
}
struct ibv_qp_init_attr qp_init_attr;
std::memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr));
qp_init_attr.sq_sig_all = 0;
qp_init_attr.send_cq = ctx->cq;
qp_init_attr.recv_cq = ctx->cq;
qp_init_attr.qp_type = IBV_QPT_RC;
qp_init_attr.cap.max_send_wr = MAXCONNECTIONS;
qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = 0;
struct ibv_qp *qp = ibv_create_qp(ctx->pd, &qp_init_attr);
if (qp == nullptr) {
WARN("ibv_create_qp failed (errno %d)", errno);
return mscclppInternalError;
}
struct ibv_port_attr port_attr;
if (ibv_query_port(ctx->ctx, port, &port_attr) != 0) {
WARN("ibv_query_port failed (errno %d, port %d)", errno, port);
return mscclppInternalError;
}
// Register QP to this ctx
qp->context = ctx->ctx;
if (qp->context == NULL) {
WARN("IB context is NULL");
return mscclppInternalError;
}
ctx->nQps++;
if (ctx->qps == NULL) {
MSCCLPPCHECK(mscclppCalloc(&ctx->qps, MAXCONNECTIONS));
ctx->maxQps = MAXCONNECTIONS;
}
if (ctx->maxQps < ctx->nQps) {
WARN("too many QPs");
return mscclppInternalError;
}
struct mscclppIbQp *_ibQp = &ctx->qps[ctx->nQps - 1];
_ibQp->qp = qp;
_ibQp->info.lid = port_attr.lid;
_ibQp->info.port = port;
_ibQp->info.linkLayer = port_attr.link_layer;
_ibQp->info.qpn = qp->qp_num;
_ibQp->info.mtu = port_attr.active_mtu;
if (port_attr.link_layer != IBV_LINK_LAYER_INFINIBAND) {
union ibv_gid gid;
if (ibv_query_gid(ctx->ctx, port, 0, &gid) != 0) {
WARN("ibv_query_gid failed (errno %d)", errno);
return mscclppInternalError;
}
_ibQp->info.spn = gid.global.subnet_prefix;
}
struct ibv_qp_attr qp_attr;
std::memset(&qp_attr, 0, sizeof(struct ibv_qp_attr));
qp_attr.qp_state = IBV_QPS_INIT;
qp_attr.pkey_index = 0;
qp_attr.port_num = port;
qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
if (ibv_modify_qp(qp, &qp_attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) {
WARN("ibv_modify_qp failed (errno %d)", errno);
return mscclppInternalError;
}
MSCCLPPCHECK(mscclppCalloc(&_ibQp->wrs, MSCCLPP_IB_MAX_SENDS));
MSCCLPPCHECK(mscclppCalloc(&_ibQp->sges, MSCCLPP_IB_MAX_SENDS));
*ibQp = _ibQp;
return mscclppSuccess;
}
mscclppResult_t mscclppIbContextRegisterMr(struct mscclppIbContext *ctx, void *buff, size_t size, struct mscclppIbMr **ibMr)
{
if (size == 0) {
WARN("invalid size: %zu", size);
return mscclppInvalidArgument;
}
static __thread uintptr_t pageSize = 0;
if (pageSize == 0) {
pageSize = sysconf(_SC_PAGESIZE);
}
uintptr_t addr = reinterpret_cast<uintptr_t>(buff) & -pageSize;
size_t pages = (size + (reinterpret_cast<uintptr_t>(buff) - addr) + pageSize - 1) / pageSize;
struct ibv_mr *mr =
ibv_reg_mr(ctx->pd, reinterpret_cast<void *>(addr), pages * pageSize,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_RELAXED_ORDERING);
if (mr == nullptr) {
WARN("ibv_reg_mr failed (errno %d)", errno);
return mscclppInternalError;
}
ctx->nMrs++;
if (ctx->mrs == NULL) {
MSCCLPPCHECK(mscclppCalloc(&ctx->mrs, MAXCONNECTIONS));
ctx->maxMrs = MAXCONNECTIONS;
}
if (ctx->maxMrs < ctx->nMrs) {
WARN("too many MRs");
return mscclppInternalError;
}
struct mscclppIbMr *_ibMr = &ctx->mrs[ctx->nMrs - 1];
_ibMr->mr = mr;
_ibMr->buff = buff;
_ibMr->info.addr = (uint64_t)buff;
_ibMr->info.rkey = mr->rkey;
*ibMr = _ibMr;
return mscclppSuccess;
}
mscclppResult_t mscclppIbContextPollCq(struct mscclppIbContext *ctx, int *wcNum)
{
int ret = ibv_poll_cq(ctx->cq, MSCCLPP_IB_CQ_POLL_NUM, ctx->wcs);
if (ret < 0) {
WARN("ibv_poll_cq failed (errno %d)", errno);
return mscclppInternalError;
}
ctx->wcn = ret;
*wcNum = ret;
return mscclppSuccess;
}
//////////////////////////////////////////////////////////////////////////////
int mscclppIbQp::rtr(const mscclppIbQpInfo *info)
{
struct ibv_qp_attr qp_attr;
std::memset(&qp_attr, 0, sizeof(struct ibv_qp_attr));
qp_attr.qp_state = IBV_QPS_RTR;
qp_attr.path_mtu = IBV_MTU_1024;
qp_attr.dest_qp_num = info->qpn;
qp_attr.rq_psn = 0;
qp_attr.max_dest_rd_atomic = 1;
qp_attr.min_rnr_timer = 0x12;
if (info->linkLayer == IBV_LINK_LAYER_ETHERNET) {
qp_attr.ah_attr.is_global = 1;
qp_attr.ah_attr.grh.dgid.global.subnet_prefix = info->spn;
qp_attr.ah_attr.grh.dgid.global.interface_id = info->lid;
qp_attr.ah_attr.grh.flow_label = 0;
qp_attr.ah_attr.grh.sgid_index = 0;
qp_attr.ah_attr.grh.hop_limit = 255;
qp_attr.ah_attr.grh.traffic_class = 0;
} else {
qp_attr.ah_attr.is_global = 0;
qp_attr.ah_attr.dlid = info->lid;
}
qp_attr.ah_attr.sl = 0;
qp_attr.ah_attr.src_path_bits = 0;
qp_attr.ah_attr.port_num = info->port;
return ibv_modify_qp(this->qp, &qp_attr,
IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER);
}
int mscclppIbQp::rts()
{
struct ibv_qp_attr qp_attr;
std::memset(&qp_attr, 0, sizeof(struct ibv_qp_attr));
qp_attr.qp_state = IBV_QPS_RTS;
qp_attr.timeout = 18;
qp_attr.retry_cnt = 7;
qp_attr.rnr_retry = 7;
qp_attr.sq_psn = 0;
qp_attr.max_rd_atomic = 1;
return ibv_modify_qp(this->qp, &qp_attr,
IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC);
}
int mscclppIbQp::stageSend(struct mscclppIbMr *ibMr, const mscclppIbMrInfo *info, int size,
uint64_t wrId, unsigned int immData, int offset)
{
if (this->wrn >= MSCCLPP_IB_MAX_SENDS) {
return -1;
}
int wrn = this->wrn;
struct ibv_send_wr *wr_ = &this->wrs[wrn];
struct ibv_sge *sge_ = &this->sges[wrn];
std::memset(wr_, 0, sizeof(struct ibv_send_wr));
std::memset(sge_, 0, sizeof(struct ibv_sge));
wr_->wr_id = wrId;
wr_->sg_list = sge_;
wr_->num_sge = 1;
wr_->opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
wr_->imm_data = immData;
wr_->send_flags = IBV_SEND_SIGNALED;
wr_->wr.rdma.remote_addr = info->addr;
wr_->wr.rdma.rkey = info->rkey;
wr_->next = nullptr;
sge_->addr = (uint64_t)(ibMr->buff) + (uint64_t)offset;
sge_->length = size;
sge_->lkey = ibMr->mr->lkey;
if (wrn > 0) {
this->wrs[wrn - 1].next = wr_;
}
this->wrn++;
return this->wrn;
}
int mscclppIbQp::postSend()
{
struct ibv_send_wr *bad_wr;
int ret = ibv_post_send(this->qp, this->wrs, &bad_wr);
if (ret != 0) {
return ret;
}
std::memset(this->wrs, 0, sizeof(struct ibv_send_wr) * this->wrn);
std::memset(this->sges, 0, sizeof(struct ibv_sge) * this->wrn);
this->wrn = 0;
return 0;
}
int mscclppIbQp::postRecv(uint64_t wrId)
{
struct ibv_recv_wr wr, *bad_wr;
wr.wr_id = wrId;
wr.sg_list = nullptr;
wr.num_sge = 0;
wr.next = nullptr;
return ibv_post_recv((struct ibv_qp *)this->qp, &wr, &bad_wr);
}

View File

@@ -1,7 +1,6 @@
#include "mscclpp.h"
#include "bootstrap.h"
#include "core.h"
#include "shmutils.h"
#include <map>
#include <sstream>
@@ -128,6 +127,12 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
if (comm == NULL)
return mscclppSuccess;
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
if (comm->ibContext[i]) {
MSCCLPPCHECK(mscclppIbContextDestroy(comm->ibContext[i]));
}
}
if (comm->bootstrap)
MSCCLPPCHECK(bootstrapClose(comm->bootstrap));
@@ -137,19 +142,50 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){
}
MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, int rankRecv, int rankSend,
void *buff, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev);
mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, int *flag, int tag,
mscclppTransport_t transportType, const char *ibDev/*=NULL*/)
void *buff, size_t buffSize, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev);
mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, size_t buffSize,
int *flag, int tag, mscclppTransport_t transportType, const char *ibDev/*=NULL*/)
{
if (comm->rank == rankRecv || comm->rank == rankSend) {
struct mscclppConn *conn = &comm->conns[comm->nConns++];
conn->transport = transportType;
conn->localRank = comm->rank;
conn->ibDev = ibDev;
conn->rankSend = rankSend;
conn->rankRecv = rankRecv;
conn->tag = tag;
conn->buff = buff;
conn->buffSize = buffSize;
conn->flag = flag;
conn->remoteRank = (comm->rank == rankRecv) ? rankSend : rankRecv;
conn->ibCtx = NULL;
conn->ibQp = NULL;
if (ibDev != NULL) {
// Check if an IB context exists
int ibDevIdx = -1;
int firstNullIdx = -1;
for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) {
if (comm->ibContext[i] == NULL) {
if (firstNullIdx == -1) {
firstNullIdx = i;
}
} else if (strncmp(comm->ibContext[i]->ctx->device->name, ibDev, IBV_SYSFS_NAME_MAX) == 0) {
ibDevIdx = i;
break;
}
}
if (ibDevIdx == -1) {
// Create a new context.
if (firstNullIdx == -1) {
WARN("Too many IB devices");
return mscclppInvalidUsage;
}
ibDevIdx = firstNullIdx;
if (mscclppIbContextCreate(&comm->ibContext[ibDevIdx], ibDev) != mscclppSuccess) {
WARN("Failed to create IB context");
return mscclppInternalError;
}
}
conn->ibCtx = comm->ibContext[ibDevIdx];
}
}
return mscclppSuccess;
}
@@ -157,68 +193,94 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, v
MSCCLPP_API(mscclppResult_t, mscclppConnectionSetup, mscclppComm_t comm);
mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm)
{
struct ipcMemHandleInfo {
cudaIpcMemHandle_t handle_buff;
cudaIpcMemHandle_t handle_flag;
int tag;
int valid;
struct connInfo {
cudaIpcMemHandle_t handleBuff;
cudaIpcMemHandle_t handleFlag;
mscclppIbQpInfo qpInfo;
mscclppIbMrInfo mrInfo;
};
size_t shmSize = MAXCONNECTIONS * sizeof(struct ipcMemHandleInfo);
int fd;
struct ipcMemHandleInfo *handleInfos;
std::string shmname = mscclppShmFileName(comm, comm->localRank);
MSCCLPPCHECK(mscclppShmutilsMapCreate(shmname.c_str(), shmSize, &fd, (void **)&handleInfos));
// Send info to peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
CUDACHECK(cudaIpcGetMemHandle(&handleInfos[i].handle_buff, conn->buff));
CUDACHECK(cudaIpcGetMemHandle(&handleInfos[i].handle_flag, conn->flag));
handleInfos[i].tag = conn->tag;
handleInfos[i].valid = 1;
struct connInfo cInfo;
if (conn->transport == mscclppTransportP2P) {
CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleBuff, conn->buff));
CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleFlag, conn->flag));
} else if (conn->transport == mscclppTransportIB) {
struct mscclppIbContext *ibCtx = conn->ibCtx;
if (conn->ibQp == NULL) {
MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp));
}
MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, conn->buff, conn->buffSize, &conn->ibMr));
cInfo.qpInfo = conn->ibQp->info;
cInfo.mrInfo = conn->ibMr->info;
}
int peer = conn->rankSend == comm->rank ? conn->rankRecv : conn->rankSend;
MSCCLPPCHECK(bootstrapSend(comm->bootstrap, peer, conn->tag, &cInfo, sizeof(cInfo)));
}
// Local intra-node barrier: wait for all local ranks to have written their memory handles
MSCCLPPCHECK(bootstrapBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0]));
// Allocate connection info to be shared with GPU
MSCCLPPCHECK(mscclppCudaHostCalloc(&comm->devConns, comm->nConns));
for (int r = 0; r < comm->localRanks; ++r) {
if (r == comm->localRank)
continue;
int fd_r;
struct ipcMemHandleInfo *handleInfos_r;
std::string shmname_r = mscclppShmFileName(comm, r);
MSCCLPPCHECK(mscclppShmutilsMapOpen(shmname_r.c_str(), shmSize, &fd_r, (void **)&handleInfos_r));
// Recv info from peers
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
struct mscclppDevConn *devConn = &comm->devConns[i];
std::map<int, std::pair<cudaIpcMemHandle_t, cudaIpcMemHandle_t>> remoteHandles;
for (int i = 0; i < MAXCONNECTIONS; ++i) {
if (handleInfos_r[i].valid != 1) {
break;
devConn->tag = conn->tag;
devConn->localBuff = conn->buff;
devConn->localFlag = conn->flag;
struct connInfo cInfo;
int peer = conn->rankSend == comm->rank ? conn->rankRecv : conn->rankSend;
MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, peer, conn->tag, &cInfo, sizeof(cInfo)));
if (conn->transport == mscclppTransportP2P) {
CUDACHECK(cudaIpcOpenMemHandle(&devConn->remoteBuff, cInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(cudaIpcOpenMemHandle((void **)&devConn->remoteFlag, cInfo.handleFlag, cudaIpcMemLazyEnablePeerAccess));
} else if (conn->transport == mscclppTransportIB) {
if (conn->ibQp->rtr(&cInfo.qpInfo) != 0) {
WARN("Failed to transition QP to RTR");
return mscclppInvalidUsage;
}
remoteHandles[handleInfos_r[i].tag] = std::make_pair(handleInfos_r[i].handle_buff, handleInfos_r[i].handle_flag);
}
for (int i = 0; i < comm->nConns; ++i) {
struct mscclppConn *conn = &comm->conns[i];
auto it = remoteHandles.find(conn->tag);
if (it != remoteHandles.end()) {
comm->devConns[i].tag = conn->tag;
comm->devConns[i].localBuff = conn->buff;
comm->devConns[i].localFlag = conn->flag;
CUDACHECK(cudaIpcOpenMemHandle(&comm->devConns[i].remoteBuff, it->second.first, cudaIpcMemLazyEnablePeerAccess));
CUDACHECK(cudaIpcOpenMemHandle((void **)&comm->devConns[i].remoteFlag, it->second.second, cudaIpcMemLazyEnablePeerAccess));
if (conn->ibQp->rts() != 0) {
WARN("Failed to transition QP to RTS");
return mscclppInvalidUsage;
}
conn->ibRemoteMrInfo = cInfo.mrInfo;
devConn->remoteBuff = NULL;
CUDACHECK(cudaMalloc(&devConn->remoteFlag, sizeof(int)));
}
MSCCLPPCHECK(mscclppShmutilsMapClose(shmname_r.c_str(), shmSize, fd_r, handleInfos_r));
}
// Local intra-node barrier: wait for all local ranks to have read all memory handles
MSCCLPPCHECK(bootstrapBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0]));
return mscclppSuccess;
}
MSCCLPPCHECK(mscclppShmutilsMapDestroy(shmname.c_str(), shmSize, fd, handleInfos));
MSCCLPP_API(mscclppResult_t, mscclppProxyLaunch, mscclppComm_t comm);
mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm)
{
MSCCLPPCHECK(mscclppProxyCreate(comm));
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppProxyStop, mscclppComm_t comm);
mscclppResult_t mscclppProxyStop(mscclppComm_t comm)
{
MSCCLPPCHECK(mscclppProxyDestroy(comm));
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppGetLocalRank, mscclppComm_t comm, int *localRank);
mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank)
{
*localRank = comm->localRank;
return mscclppSuccess;
}
MSCCLPP_API(mscclppResult_t, mscclppGetNodeFromRank, mscclppComm_t comm, int rank, int *node);
mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node)
{
*node = comm->rankToNode[rank];
return mscclppSuccess;
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,80 +0,0 @@
#include "shmutils.h"
#include "debug.h"
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/mman.h>
#define SHM_MODE 0666
// Open a shme file and create an mmap.
static mscclppResult_t shmutilsMapOpen(const char *name, size_t size, int *fd, void **map, int flag)
{
int _fd = shm_open(name, flag, SHM_MODE);
if (_fd == -1) {
WARN("Failed to open shm file %s (flag: %d)", name, flag);
return mscclppInternalError;
}
void *_map = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0);
if (_map == MAP_FAILED) {
WARN("Failed to mmap shm file %s", name);
goto fail;
}
if (flag & O_CREAT) {
if (ftruncate(_fd, 0) == -1) {
WARN("Failed to ftruncate shm file %s", name);
goto fail;
}
}
if (ftruncate(_fd, size) == -1) {
WARN("Failed to ftruncate shm file %s", name);
goto fail;
}
*fd = _fd;
*map = _map;
return mscclppSuccess;
fail:
close(_fd);
shm_unlink(name);
return mscclppInternalError;
}
// Open or create a shm file.
mscclppResult_t mscclppShmutilsMapCreate(const char *name, size_t size, int *fd, void **map)
{
return shmutilsMapOpen(name, size, fd, map, O_CREAT | O_RDWR);
}
// Open an existing shm file.
mscclppResult_t mscclppShmutilsMapOpen(const char *name, size_t size, int *fd, void **map)
{
return shmutilsMapOpen(name, size, fd, map, O_RDWR);
}
// Close a shm mmap.
mscclppResult_t mscclppShmutilsMapClose(const char *name, size_t size, int fd, void *map)
{
int err = 0;
if (munmap(map, size) == -1) {
WARN("Failed to munmap shm file %s", name);
err = 1;
}
close(fd);
return err ? mscclppInternalError : mscclppSuccess;
}
// Close a shm mmap and destroy a shm file.
mscclppResult_t mscclppShmutilsMapDestroy(const char *name, size_t size, int fd, void *map)
{
int err = 0;
if (munmap(map, size) == -1) {
WARN("Failed to munmap shm file %s", name);
err = 1;
}
close(fd);
if (shm_unlink(name) == -1) {
WARN("Failed to unlink shm file %s: errno %d", name, errno);
err = 1;
}
return err ? mscclppInternalError : mscclppSuccess;
}

View File

@@ -12,6 +12,7 @@
// #include "collectives.h"
#include "proxy.h"
// #include "strongstream.h"
#include "ib.h"
// #if CUDART_VERSION < 9000
// struct cudaLaunchParams {
@@ -159,13 +160,17 @@
struct mscclppConn {
mscclppTransport_t transport;
int localRank;
int remoteRank;
const char* ibDev;
int rankSend;
int rankRecv;
int tag;
void* buff;
int buffSize;
int* flag;
struct mscclppDevConn *devConn;
struct mscclppIbContext *ibCtx;
struct mscclppIbQp *ibQp;
struct mscclppIbMr *ibMr;
struct mscclppIbMrInfo ibRemoteMrInfo;
};
struct mscclppComm {
@@ -268,7 +273,9 @@ struct mscclppComm {
// char intraPad2[64 - sizeof(uint64_t)];
// uint64_t intraBarrierGate; // only used if this is intraComm0
struct mscclppProxyState proxyState;
struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS];
struct mscclppProxyState proxyState[MSCCLPP_IB_MAX_DEVS];
// // Whether this communicator uses collNet
// int collNetSupport;

78
src/include/ib.h Normal file
View File

@@ -0,0 +1,78 @@
#ifndef MSCCLPP_IB_H_
#define MSCCLPP_IB_H_
#include "mscclpp.h"
#include <list>
#include <memory>
#include <string>
#include <infiniband/verbs.h>
#define MSCCLPP_IB_CQ_SIZE 1024
#define MSCCLPP_IB_CQ_POLL_NUM 4
#define MSCCLPP_IB_MAX_SENDS 64
#define MSCCLPP_IB_MAX_DEVS 8
// MR info to be shared with the remote peer
struct mscclppIbMrInfo {
uint64_t addr;
uint32_t rkey;
};
// IB memory region
struct mscclppIbMr {
struct ibv_mr *mr;
void *buff;
struct mscclppIbMrInfo info;
};
// QP info to be shared with the remote peer
struct mscclppIbQpInfo {
uint16_t lid;
uint8_t port;
uint8_t linkLayer;
uint32_t qpn;
uint64_t spn;
int mtu;
};
// IB queue pair
struct mscclppIbQp {
struct ibv_qp *qp;
struct mscclppIbQpInfo info;
struct ibv_send_wr *wrs;
struct ibv_sge *sges;
int wrn;
int rtr(const mscclppIbQpInfo *info);
int rts();
int stageSend(struct mscclppIbMr *ibMr, const mscclppIbMrInfo *info, int size,
uint64_t wrId, unsigned int immData, int offset = 0);
int postSend();
int postRecv(uint64_t wrId);
};
// Holds resources of a single IB device.
struct mscclppIbContext {
int numa_node;
struct ibv_context *ctx;
struct ibv_cq *cq;
struct ibv_pd *pd;
struct ibv_wc *wcs;
int wcn;
int *ports;
int nPorts;
struct mscclppIbQp *qps;
int nQps;
int maxQps;
struct mscclppIbMr *mrs;
int nMrs;
int maxMrs;
};
mscclppResult_t mscclppIbContextCreate(struct mscclppIbContext **ctx, const char *ibDevName);
mscclppResult_t mscclppIbContextDestroy(struct mscclppIbContext *ctx);
mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct mscclppIbQp **ibQp, int port = -1);
mscclppResult_t mscclppIbContextRegisterMr(struct mscclppIbContext *ctx, void *buff, size_t size, struct mscclppIbMr **ibMr);
mscclppResult_t mscclppIbContextPollCq(struct mscclppIbContext *ctx, int *wcNum);
#endif

View File

@@ -102,11 +102,17 @@ mscclppResult_t mscclppBootStrapAllGather(mscclppComm_t comm, void* data, int si
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm);
mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, int *flag, int tag,
mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, size_t buffSize, int *flag, int tag,
mscclppTransport_t transportType, const char *ibDev=NULL);
mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm);
mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm);
mscclppResult_t mscclppProxyStop(mscclppComm_t comm);
mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank);
mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node);
mscclppResult_t mscclppGetDevConns(mscclppComm_t comm, mscclppDevConn_t* devConns);
#ifdef __cplusplus

View File

@@ -1,229 +1,16 @@
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef MSCCLPP_PROXY_H_
#define MSCCLPP_PROXY_H_
// #include "devcomm.h"
// #include "info.h"
#include "socket.h"
// #include <pthread.h>
// #include "shm.h"
// enum mscclppProxyOpState { mscclppProxyOpNone, mscclppProxyOpReady, mscclppProxyOpProgress };
// struct mscclppProxyArgs;
// typedef mscclppResult_t (*proxyProgressFunc_t)(struct mscclppComm*, struct mscclppProxyArgs*);
// #define MSCCLPP_PROXY_MAX_SUBS MAXCHANNELS
// static_assert(MSCCLPP_MAX_WORK_ELEMENTS <= MAXCHANNELS, "Not enough sub space for max work elements");
// struct mscclppProxyOp {
// struct mscclppProxyConnection* connection;
// int channelId;
// int nsteps;
// ssize_t nbytes;
// int root;
// int next;
// uint64_t opCount;
// int sliceSteps;
// int chunkSteps;
// int chunkSize;
// uint8_t /*mscclppDataType_t*/ dtype;
// uint8_t /*mscclppDevRedOp_t*/ redOp;
// uint8_t /*mscclppPattern_t*/ pattern;
// uint8_t protocol;
// union {
// uint64_t unused;
// // For use by enqueue.cc
// struct mscclppProxyOp *enqNext;
// };
// };
// static_assert(sizeof(struct mscclppProxyOp) == 64, "Keep ProxyOp aligned with cache lines for effective prefetch");
// struct mscclppProxySubArgs {
// struct mscclppProxyConnection* connection;
// int channelId;
// int nsteps;
// ssize_t nbytes;
// int peer;
// int groupSize; // Number of consecutive sub operations sharing the same recvComm
// uint64_t base;
// uint64_t posted;
// uint64_t received;
// uint64_t flushed;
// uint64_t transmitted;
// uint64_t done;
// uint64_t end;
// void* requests[MSCCLPP_STEPS];
// void* profilingEvents[MSCCLPP_STEPS];
// };
// struct mscclppProxyArgs {
// struct mscclppProxySubArgs subs[MSCCLPP_PROXY_MAX_SUBS];
// proxyProgressFunc_t progress;
// int nsubs;
// int done;
// uint64_t opCount;
// int sliceSteps;
// int chunkSteps;
// int chunkSize;
// uint8_t /*mscclppDataType_t*/ dtype;
// uint8_t /*mscclppDevRedOp_t*/ redOp;
// uint8_t /*mscclppPattern_t*/ pattern;
// uint8_t protocol;
// int state;
// char* sharedBuff[MSCCLPP_STEPS];
// int sharedSize[MSCCLPP_STEPS];
// int idle;
// // Element linking
// struct mscclppProxyArgs* next;
// struct mscclppProxyArgs* nextPeer;
// struct mscclppProxyArgs** proxyAppendPtr;
// };
// #define MSCCLPP_MAX_NETDEVS 128
// // ProxyOps are used to communicate between main thread and service thread
// // Make sure we have enough to store two full rounds of operations on all channels.
// // Otherwise we'd be unable to post half of them to free new elements.
// #define MAX_OPS_PER_PEER (2*MAXCHANNELS*MSCCLPP_MAX_WORK_ELEMENTS_P2P)
// #define MSCCLPP_MAX_LOCAL_RANKS 64
// struct mscclppProxyOpsPool {
// struct mscclppProxyOp ops[MAX_OPS_PER_PEER*MSCCLPP_MAX_LOCAL_RANKS];
// volatile int nextOps;
// volatile int nextOpsEnd;
// volatile int freeOps[MSCCLPP_MAX_LOCAL_RANKS];
// pthread_mutex_t mutex;
// pthread_cond_t cond;
// };
// struct mscclppProxyOps {
// mscclppProxyOpsPool* pool;
// mscclppShmHandle_t handle;
// int count;
// int freeOp;
// int nextOps;
// int nextOpsEnd;
// };
// struct mscclppProxySharedP2p {
// int refcount;
// int size;
// char* cudaBuff;
// char* hostBuff;
// cudaIpcMemHandle_t ipc;
// struct mscclppProxyArgs* proxyAppend[MAXCHANNELS]; // Separate send and recv
// };
// struct mscclppProxySharedCollNet {
// int size;
// char* cudaBuff;
// char* hostBuff;
// struct mscclppProxyArgs* proxyAppend[2*MSCCLPP_MAX_NETDEVS];
// void* resources;
// };
// struct mscclppProxyPeer {
// struct mscclppProxySharedP2p send;
// struct mscclppProxySharedP2p recv;
// };
// struct mscclppSharedNetComms {
// void* sendComm[MAXCHANNELS];
// void* recvComm[MAXCHANNELS];
// int sendRefCount[MAXCHANNELS];
// int recvRefCount[MAXCHANNELS];
// };
// struct mscclppProxyPool;
// struct mscclppProxyProgressState {
// // Used by main threads to send work to progress thread
// struct mscclppProxyOpsPool* opsPool;
// mscclppShmHandle_t handle;
// char opsPoolShmSuffix[6];
// pthread_t thread;
// bool stop;
// struct mscclppProxyPeer** localPeers;
// struct mscclppSharedNetComms* netComms[MSCCLPP_MAX_NETDEVS];
// struct mscclppProxySharedCollNet collNet;
// struct mscclppProxyArgs* active;
// struct mscclppProxyArgs* pool;
// struct mscclppProxyPool* pools;
// int nextOps;
// };
#include "mscclpp.h"
#include "comm.h"
#include <pthread.h>
struct mscclppProxyState {
// Service thread
pthread_t thread;
struct mscclppSocket* listenSock;
int stop;
// CUcontext cudaCtx;
// Used by main thread
union mscclppSocketAddress* peerAddresses;
struct mscclppSocket* peerSocks;
// struct mscclppProxyOps* proxyOps;
// void** sharedDevMems;
// Progress thread
// struct mscclppProxyProgressState progressState;
};
// enum proxyConnectState {
// connUninitialized = 0,
// connInitialized = 1,
// connSharedInitialized = 2,
// connSetupDone = 3,
// connConnected = 4,
// numConnStates = 5
// };
mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm);
mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm);
// struct mscclppProxyConnection {
// int send, transport, shared;
// int localRank;
// struct mscclppSocket* sock;
// struct mscclppTransportComm* tcomm;
// struct mscclppProxyArgs *proxyAppend;
// struct mscclppProxyArgs **proxyAppendPtr;
// void* transportResources;
// proxyConnectState state;
// };
// typedef mscclppResult_t (*threadFunc_t)(struct mscclppProxyArgs*);
// enum proxyMode {
// proxyRing = 0,
// proxyFrom = 1,
// proxyTo = 2
// };
// mscclppResult_t mscclppProxySaveOp(struct mscclppComm* comm, struct mscclppProxyOp* proxyOp, bool *justInquire);
// mscclppResult_t mscclppProxyComputeP2p(struct mscclppInfo* info, struct mscclppProxyOp* proxyOp);
// mscclppResult_t mscclppProxyStart(struct mscclppComm* comm);
mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses);
// mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm);
// mscclppResult_t mscclppProxyConnect(struct mscclppComm* comm, int transport, int send, int rank, struct mscclppProxyConnector* proxyConn);
// enum mscclppProxyMsgType {
// mscclppProxyMsgInit = 1,
// mscclppProxyMsgSharedInit = 2,
// mscclppProxyMsgSetup = 3,
// mscclppProxyMsgConnect = 4,
// mscclppProxyMsgStart = 5,
// mscclppProxyMsgClose = 6,
// mscclppProxyMsgAbort = 7,
// mscclppProxyMsgStop = 8
// };
// mscclppResult_t mscclppProxyCall(struct mscclppProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize);
// mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm);
// mscclppResult_t mscclppProxyShmUnlink(struct mscclppComm* comm);
#endif

View File

@@ -1,11 +0,0 @@
#ifndef MSCCLPP_SHMUTILS_H_
#define MSCCLPP_SHMUTILS_H_
#include "mscclpp.h"
mscclppResult_t mscclppShmutilsMapCreate(const char *name, size_t size, int *fd, void **map);
mscclppResult_t mscclppShmutilsMapOpen(const char *name, size_t size, int *fd, void **map);
mscclppResult_t mscclppShmutilsMapClose(const char *name, size_t size, int fd, void *map);
mscclppResult_t mscclppShmutilsMapDestroy(const char *name, size_t size, int fd, void *map);
#endif

View File

@@ -1,24 +1,43 @@
#include "mscclpp.h"
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
void print_usage(const char *prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
printf("usage: %s IP:PORT\n", prog);
#else
printf("usage: %s IP:PORT rank nranks\n", prog);
#endif
}
int main(int argc, const char *argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc != 2) {
print_usage(argv[0]);
return -1;
}
MPI_Init(NULL, NULL);
const char *ip_port = argv[1];
int rank;
int world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
#else
if (argc != 4) {
print_usage(argv[0]);
return -1;
}
mscclppComm_t comm;
const char *ip_port = argv[1];
int rank = atoi(argv[2]);
int world_size = atoi(argv[3]);
#endif
mscclppComm_t comm;
mscclppCommInitRank(&comm, world_size, rank, ip_port);
int *buf = (int *)calloc(world_size, sizeof(int));
@@ -46,6 +65,10 @@ int main(int argc, const char *argv[])
return -1;
}
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Finalize();
#endif
printf("Succeeded! %d\n", rank);
return 0;
}

View File

@@ -1,58 +0,0 @@
#include "mscclpp.h"
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
void print_usage(const char *prog)
{
printf("usage: %s IP:PORT\n", prog);
}
int main(int argc, const char *argv[])
{
if (argc != 2) {
print_usage(argv[0]);
return -1;
}
MPI_Init(NULL, NULL);
int rank;
int world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
mscclppComm_t comm;
const char *ip_port = argv[1];
mscclppCommInitRank(&comm, world_size, rank, ip_port);
int *buf = (int *)calloc(world_size, sizeof(int));
if (buf == nullptr) {
printf("calloc failed\n");
return -1;
}
buf[rank] = rank;
mscclppResult_t res = mscclppBootStrapAllGather(comm, buf, sizeof(int));
if (res != mscclppSuccess) {
printf("bootstrapAllGather failed\n");
return -1;
}
for (int i = 0; i < world_size; ++i) {
if (buf[i] != i) {
printf("wrong data: %d, expected %d\n", buf[i], i);
return -1;
}
}
res = mscclppCommDestroy(comm);
if (res != mscclppSuccess) {
printf("mscclppDestroy failed\n");
return -1;
}
MPI_Finalize();
printf("Succeeded! %d\n", rank);
return 0;
}

260
tests/p2p_test.cu Normal file
View File

@@ -0,0 +1,260 @@
#include "mscclpp.h"
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
#include "mpi.h"
#endif // MSCCLPP_USE_MPI_FOR_TESTS
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string>
#define RANKS_PER_NODE 8
#define TEST_CONN_TYPE 1 // 0: P2P(for local)+IB(for remote), 1: IB-Only
// Check CUDA RT calls
#define CUDACHECK(cmd) do { \
cudaError_t err = cmd; \
if( err != cudaSuccess ) { \
printf("Cuda failure '%s'", cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while(false)
__global__ void kernel(mscclppDevConn_t devConns, int rank, int world_size)
{
int tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid == 0) {
// Get sending data and send flag
volatile int *data;
for (int i = 0; i < (world_size - 1) * 2; ++i) {
mscclppDevConn_t devConn = &devConns[i];
int tag = devConn->tag;
int rankSend = tag % world_size;
if (rankSend == rank) { // I am a sender
data = (volatile int *)devConn->localBuff;
// We are sending the same data to all peers, so just break here
break;
}
}
// Set my data
*data = rank + 1;
// Set send flags to inform all peers that the data is ready
for (int i = 0; i < (world_size - 1) * 2; ++i) {
mscclppDevConn_t devConn = &devConns[i];
int tag = devConn->tag;
int rankSend = tag % world_size;
if (rankSend == rank) { // I am a sender
*((volatile int *)devConn->localFlag) = 1;
}
}
// Read data from all other peers
for (int i = 0; i < (world_size - 1) * 2; ++i) {
mscclppDevConn_t devConn = &devConns[i];
int tag = devConn->tag;
int rankSend = tag % world_size;
int rankRecv = tag / world_size;
if (rankRecv == rank) { // I am a receiver
if (devConn->remoteBuff == NULL) { // IB
volatile int *localFlag = (volatile int *)devConn->localFlag;
// Wait until the data comes in via proxy
while (*localFlag != 1) {}
} else { // P2P
volatile int *remoteData = (volatile int *)devConn->remoteBuff;
volatile int *remoteFlag = (volatile int *)devConn->remoteFlag;
// Wait until the remote data is set
while (*remoteFlag != 1) {}
// Read remote data
data[rankSend] = remoteData[rankSend];
}
}
}
// Wait until the proxy have sent my data to all peers
for (int i = 0; i < (world_size - 1) * 2; ++i) {
mscclppDevConn_t devConn = &devConns[i];
int tag = devConn->tag;
int rankSend = tag % world_size;
if (rankSend == rank) { // I am a sender
volatile int *flag = (volatile int *)devConn->localFlag;
while (*flag == 1) {}
}
}
}
}
int rankToLocalRank(int rank)
{
return rank % RANKS_PER_NODE;
}
int rankToNode(int rank)
{
return rank / RANKS_PER_NODE;
}
void print_usage(const char *prog)
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
printf("usage: %s IP:PORT [rank nranks]\n", prog);
#else
printf("usage: %s IP:PORT rank nranks\n", prog);
#endif
}
int main(int argc, const char *argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc != 2 && argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank;
int world_size;
if (argc == 4) {
rank = atoi(argv[2]);
world_size = atoi(argv[3]);
} else {
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
}
#else
if (argc != 4) {
print_usage(argv[0]);
return -1;
}
const char *ip_port = argv[1];
int rank = atoi(argv[2]);
int world_size = atoi(argv[3]);
#endif
mscclppComm_t comm;
mscclppResult_t res = mscclppCommInitRank(&comm, world_size, rank, ip_port);
if (res != mscclppSuccess) {
printf("mscclppCommInitRank failed\n");
return -1;
}
int *data_d;
int *send_flags_d;
int *recv_flags_d;
CUDACHECK(cudaMalloc(&data_d, sizeof(int) * world_size));
CUDACHECK(cudaHostAlloc(&send_flags_d, sizeof(int) * (world_size - 1), cudaHostAllocMapped));
CUDACHECK(cudaHostAlloc(&recv_flags_d, sizeof(int) * (world_size - 1), cudaHostAllocMapped));
CUDACHECK(cudaMemset(data_d, 0, sizeof(int) * world_size));
// CUDACHECK(cudaMemcpy(data_d, tmp, sizeof(int) * 2, cudaMemcpyHostToDevice));
// printf("rank %d CPU: setting data at %p\n", rank, data_d + rank);
memset(send_flags_d, 0, sizeof(int) * (world_size - 1));
memset(recv_flags_d, 0, sizeof(int) * (world_size - 1));
int localRank = rankToLocalRank(rank);
int thisNode = rankToNode(rank);
std::string ibDev = "mlx5_ib" + std::to_string(localRank);
// Read from all other ranks
int idx = 0;
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
int tag = rank * world_size + r;
#if (TEST_CONN_TYPE == 0) // P2P+IB
int node = rankToNode(r);
if (node == thisNode) {
res = mscclppConnect(comm, rank, r, data_d + r, sizeof(int), recv_flags_d + idx, tag, mscclppTransportP2P);
} else {
res = mscclppConnect(comm, rank, r, data_d + r, sizeof(int), recv_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
}
#else // (TEST_CONN_TYPE == 1) // IB-Only
res = mscclppConnect(comm, rank, r, data_d + r, sizeof(int), recv_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
#endif
if (res != mscclppSuccess) {
printf("mscclppConnect failed\n");
return -1;
}
++idx;
}
// Let others read from me
idx = 0;
for (int r = 0; r < world_size; ++r) {
if (r == rank) continue;
int tag = r * world_size + rank;
#if (TEST_CONN_TYPE == 0) // P2P+IB
int node = rankToNode(r);
if (node == thisNode) {
res = mscclppConnect(comm, r, rank, data_d + rank, sizeof(int), send_flags_d + idx, tag, mscclppTransportP2P);
} else {
res = mscclppConnect(comm, r, rank, data_d + rank, sizeof(int), send_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
}
#else // (TEST_CONN_TYPE == 1) // IB-Only
res = mscclppConnect(comm, r, rank, data_d + rank, sizeof(int), send_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
#endif
if (res != mscclppSuccess) {
printf("mscclppConnect failed\n");
return -1;
}
++idx;
}
res = mscclppConnectionSetup(comm);
if (res != mscclppSuccess) {
printf("mscclppConnectionSetup failed\n");
return -1;
}
res = mscclppProxyLaunch(comm);
if (res != mscclppSuccess) {
printf("mscclppProxyLaunch failed\n");
return -1;
}
mscclppDevConn_t devConns;
mscclppGetDevConns(comm, &devConns);
kernel<<<1, 1>>>(devConns, rank, world_size);
CUDACHECK(cudaDeviceSynchronize());
res = mscclppProxyStop(comm);
if (res != mscclppSuccess) {
printf("mscclppProxyStop failed\n");
return -1;
}
// Read results from GPU
int *buf = (int *)calloc(world_size, sizeof(int));
if (buf == nullptr) {
printf("calloc failed\n");
return -1;
}
CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost));
bool failed = false;
for (int i = 0; i < world_size; ++i) {
if (buf[i] != i + 1) {
printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1);
failed = true;
}
}
if (failed) {
return -1;
}
res = mscclppCommDestroy(comm);
if (res != mscclppSuccess) {
printf("mscclppDestroy failed\n");
return -1;
}
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc == 2) {
MPI_Finalize();
}
#endif
printf("Succeeded! %d\n", rank);
return 0;
}