diff --git a/Makefile b/Makefile index 0ebc2796..3369dc10 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ MSCCLPP_MINOR := 1 DEBUG ?= 0 VERBOSE ?= 1 TRACE ?= 0 +USE_MPI_FOR_TESTS ?= 0 ######## CUDA CUDA_HOME ?= /usr/local/cuda @@ -81,9 +82,17 @@ CXXFLAGS += -DNVTX_DISABLE endif #### MPI (only for test code) +ifeq ($(USE_MPI_FOR_TESTS), 1) MPI_HOME ?= /usr/local/mpi MPI_INC := -I$(MPI_HOME)/include MPI_LDFLAGS := -L$(MPI_HOME)/lib -lmpi +MPI_MACRO := -D MSCCLPP_USE_MPI_FOR_TESTS +else +MPI_HOME := +MPI_INC := +MPI_LDFLAGS := +MPI_MACRO := +endif #### MSCCL++ BUILDDIR ?= $(abspath ./build) @@ -92,8 +101,10 @@ LIBDIR := lib OBJDIR := obj BINDIR := bin +LDFLAGS := $(NVLDFLAGS) -libverbs + LIBSRCS := $(addprefix src/,debug.cc utils.cc param.cc) -LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc shmutils.cc) +LIBSRCS += $(addprefix src/bootstrap/,init.cc bootstrap.cc socket.cc proxy.cc ib.cc) LIBOBJS := $(patsubst %.cc,%.o,$(LIBSRCS)) LIBOBJTARGETS := $(LIBOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) @@ -105,7 +116,7 @@ LIBSONAME := $(LIBNAME).$(MSCCLPP_MAJOR) LIBTARGET := $(BUILDDIR)/$(LIBDIR)/$(LIBNAME).$(MSCCLPP_MAJOR).$(MSCCLPP_MINOR) TESTSDIR := tests -TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc bootstrap_test_mpi.cc p2p_test_mpi.cu) +TESTSSRCS := $(addprefix $(TESTSDIR)/,bootstrap_test.cc p2p_test.cu) TESTSOBJS := $(patsubst %.cc,%.o,$(TESTSSRCS)) $(patsubst %.cu,%.o,$(TESTSSRCS)) TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%) TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS)) @@ -140,17 +151,17 @@ $(LIBTARGET): $(LIBOBJTARGETS) # Compile .cc tests $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cc @mkdir -p $(@D) - $(CXX) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(CXXFLAGS) -c $< + $(CXX) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(CXXFLAGS) -c $< $(MPI_MACRO) # Compile .cu tests $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)/%.o: $(TESTSDIR)/%.cu @mkdir -p $(@D) - $(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) 
$(NVCUFLAGS) -c $< + $(NVCC) -o $@ -I$(BUILDDIR)/$(INCDIR) $(MPI_INC) $(NVCUFLAGS) -c $< $(MPI_MACRO) # Test bins $(BUILDDIR)/$(BINDIR)/%: $(BUILDDIR)/$(OBJDIR)/%.o $(LIBTARGET) @mkdir -p $(@D) - $(NVCC) -o $@ $< $(NVLDFLAGS) $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp + $(NVCC) -o $@ $< $(MPI_LDFLAGS) -L$(BUILDDIR)/$(LIBDIR) -lmscclpp clean: rm -rf $(BUILDDIR) diff --git a/scripts/tests.sh b/scripts/tests.sh index da3e853c..f97e3eda 100755 --- a/scripts/tests.sh +++ b/scripts/tests.sh @@ -5,7 +5,6 @@ mpirun -allow-run-as-root \ -tag-output \ -map-by ppr:8:node \ -bind-to numa \ - -x MSCCLPP_DEBUG=INFO \ -x MSCCLPP_DEBUG_SUBSYS=ALL \ -x MSCCLPP_SOCKET_IFNAME=eth0 \ - ./build/bin/tests/p2p_test_mpi 172.17.0.4:50000 + ./build/bin/tests/p2p_test 172.17.0.4:50000 diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index af1ce81e..651ae4db 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -315,7 +315,7 @@ mscclppResult_t bootstrapInit(struct mscclppBootstrapHandle* handle, struct mscc MSCCLPPCHECK(mscclppSocketListen(proxySocket)); MSCCLPPCHECK(mscclppSocketGetAddr(proxySocket, state->peerProxyAddresses+rank)); MSCCLPPCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union mscclppSocketAddress))); - MSCCLPPCHECK(mscclppProxyInit(comm, proxySocket, state->peerProxyAddresses)); + // MSCCLPPCHECK(mscclppProxyInit(comm, proxySocket, state->peerProxyAddresses)); TRACE(MSCCLPP_INIT, "rank %d nranks %d - DONE", rank, nranks); diff --git a/src/bootstrap/ib.cc b/src/bootstrap/ib.cc new file mode 100644 index 00000000..b8defcf7 --- /dev/null +++ b/src/bootstrap/ib.cc @@ -0,0 +1,352 @@ +#include +#include +#include +#include +#include +#include + +#include "debug.h" +#include "alloc.h" +#include "comm.h" +#include "ib.h" + +mscclppResult_t mscclppIbContextCreate(struct mscclppIbContext **ctx, const char *ibDevName) +{ + struct mscclppIbContext *_ctx; + MSCCLPPCHECK(mscclppCalloc(&_ctx, 1)); + 
MSCCLPPCHECK(mscclppCalloc(&_ctx->wcs, MSCCLPP_IB_CQ_POLL_NUM)); + + std::vector ports; + + int num; + struct ibv_device **devices = ibv_get_device_list(&num); + for (int i = 0; i < num; ++i) { + if (strncmp(devices[i]->name, ibDevName, IBV_SYSFS_NAME_MAX) == 0) { + _ctx->ctx = ibv_open_device(devices[i]); + break; + } + } + ibv_free_device_list(devices); + if (_ctx->ctx == nullptr) { + WARN("ibv_open_device failed (errno %d, device name %s)", errno, ibDevName); + goto fail; + } + + // Check available ports + struct ibv_device_attr devAttr; + if (ibv_query_device(_ctx->ctx, &devAttr) != 0) { + WARN("ibv_query_device failed (errno %d, device name %s)", errno, ibDevName); + goto fail; + } + + for (uint8_t i = 1; i <= devAttr.phys_port_cnt; ++i) { + struct ibv_port_attr portAttr; + if (ibv_query_port(_ctx->ctx, i, &portAttr) != 0) { + WARN("ibv_query_port failed (errno %d, port %d)", errno, i); + goto fail; + } + if (portAttr.state != IBV_PORT_ACTIVE) { + continue; + } + if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND && + portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) { + continue; + } + ports.push_back((int)i); + } + if (ports.size() == 0) { + WARN("no active IB port found"); + goto fail; + } + MSCCLPPCHECK(mscclppCalloc(&_ctx->ports, ports.size())); + _ctx->nPorts = (int)ports.size(); + for (int i = 0; i < _ctx->nPorts; ++i) { + _ctx->ports[i] = ports[i]; + } + + _ctx->cq = ibv_create_cq(_ctx->ctx, MSCCLPP_IB_CQ_SIZE, NULL, NULL, 0); + if (_ctx->cq == NULL) { + WARN("ibv_create_cq failed (errno %d)", errno); + goto fail; + } + + _ctx->pd = ibv_alloc_pd(_ctx->ctx); + if (_ctx->pd == NULL) { + WARN("ibv_alloc_pd failed (errno %d)", errno); + goto fail; + } + + *ctx = _ctx; + return mscclppSuccess; +fail: + *ctx = NULL; + if (_ctx->ports != NULL) { + free(_ctx->ports); + } + free(_ctx->wcs); + free(_ctx); + return mscclppInternalError; +} + +mscclppResult_t mscclppIbContextDestroy(struct mscclppIbContext *ctx) +{ + for (int i = 0; i < ctx->nMrs; ++i) { + 
ibv_dereg_mr(ctx->mrs[i].mr); + } + for (int i = 0; i < ctx->nQps; ++i) { + ibv_destroy_qp(ctx->qps[i].qp); + } + if (ctx->pd != NULL) { + ibv_dealloc_pd(ctx->pd); + } + if (ctx->cq != NULL) { + ibv_destroy_cq(ctx->cq); + } + if (ctx->ctx != NULL) { + ibv_close_device(ctx->ctx); + } + free(ctx->mrs); + free(ctx->qps); + free(ctx->ports); + free(ctx->wcs); + free(ctx); + return mscclppSuccess; +} + +mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct mscclppIbQp **ibQp, int port/*=-1*/) +{ + if (port < 0) { + port = ctx->ports[0]; + } else { + bool found = false; + for (int i = 0; i < ctx->nPorts; ++i) { + if (ctx->ports[i] == port) { + found = true; + break; + } + } + if (!found) { + WARN("invalid IB port: %d", port); + return mscclppInternalError; + } + } + struct ibv_qp_init_attr qp_init_attr; + std::memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr)); + qp_init_attr.sq_sig_all = 0; + qp_init_attr.send_cq = ctx->cq; + qp_init_attr.recv_cq = ctx->cq; + qp_init_attr.qp_type = IBV_QPT_RC; + qp_init_attr.cap.max_send_wr = MAXCONNECTIONS; + qp_init_attr.cap.max_recv_wr = MAXCONNECTIONS; + qp_init_attr.cap.max_send_sge = 1; + qp_init_attr.cap.max_recv_sge = 1; + qp_init_attr.cap.max_inline_data = 0; + struct ibv_qp *qp = ibv_create_qp(ctx->pd, &qp_init_attr); + if (qp == nullptr) { + WARN("ibv_create_qp failed (errno %d)", errno); + return mscclppInternalError; + } + struct ibv_port_attr port_attr; + if (ibv_query_port(ctx->ctx, port, &port_attr) != 0) { + WARN("ibv_query_port failed (errno %d, port %d)", errno, port); + return mscclppInternalError; + } + + // Register QP to this ctx + qp->context = ctx->ctx; + if (qp->context == NULL) { + WARN("IB context is NULL"); + return mscclppInternalError; + } + ctx->nQps++; + if (ctx->qps == NULL) { + MSCCLPPCHECK(mscclppCalloc(&ctx->qps, MAXCONNECTIONS)); + ctx->maxQps = MAXCONNECTIONS; + } + if (ctx->maxQps < ctx->nQps) { + WARN("too many QPs"); + return mscclppInternalError; + } + struct 
mscclppIbQp *_ibQp = &ctx->qps[ctx->nQps - 1]; + _ibQp->qp = qp; + _ibQp->info.lid = port_attr.lid; + _ibQp->info.port = port; + _ibQp->info.linkLayer = port_attr.link_layer; + _ibQp->info.qpn = qp->qp_num; + _ibQp->info.mtu = port_attr.active_mtu; + if (port_attr.link_layer != IBV_LINK_LAYER_INFINIBAND) { + union ibv_gid gid; + if (ibv_query_gid(ctx->ctx, port, 0, &gid) != 0) { + WARN("ibv_query_gid failed (errno %d)", errno); + return mscclppInternalError; + } + _ibQp->info.spn = gid.global.subnet_prefix; + } + + struct ibv_qp_attr qp_attr; + std::memset(&qp_attr, 0, sizeof(struct ibv_qp_attr)); + qp_attr.qp_state = IBV_QPS_INIT; + qp_attr.pkey_index = 0; + qp_attr.port_num = port; + qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; + if (ibv_modify_qp(qp, &qp_attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) { + WARN("ibv_modify_qp failed (errno %d)", errno); + return mscclppInternalError; + } + + MSCCLPPCHECK(mscclppCalloc(&_ibQp->wrs, MSCCLPP_IB_MAX_SENDS)); + MSCCLPPCHECK(mscclppCalloc(&_ibQp->sges, MSCCLPP_IB_MAX_SENDS)); + + *ibQp = _ibQp; + + return mscclppSuccess; +} + +mscclppResult_t mscclppIbContextRegisterMr(struct mscclppIbContext *ctx, void *buff, size_t size, struct mscclppIbMr **ibMr) +{ + if (size == 0) { + WARN("invalid size: %zu", size); + return mscclppInvalidArgument; + } + static __thread uintptr_t pageSize = 0; + if (pageSize == 0) { + pageSize = sysconf(_SC_PAGESIZE); + } + uintptr_t addr = reinterpret_cast(buff) & -pageSize; + size_t pages = (size + (reinterpret_cast(buff) - addr) + pageSize - 1) / pageSize; + struct ibv_mr *mr = + ibv_reg_mr(ctx->pd, reinterpret_cast(addr), pages * pageSize, + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_RELAXED_ORDERING); + if (mr == nullptr) { + WARN("ibv_reg_mr failed (errno %d)", errno); + return mscclppInternalError; + } + ctx->nMrs++; + if (ctx->mrs == NULL) { + MSCCLPPCHECK(mscclppCalloc(&ctx->mrs, 
MAXCONNECTIONS)); + ctx->maxMrs = MAXCONNECTIONS; + } + if (ctx->maxMrs < ctx->nMrs) { + WARN("too many MRs"); + return mscclppInternalError; + } + struct mscclppIbMr *_ibMr = &ctx->mrs[ctx->nMrs - 1]; + _ibMr->mr = mr; + _ibMr->buff = buff; + _ibMr->info.addr = (uint64_t)buff; + _ibMr->info.rkey = mr->rkey; + *ibMr = _ibMr; + return mscclppSuccess; +} + +mscclppResult_t mscclppIbContextPollCq(struct mscclppIbContext *ctx, int *wcNum) +{ + int ret = ibv_poll_cq(ctx->cq, MSCCLPP_IB_CQ_POLL_NUM, ctx->wcs); + if (ret < 0) { + WARN("ibv_poll_cq failed (errno %d)", errno); + return mscclppInternalError; + } + ctx->wcn = ret; + *wcNum = ret; + return mscclppSuccess; +} + +////////////////////////////////////////////////////////////////////////////// + +int mscclppIbQp::rtr(const mscclppIbQpInfo *info) +{ + struct ibv_qp_attr qp_attr; + std::memset(&qp_attr, 0, sizeof(struct ibv_qp_attr)); + qp_attr.qp_state = IBV_QPS_RTR; + qp_attr.path_mtu = IBV_MTU_1024; + qp_attr.dest_qp_num = info->qpn; + qp_attr.rq_psn = 0; + qp_attr.max_dest_rd_atomic = 1; + qp_attr.min_rnr_timer = 0x12; + if (info->linkLayer == IBV_LINK_LAYER_ETHERNET) { + qp_attr.ah_attr.is_global = 1; + qp_attr.ah_attr.grh.dgid.global.subnet_prefix = info->spn; + qp_attr.ah_attr.grh.dgid.global.interface_id = info->lid; + qp_attr.ah_attr.grh.flow_label = 0; + qp_attr.ah_attr.grh.sgid_index = 0; + qp_attr.ah_attr.grh.hop_limit = 255; + qp_attr.ah_attr.grh.traffic_class = 0; + } else { + qp_attr.ah_attr.is_global = 0; + qp_attr.ah_attr.dlid = info->lid; + } + qp_attr.ah_attr.sl = 0; + qp_attr.ah_attr.src_path_bits = 0; + qp_attr.ah_attr.port_num = info->port; + return ibv_modify_qp(this->qp, &qp_attr, + IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER); +} + +int mscclppIbQp::rts() +{ + struct ibv_qp_attr qp_attr; + std::memset(&qp_attr, 0, sizeof(struct ibv_qp_attr)); + qp_attr.qp_state = IBV_QPS_RTS; + qp_attr.timeout = 18; + 
qp_attr.retry_cnt = 7; + qp_attr.rnr_retry = 7; + qp_attr.sq_psn = 0; + qp_attr.max_rd_atomic = 1; + return ibv_modify_qp(this->qp, &qp_attr, + IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); +} + +int mscclppIbQp::stageSend(struct mscclppIbMr *ibMr, const mscclppIbMrInfo *info, int size, + uint64_t wrId, unsigned int immData, int offset) +{ + if (this->wrn >= MSCCLPP_IB_MAX_SENDS) { + return -1; + } + int wrn = this->wrn; + struct ibv_send_wr *wr_ = &this->wrs[wrn]; + struct ibv_sge *sge_ = &this->sges[wrn]; + std::memset(wr_, 0, sizeof(struct ibv_send_wr)); + std::memset(sge_, 0, sizeof(struct ibv_sge)); + wr_->wr_id = wrId; + wr_->sg_list = sge_; + wr_->num_sge = 1; + wr_->opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr_->imm_data = immData; + wr_->send_flags = IBV_SEND_SIGNALED; + wr_->wr.rdma.remote_addr = info->addr; + wr_->wr.rdma.rkey = info->rkey; + wr_->next = nullptr; + sge_->addr = (uint64_t)(ibMr->buff) + (uint64_t)offset; + sge_->length = size; + sge_->lkey = ibMr->mr->lkey; + if (wrn > 0) { + this->wrs[wrn - 1].next = wr_; + } + this->wrn++; + return this->wrn; +} + +int mscclppIbQp::postSend() +{ + struct ibv_send_wr *bad_wr; + int ret = ibv_post_send(this->qp, this->wrs, &bad_wr); + if (ret != 0) { + return ret; + } + std::memset(this->wrs, 0, sizeof(struct ibv_send_wr) * this->wrn); + std::memset(this->sges, 0, sizeof(struct ibv_sge) * this->wrn); + this->wrn = 0; + return 0; +} + +int mscclppIbQp::postRecv(uint64_t wrId) +{ + struct ibv_recv_wr wr, *bad_wr; + wr.wr_id = wrId; + wr.sg_list = nullptr; + wr.num_sge = 0; + wr.next = nullptr; + return ibv_post_recv((struct ibv_qp *)this->qp, &wr, &bad_wr); +} diff --git a/src/bootstrap/init.cc b/src/bootstrap/init.cc index 32c8592b..4a2d60a6 100644 --- a/src/bootstrap/init.cc +++ b/src/bootstrap/init.cc @@ -1,7 +1,6 @@ #include "mscclpp.h" #include "bootstrap.h" #include "core.h" -#include "shmutils.h" #include #include @@ -128,6 +127,12 @@ 
mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){ if (comm == NULL) return mscclppSuccess; + for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { + if (comm->ibContext[i]) { + MSCCLPPCHECK(mscclppIbContextDestroy(comm->ibContext[i])); + } + } + if (comm->bootstrap) MSCCLPPCHECK(bootstrapClose(comm->bootstrap)); @@ -137,19 +142,50 @@ mscclppResult_t mscclppCommDestroy(mscclppComm_t comm){ } MSCCLPP_API(mscclppResult_t, mscclppConnect, mscclppComm_t comm, int rankRecv, int rankSend, - void *buff, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev); -mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, int *flag, int tag, - mscclppTransport_t transportType, const char *ibDev/*=NULL*/) + void *buff, size_t buffSize, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev); +mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, size_t buffSize, + int *flag, int tag, mscclppTransport_t transportType, const char *ibDev/*=NULL*/) { if (comm->rank == rankRecv || comm->rank == rankSend) { struct mscclppConn *conn = &comm->conns[comm->nConns++]; conn->transport = transportType; - conn->localRank = comm->rank; - conn->ibDev = ibDev; + conn->rankSend = rankSend; + conn->rankRecv = rankRecv; conn->tag = tag; conn->buff = buff; + conn->buffSize = buffSize; conn->flag = flag; - conn->remoteRank = (comm->rank == rankRecv) ? rankSend : rankRecv; + conn->ibCtx = NULL; + conn->ibQp = NULL; + + if (ibDev != NULL) { + // Check if an IB context exists + int ibDevIdx = -1; + int firstNullIdx = -1; + for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { + if (comm->ibContext[i] == NULL) { + if (firstNullIdx == -1) { + firstNullIdx = i; + } + } else if (strncmp(comm->ibContext[i]->ctx->device->name, ibDev, IBV_SYSFS_NAME_MAX) == 0) { + ibDevIdx = i; + break; + } + } + if (ibDevIdx == -1) { + // Create a new context. 
+ if (firstNullIdx == -1) { + WARN("Too many IB devices"); + return mscclppInvalidUsage; + } + ibDevIdx = firstNullIdx; + if (mscclppIbContextCreate(&comm->ibContext[ibDevIdx], ibDev) != mscclppSuccess) { + WARN("Failed to create IB context"); + return mscclppInternalError; + } + } + conn->ibCtx = comm->ibContext[ibDevIdx]; + } } return mscclppSuccess; } @@ -157,68 +193,94 @@ mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, v MSCCLPP_API(mscclppResult_t, mscclppConnectionSetup, mscclppComm_t comm); mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm) { - struct ipcMemHandleInfo { - cudaIpcMemHandle_t handle_buff; - cudaIpcMemHandle_t handle_flag; - int tag; - int valid; + struct connInfo { + cudaIpcMemHandle_t handleBuff; + cudaIpcMemHandle_t handleFlag; + mscclppIbQpInfo qpInfo; + mscclppIbMrInfo mrInfo; }; - size_t shmSize = MAXCONNECTIONS * sizeof(struct ipcMemHandleInfo); - int fd; - struct ipcMemHandleInfo *handleInfos; - std::string shmname = mscclppShmFileName(comm, comm->localRank); - MSCCLPPCHECK(mscclppShmutilsMapCreate(shmname.c_str(), shmSize, &fd, (void **)&handleInfos)); - + // Send info to peers for (int i = 0; i < comm->nConns; ++i) { struct mscclppConn *conn = &comm->conns[i]; - CUDACHECK(cudaIpcGetMemHandle(&handleInfos[i].handle_buff, conn->buff)); - CUDACHECK(cudaIpcGetMemHandle(&handleInfos[i].handle_flag, conn->flag)); - handleInfos[i].tag = conn->tag; - handleInfos[i].valid = 1; + struct connInfo cInfo; + if (conn->transport == mscclppTransportP2P) { + CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleBuff, conn->buff)); + CUDACHECK(cudaIpcGetMemHandle(&cInfo.handleFlag, conn->flag)); + } else if (conn->transport == mscclppTransportIB) { + struct mscclppIbContext *ibCtx = conn->ibCtx; + if (conn->ibQp == NULL) { + MSCCLPPCHECK(mscclppIbContextCreateQp(ibCtx, &conn->ibQp)); + } + MSCCLPPCHECK(mscclppIbContextRegisterMr(ibCtx, conn->buff, conn->buffSize, &conn->ibMr)); + cInfo.qpInfo = conn->ibQp->info; + 
cInfo.mrInfo = conn->ibMr->info; + } + int peer = conn->rankSend == comm->rank ? conn->rankRecv : conn->rankSend; + MSCCLPPCHECK(bootstrapSend(comm->bootstrap, peer, conn->tag, &cInfo, sizeof(cInfo))); } - // Local intra-node barrier: wait for all local ranks to have written their memory handles - MSCCLPPCHECK(bootstrapBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0])); - + // Allocate connection info to be shared with GPU MSCCLPPCHECK(mscclppCudaHostCalloc(&comm->devConns, comm->nConns)); - for (int r = 0; r < comm->localRanks; ++r) { - if (r == comm->localRank) - continue; - int fd_r; - struct ipcMemHandleInfo *handleInfos_r; - std::string shmname_r = mscclppShmFileName(comm, r); - MSCCLPPCHECK(mscclppShmutilsMapOpen(shmname_r.c_str(), shmSize, &fd_r, (void **)&handleInfos_r)); + // Recv info from peers + for (int i = 0; i < comm->nConns; ++i) { + struct mscclppConn *conn = &comm->conns[i]; + struct mscclppDevConn *devConn = &comm->devConns[i]; - std::map> remoteHandles; - for (int i = 0; i < MAXCONNECTIONS; ++i) { - if (handleInfos_r[i].valid != 1) { - break; + devConn->tag = conn->tag; + devConn->localBuff = conn->buff; + devConn->localFlag = conn->flag; + + struct connInfo cInfo; + int peer = conn->rankSend == comm->rank ? 
conn->rankRecv : conn->rankSend; + MSCCLPPCHECK(bootstrapRecv(comm->bootstrap, peer, conn->tag, &cInfo, sizeof(cInfo))); + if (conn->transport == mscclppTransportP2P) { + CUDACHECK(cudaIpcOpenMemHandle(&devConn->remoteBuff, cInfo.handleBuff, cudaIpcMemLazyEnablePeerAccess)); + CUDACHECK(cudaIpcOpenMemHandle((void **)&devConn->remoteFlag, cInfo.handleFlag, cudaIpcMemLazyEnablePeerAccess)); + } else if (conn->transport == mscclppTransportIB) { + if (conn->ibQp->rtr(&cInfo.qpInfo) != 0) { + WARN("Failed to transition QP to RTR"); + return mscclppInvalidUsage; } - remoteHandles[handleInfos_r[i].tag] = std::make_pair(handleInfos_r[i].handle_buff, handleInfos_r[i].handle_flag); - } - - for (int i = 0; i < comm->nConns; ++i) { - struct mscclppConn *conn = &comm->conns[i]; - auto it = remoteHandles.find(conn->tag); - if (it != remoteHandles.end()) { - comm->devConns[i].tag = conn->tag; - comm->devConns[i].localBuff = conn->buff; - comm->devConns[i].localFlag = conn->flag; - CUDACHECK(cudaIpcOpenMemHandle(&comm->devConns[i].remoteBuff, it->second.first, cudaIpcMemLazyEnablePeerAccess)); - CUDACHECK(cudaIpcOpenMemHandle((void **)&comm->devConns[i].remoteFlag, it->second.second, cudaIpcMemLazyEnablePeerAccess)); + if (conn->ibQp->rts() != 0) { + WARN("Failed to transition QP to RTS"); + return mscclppInvalidUsage; } + conn->ibRemoteMrInfo = cInfo.mrInfo; + devConn->remoteBuff = NULL; + CUDACHECK(cudaMalloc(&devConn->remoteFlag, sizeof(int))); } - - MSCCLPPCHECK(mscclppShmutilsMapClose(shmname_r.c_str(), shmSize, fd_r, handleInfos_r)); } - // Local intra-node barrier: wait for all local ranks to have read all memory handles - MSCCLPPCHECK(bootstrapBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0])); + return mscclppSuccess; +} - MSCCLPPCHECK(mscclppShmutilsMapDestroy(shmname.c_str(), shmSize, fd, handleInfos)); +MSCCLPP_API(mscclppResult_t, mscclppProxyLaunch, mscclppComm_t comm); +mscclppResult_t 
mscclppProxyLaunch(mscclppComm_t comm) +{ + MSCCLPPCHECK(mscclppProxyCreate(comm)); + return mscclppSuccess; +} +MSCCLPP_API(mscclppResult_t, mscclppProxyStop, mscclppComm_t comm); +mscclppResult_t mscclppProxyStop(mscclppComm_t comm) +{ + MSCCLPPCHECK(mscclppProxyDestroy(comm)); + return mscclppSuccess; +} + +MSCCLPP_API(mscclppResult_t, mscclppGetLocalRank, mscclppComm_t comm, int *localRank); +mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank) +{ + *localRank = comm->localRank; + return mscclppSuccess; +} + +MSCCLPP_API(mscclppResult_t, mscclppGetNodeFromRank, mscclppComm_t comm, int rank, int *node); +mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node) +{ + *node = comm->rankToNode[rank]; return mscclppSuccess; } diff --git a/src/bootstrap/proxy.cc b/src/bootstrap/proxy.cc index 189a1587..cee63587 100644 --- a/src/bootstrap/proxy.cc +++ b/src/bootstrap/proxy.cc @@ -1,1245 +1,145 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. 
- * - * See LICENSE.txt for license information - ************************************************************************/ - #include "comm.h" -// #include "info.h" -// #include "collectives.h" #include "socket.h" -// #include "shm.h" -// #include "profiler.h" -// #define ENABLE_TIMER 0 -// #include "timer.h" +#include "debug.h" +#include "alloc.h" +#include "ib.h" +#include "checks.h" #include +#include -// enum { proxyRecv=0, proxySend=1 }; +struct proxyArgs { + struct mscclppComm* comm; + struct mscclppIbContext* ibCtx; + volatile int* stop; +}; -// static bool NeedProxy(int type, int pattern, int root, struct mscclppRing* ring, int nranks) { -// if (pattern == mscclppPatternRing || pattern == mscclppPatternRingTwice) return true; +void* mscclppProxyService(void* _args) { + struct proxyArgs *args = (struct proxyArgs *)_args; + struct mscclppComm *comm = args->comm; + struct mscclppIbContext *ibCtx = args->ibCtx; + volatile int *stop = args->stop; + free(_args); -// /* In chains, one rank does not need a proxy. Let's figure out which one it is */ -// /* Which index in the reorganized rings should we compare root against */ -// const int myrank = 0, nextrank = 1, prevrank = nranks-1; -// int index = pattern == mscclppPatternPipelineFrom ? -// /* no recv / no send if root = */ -// /* bcast */ (type == proxyRecv ? myrank : nextrank ): -// /* reduce */ (type == proxyRecv ? 
prevrank : myrank ); -// int rank = ring->userRanks[index]; -// return (root != rank); -// } + enum { + SEND_STATE_INIT, + SEND_STATE_INPROGRESS + }; -// #define PROXYARGS_ALLOCATE_SIZE MSCCLPP_MAX_OPS -// struct mscclppProxyPool { -// struct mscclppProxyPool *next; -// struct mscclppProxyArgs elems[PROXYARGS_ALLOCATE_SIZE]; -// }; + int rank = comm->rank; + std::map recvTagToConn; + std::map sendTagToConn; + std::map sendConnToState; + for (int i = 0; i < comm->nConns; ++i) { + struct mscclppConn *conn = &comm->conns[i]; + if (conn->transport != mscclppTransportIB) continue; + if (conn->ibCtx != ibCtx) continue; + if (conn->rankRecv == rank) { + recvTagToConn[conn->tag] = conn; + } else if (conn->rankSend == rank) { + sendTagToConn[conn->tag] = conn; + sendConnToState[conn] = SEND_STATE_INIT; + } + } + // Initial post recv + for (auto &pair : recvTagToConn) { + struct mscclppConn *conn = pair.second; + int tag = pair.first; + if (conn->ibQp->postRecv((uint64_t)-tag) != 0) { + WARN("postRecv failed: errno %d", errno); + } + } + // TODO(chhwang): run send and recv in different threads for lower latency + int wcNum; + while (*stop == 0) { + // Try send + for (auto &pair : sendConnToState) { + if (pair.second == SEND_STATE_INPROGRESS) continue; + // TODO(chhwang): do we need a thread per flag? + struct mscclppConn *conn = pair.first; + volatile int *flag = (volatile int *)conn->flag; + if (*flag == 0) continue; + // Do send + conn->ibQp->stageSend(conn->ibMr, &conn->ibRemoteMrInfo, conn->buffSize, + (uint64_t)conn->tag, (unsigned int)conn->tag); + if (conn->ibQp->postSend() != 0) { + WARN("postSend failed: errno %d", errno); + } + pair.second = SEND_STATE_INPROGRESS; + } -// static mscclppResult_t allocateArgs(struct mscclppProxyProgressState* state, struct mscclppProxyArgs** argsptr) { -// struct mscclppProxyArgs* elem; -// if (state->pool == NULL) { -// // Allocate a new pool of elements. 
Make sure we allocate the memory close -// // to the network thread -// struct mscclppProxyPool* newPool; -// MSCCLPPCHECK(mscclppCalloc(&newPool, 1)); + // Poll completions + mscclppIbContextPollCq(ibCtx, &wcNum); + if (wcNum > 0) { + for (int i = 0; i < wcNum; ++i) { + struct ibv_wc *wc = &ibCtx->wcs[i]; + if (wc->status != IBV_WC_SUCCESS) { + WARN("wc status %d", wc->status); + } + if (((int)wc->wr_id) < 0) { + // recv + auto search = recvTagToConn.find(wc->imm_data); + if (search == recvTagToConn.end()) { + WARN("unexpected imm_data %d", wc->imm_data); + } + struct mscclppConn *conn = search->second; + if (conn->ibQp->postRecv((uint64_t)-wc->imm_data) != 0) { + WARN("postRecv failed: errno %d", errno); + } + volatile int *flag = (volatile int *)conn->flag; + *flag = 1; + } else { + // send + int tag = (int)wc->wr_id; + auto search = sendTagToConn.find(tag); + if (search == sendTagToConn.end()) { + WARN("unexpected tag %d", tag); + } + struct mscclppConn *conn = search->second; + volatile int *flag = (volatile int *)conn->flag; + *flag = 0; + sendConnToState[conn] = SEND_STATE_INIT; + // WARN("send done rank %d", rank); + } + } + } + } + *stop = 0; + WARN("Proxy exits: rank %d", rank); + return NULL; +} -// struct mscclppProxyArgs* newElems = newPool->elems; -// // Chain newly allocated elements -// for (int i=0; ipool = newElems; -// // Save the pool memory block for later resource release -// newPool->next = state->pools; -// state->pools = newPool; -// } -// elem = state->pool; -// state->pool = state->pool->next; -// elem->next = elem->nextPeer = NULL; -// *argsptr = elem; +// mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses) { +// comm->proxyState.listenSock = sock; +// comm->proxyState.peerAddresses = peerAddresses; // return mscclppSuccess; // } -// //#define DEBUG_PROXY 1 -// #ifdef DEBUG_PROXY -// #define DEBUG_PROXY_PRINT printf -// #else -// #define DEBUG_PROXY_PRINT(...) 
-// #endif - -// #define OP_INDEX(op) ((op) ? (op)-state->pools->elems : -1) -// #define OP_SEEN 0x100000 - -// mscclppResult_t getOpIndex(struct mscclppProxyArgs* op, struct mscclppProxyProgressState* state, int* poolIndex, int* opIndex) { -// struct mscclppProxyPool* pool = state->pools; -// int p = 0; -// while (pool) { -// uint64_t o = op-pool->elems; -// if (o < PROXYARGS_ALLOCATE_SIZE) { -// *opIndex = o; -// *poolIndex = p; -// return mscclppSuccess; -// } -// pool = pool->next; -// p++; -// } -// WARN("Could not find pool of op %p\n", op); -// return mscclppInternalError; -// } - -// mscclppResult_t printProxyOp(struct mscclppProxyArgs* op, int poolIndex, int opIndex) { -// printf("[%d-%d|%ld| %s", poolIndex, opIndex, op->opCount, op->pattern == mscclppPatternSend ? "Send" : op->pattern == mscclppPatternRecv ? "Recv" : "Coll"); -// for (int s=0; snsubs; s++) { -// struct mscclppProxySubArgs* sub = op->subs+s; -// if (op->state == mscclppProxyOpProgress) { -// char status = ' '; -// if (op->pattern == mscclppPatternRecv) { -// if (sub->posted < sub->nsteps && sub->posted < sub->done + MSCCLPP_STEPS) status = 'I'; // Init -// else if (sub->received < sub->posted) status = 'R'; // Receiving -// else if (sub->received < sub->transmitted) status = 'R'; // Receiving -// else if (sub->transmitted < sub->received) status = 'F'; // Flushing -// else if (sub->done < sub->transmitted) status = 'G'; // Waiting on GPU -// else status = 'D'; // Done -// } else if (op->pattern == mscclppPatternSend) { -// if (sub->posted < sub->nsteps && sub->posted < sub->done + MSCCLPP_STEPS) status = 'I'; // Init -// else if (sub->transmitted < sub->posted) status = 'G'; // Waiting on GPU -// else if (sub->done < sub->transmitted) status = 'S'; // Sending -// else status = 'D'; // Done -// } -// printf(" %d%c/%d", sub->peer, status, sub->channelId); -// } else { -// printf(" %d/%d", sub->peer, sub->channelId); -// } -// } -// printf("]"); -// return mscclppSuccess; -// } -// 
mscclppResult_t dumpProxyState(struct mscclppProxyProgressState* state) { -// struct mscclppProxyArgs* op = state->active; -// int poolIndex, opIndex; -// printf("ACTIVE OPS\n"); -// while (op) { -// MSCCLPPCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); -// if (op->state & OP_SEEN) { -// WARN("List loop at element %d-%d", poolIndex, opIndex); -// } -// MSCCLPPCHECK(printProxyOp(op, poolIndex, opIndex)); -// op->state |= OP_SEEN; -// printf("\n"); -// struct mscclppProxyArgs* nextOp = op->nextPeer; -// while (nextOp) { -// MSCCLPPCHECK(getOpIndex(nextOp, state, &poolIndex, &opIndex)); -// if (nextOp->state & OP_SEEN) { -// WARN("List loop at element %d-%d", poolIndex, opIndex); -// } -// printf("| `-> "); -// MSCCLPPCHECK(printProxyOp(nextOp, poolIndex, opIndex)); -// nextOp->state |= OP_SEEN; -// printf("\n"); -// if (nextOp->next) { -// WARN("Inactive op has next set!\n"); -// } -// nextOp = nextOp->nextPeer; -// } -// if (op->nextPeer == NULL) printf("|\n"); -// op = op->next; -// printf("v\n"); -// } -// printf("[X]\n"); - -// # if 0 -// printf("FREE OPS\n"); -// op = state->pool; -// while (op) { -// MSCCLPPCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); -// if (op->state & OP_SEEN) { -// WARN("List loop at element %d-%d", poolIndex, opIndex); -// } -// MSCCLPPCHECK(printProxyOp(op, poolIndex, opIndex)); -// op->state |= OP_SEEN; -// printf("->"); -// op = op->next; -// } -// printf("[X]\n"); -// #else -// op = state->pool; -// while (op) { -// MSCCLPPCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); -// if (op->state & OP_SEEN) { -// WARN("List loop at element %d-%d", poolIndex, opIndex); -// } -// op->state |= OP_SEEN; -// op = op->next; -// } -// #endif - -// struct mscclppProxyPool* pool = state->pools; -// poolIndex = 0; -// while (pool) { -// struct mscclppProxyArgs* elem = pool->elems; -// for (int e=0; estate & OP_SEEN) == 0) { -// printf("Elem %d-%d is not in any list:\n", poolIndex, e); -// MSCCLPPCHECK(printProxyOp(elem, poolIndex, e)); 
-// printf("\n"); -// } else { -// elem->state -= OP_SEEN; -// } -// } -// pool = pool->next; -// poolIndex++; -// } -// return mscclppSuccess; -// } - -// static mscclppResult_t mscclppProxyOpToArgs(struct mscclppProxyOp* op, struct mscclppProxyArgs* args, int subIndex) { -// struct mscclppProxySubArgs* sub = args->subs+subIndex; -// if (subIndex >= MSCCLPP_PROXY_MAX_SUBS) { -// WARN("Proxy append out of bounds"); -// return mscclppInternalError; -// } - -// //memset(sub, 0, sizeof(struct mscclppProxySubArgs)); -// sub->connection = op->connection; -// sub->channelId = op->channelId; -// sub->nsteps = op->nsteps; -// sub->nbytes = op->nbytes; -// sub->peer = op->root; -// args->nsubs = subIndex+1; -// if (subIndex) { -// if ((args->sliceSteps != op->sliceSteps) || -// (args->chunkSteps != op->chunkSteps) || -// (args->protocol != op->protocol) || -// (args->dtype != op->dtype) || -// (args->redOp != op->redOp)) { -// WARN("Proxy append mismatch"); -// return mscclppInternalError; -// } -// if (args->state != mscclppProxyOpReady) { -// WARN("Proxy append on running operation"); -// return mscclppInternalError; -// } -// return mscclppSuccess; -// } -// //memset(&args->progress, 0, sizeof(struct mscclppProxyArgs)-offsetof(struct mscclppProxyArgs, progress)); -// args->done = 0; -// args->opCount = op->opCount; -// args->sliceSteps = op->sliceSteps; -// args->chunkSteps = op->chunkSteps; -// args->chunkSize = op->chunkSize; -// args->dtype = op->dtype; -// args->redOp = op->redOp; -// args->pattern = op->pattern; -// args->protocol = op->protocol; -// args->state = mscclppProxyOpReady; -// args->progress = op->connection->tcomm->proxyProgress; -// args->proxyAppendPtr = op->connection->proxyAppendPtr; -// return mscclppSuccess; -// } - -// static mscclppResult_t ProxyAppend(struct mscclppProxyProgressState* state, struct mscclppProxyOp* op) { -// struct mscclppProxyConnection* connection = op->connection; -// int shared = connection->shared; -// struct 
mscclppProxyArgs* args = *connection->proxyAppendPtr; - -// if (args) { -// if (shared && args->opCount == op->opCount) { -// MSCCLPPCHECK(mscclppProxyOpToArgs(op, args, args->nsubs)); -// DEBUG_PROXY_PRINT("Insert (%d/%5ld/%5ld) as group with %5ld\n", shared, args->opCount, op->opCount, OP_INDEX(args)); -// } else { -// struct mscclppProxyArgs* prevArgs = args; -// MSCCLPPCHECK(allocateArgs(state, &args)); -// MSCCLPPCHECK(mscclppProxyOpToArgs(op, args, 0)); -// prevArgs->nextPeer = args; -// DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld\n", OP_INDEX(args), shared, prevArgs->opCount, args->opCount, OP_INDEX(prevArgs)); -// *(args->proxyAppendPtr) = args; -// } -// } else { -// // Nothing running for that peer. Add to the list -// MSCCLPPCHECK(allocateArgs(state, &args)); -// MSCCLPPCHECK(mscclppProxyOpToArgs(op, args, 0)); -// if (state->active == NULL) { -// // Create the list -// DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element\n", OP_INDEX(args), shared, args->opCount); -// state->active = args; -// } else { -// // Append element at the end of the list -// struct mscclppProxyArgs* last = state->active; -// while (last->next) last = last->next; -// last->next = args; -// DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element\n", OP_INDEX(args), shared, args->opCount); -// } -// *(args->proxyAppendPtr) = args; -// } -// return mscclppSuccess; -// } - -// mscclppResult_t mscclppProxyPost(struct mscclppProxyOpsPool* pool, int nextOps, int nextOpsEnd) { -// pthread_mutex_lock(&pool->mutex); -// if (pool->nextOps == -1) { -// pool->nextOps = nextOps; -// pthread_cond_signal(&pool->cond); -// } else { -// pool->ops[pool->nextOpsEnd].next = nextOps; -// } -// pool->nextOpsEnd = nextOpsEnd; -// pthread_mutex_unlock(&pool->mutex); -// return mscclppSuccess; -// } - -// mscclppResult_t mscclppLocalOpAppend(struct mscclppComm* comm, struct mscclppProxyConnector* proxyConn, struct mscclppProxyOp* proxyOp) { -// struct mscclppProxyOps* proxyOps 
= proxyConn->comm->proxyState.proxyOps; -// if (proxyOps == NULL) return mscclppInternalError; -// proxyOps += proxyConn->localRank; -// struct mscclppProxyOpsPool* pool = proxyOps->pool; - -// TIME_START(0); -// int opIndex = proxyOps->freeOp; -// struct mscclppProxyOp* op; -// if (opIndex != -1) { -// op = pool->ops+opIndex; -// proxyOps->freeOp = op->next; -// } else { -// int freeOp; -// while ((freeOp = pool->freeOps[comm->localRank]) == -1) sched_yield(); -// int freeOpNew; -// while ((freeOpNew = __sync_val_compare_and_swap(pool->freeOps+comm->localRank, freeOp, -1)) != freeOp) freeOp = freeOpNew; -// opIndex = freeOp; -// op = pool->ops+opIndex; -// proxyOps->freeOp = op->next; -// } -// if (op->next != -1) __builtin_prefetch(pool->ops+op->next); // Prefetch next free op -// memcpy(op, proxyOp, sizeof(struct mscclppProxyOp)); -// op->next = -1; -// op->connection = proxyConn->connection; -// if (proxyOps->nextOps == -1) { -// proxyOps->nextOps = proxyOps->nextOpsEnd = opIndex; -// } else { -// pool->ops[proxyOps->nextOpsEnd].next = opIndex; -// proxyOps->nextOpsEnd = opIndex; -// } -// if (++proxyOps->count == MAX_OPS_PER_PEER) { -// // Post what we have so far to free some ops in the pool -// // Do not post last operations as we could have more coming with the same opCount, and posting -// // them in different batches would break proxyArgs aggregation with subs. 
-// uint64_t lastOpCount = pool->ops[proxyOps->nextOpsEnd].opCount; -// int lastOp = -1; -// int toSend = 0; -// int ops = 0; -// for (int op= proxyOps->nextOps; op != proxyOps->nextOpsEnd; op=pool->ops[op].next) { -// ops++; -// if (pool->ops[op].opCount != lastOpCount) { -// lastOp = op; -// toSend = ops; -// } -// } -// if (lastOp == -1) { -// WARN("Unable to post incomplete proxy op chain %d..%d (opCount %ld)\n", proxyOps->nextOps, proxyOps->nextOpsEnd, lastOpCount); -// return mscclppInternalError; -// } -// // Cut chain at lastOp -// int nextOps = proxyOps->nextOps; -// proxyOps->nextOps = pool->ops[lastOp].next; -// pool->ops[lastOp].next = -1; -// MSCCLPPCHECK(mscclppProxyPost(proxyOps->pool, nextOps, lastOp)); -// proxyOps->count -= toSend; -// } -// TIME_STOP(0); -// return mscclppSuccess; -// } - -// static mscclppResult_t SaveProxy(struct mscclppChannel* channel, int type, int peer, struct mscclppProxyOp* op, int connIndex, bool* justInquire) { -// if (peer < 0) return mscclppSuccess; - -// struct mscclppChannelPeer* peerComm = channel->peers+peer; -// struct mscclppConnector* connector = type == proxyRecv ? peerComm->recv+connIndex : peerComm->send+connIndex; -// if (connector->transportComm == NULL) { -// WARN("Rank %d has no transport for %s peer %d on channel %d/%d", connector->comm->rank, -// type == proxyRecv ? "recv" : "send", peer, channel->id, connIndex); -// return mscclppInternalError; -// } -// if (connector->transportComm->proxyProgress == NULL) return mscclppSuccess; - -// if (justInquire) *justInquire = true; -// else { -// MSCCLPPCHECK(mscclppLocalOpAppend(connector->comm, &connector->proxyConn, op)); -// } -// return mscclppSuccess; -// } - -// // justInquire != nullptr means don't actually do anything, just assertain need of -// // mscclppProxySaveOp for this op. 
-// mscclppResult_t mscclppProxySaveOp(struct mscclppComm* comm, struct mscclppProxyOp* op, bool* justInquire) { -// struct mscclppChannel* channel = &comm->channels[op->channelId]; -// if (justInquire) *justInquire = false; -// switch (op->pattern) { -// case mscclppPatternRing: -// case mscclppPatternRingTwice: -// case mscclppPatternPipelineFrom: -// case mscclppPatternPipelineTo: { -// struct mscclppRing* ring = &channel->ring; -// if (NeedProxy(proxyRecv, op->pattern, op->root, ring, comm->nRanks)) { -// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, ring->prev, op, 0, justInquire)); -// } -// if (NeedProxy(proxySend, op->pattern, op->root, ring, comm->nRanks)) { -// MSCCLPPCHECK(SaveProxy(channel, proxySend, ring->next, op, 0, justInquire)); -// } -// } break; -// case mscclppPatternTreeUp: -// case mscclppPatternTreeDown: -// case mscclppPatternTreeUpDown: { -// if (op->pattern != mscclppPatternTreeDown) { // Tree up -// struct mscclppTree* tree = &channel->tree; -// for (int i=0; idown[i], op, 0, justInquire)); -// } -// MSCCLPPCHECK(SaveProxy(channel, proxySend, tree->up, op, 0, justInquire)); -// } -// if (op->pattern != mscclppPatternTreeUp) { // Tree down -// struct mscclppTree* tree = &channel->tree; -// for (int i=0; i< MSCCLPP_MAX_TREE_ARITY; i++) { -// MSCCLPPCHECK(SaveProxy(channel, proxySend, tree->down[i], op, 0, justInquire)); -// } -// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, tree->up, op, 0, justInquire)); -// } -// } break; -// case mscclppPatternCollnetChain: { -// MSCCLPPCHECK(SaveProxy(channel, proxySend, channel->collnetChain.up, op, 1, justInquire)); -// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, channel->collnetChain.up, op, 0, justInquire)); -// } break; -// case mscclppPatternCollnetDirect: { -// MSCCLPPCHECK(SaveProxy(channel, proxySend, channel->collnetDirect.out, op, 1, justInquire)); -// MSCCLPPCHECK(SaveProxy(channel, proxyRecv, channel->collnetDirect.out, op, 0, justInquire)); -// } break; -// case mscclppPatternSend: -// case 
mscclppPatternRecv: { -// if (op->root == comm->rank) return mscclppSuccess; -// MSCCLPPCHECK(SaveProxy(channel, op->pattern == mscclppPatternSend ? proxySend : proxyRecv, op->root, op, 1, justInquire)); -// } break; -// } -// return mscclppSuccess; -// } - -// MSCCLPP_PARAM(ChunkSize, "CHUNK_SIZE", 0); - -// mscclppResult_t mscclppProxyComputeP2p(struct mscclppInfo* info, struct mscclppProxyOp* op) { -// memset(op, 0, sizeof(struct mscclppProxyOp)); -// int channelId = info->channelId; -// struct mscclppChannel* channel = info->comm->channels+channelId; -// op->channelId = channelId; -// op->sliceSteps = 1; -// op->chunkSteps = 1; -// op->dtype = info->datatype; -// op->protocol = info->protocol; - -// int stepSize = info->comm->buffSizes[op->protocol]/MSCCLPP_STEPS; - -// if (op->protocol == MSCCLPP_PROTO_SIMPLE) stepSize = info->comm->p2pChunkSize; -// info->chunkSize = stepSize; -// op->root = info->root; - -// struct mscclppChannelPeer* peer = channel->peers + op->root; -// if (info->coll == mscclppFuncSend) { -// op->pattern = mscclppPatternSend; -// if (op->root != info->comm->rank && peer->send[1].transportComm == &netTransport.send) { -// // Tune chunk size for the network -// if (info->count < stepSize) info->chunkSize /= 4; -// else if (info->count < 8*stepSize) info->chunkSize /= 2; -// } -// } else if (info->coll == mscclppFuncRecv) { -// op->pattern = mscclppPatternRecv; -// if (op->root != info->comm->rank && peer->recv[1].transportComm == &netTransport.recv) { -// // Tune chunk size for the network -// if (info->count < stepSize) info->chunkSize /= 4; -// else if (info->count < 8*stepSize) info->chunkSize /= 2; -// } -// } else { -// WARN("P2p operation is neither send or recv"); -// return mscclppInternalError; -// } -// if (mscclppParamChunkSize() != 0) { -// info->chunkSize = mscclppParamChunkSize(); -// } -// op->chunkSize = info->chunkSize; - -// // Compute nSteps for proxies -// int chunkEffectiveSize = op->chunkSize; -// if (op->protocol == 
MSCCLPP_PROTO_LL) { -// chunkEffectiveSize /= 2; -// } - -// op->nbytes = stepSize; -// op->nsteps = DIVUP(info->count, chunkEffectiveSize); -// if (op->nsteps == 0) op->nsteps = 1; - -// return mscclppSuccess; -// } - -// static mscclppResult_t removeOp(struct mscclppProxyProgressState* state, struct mscclppProxyArgs** opPtr, struct mscclppProxyArgs** prevOpPtr) { -// struct mscclppProxyArgs* freeOp = *opPtr; -// struct mscclppProxyArgs* next = freeOp->next; -// DEBUG_PROXY_PRINT("Remove %ld -> %ld -> %ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(freeOp), OP_INDEX(next)); -// *opPtr = next; -// if (freeOp->nextPeer) { -// // replace op by nextPeer -// struct mscclppProxyArgs* nextPeer = freeOp->nextPeer; -// if (*prevOpPtr) { -// (*prevOpPtr)->next = nextPeer; -// } else { -// state->active = nextPeer; -// } -// nextPeer->next = next; -// *(prevOpPtr) = nextPeer; -// } else { -// *(freeOp->proxyAppendPtr) = NULL; -// if (*prevOpPtr) { -// (*prevOpPtr)->next = next; -// } else { -// state->active = next; -// } -// } -// freeOp->next = state->pool; -// state->pool = freeOp; -// DEBUG_PROXY_PRINT("Removed %5ld (%5ld) : ", OP_INDEX(freeOp), OP_INDEX(*freeOp->proxyAppendPtr)); -// #ifdef DEBUG_PROXY -// MSCCLPPCHECK(dumpProxyState(state)); -// #endif -// return mscclppSuccess; -// } - -// static mscclppResult_t progressOps(struct mscclppComm* comm, struct mscclppProxyProgressState* state, struct mscclppProxyArgs* opStart, int* idle) { -// struct mscclppProxyArgs* prevOp = NULL; -// struct mscclppProxyArgs* op = opStart; -// while (op) { -// if (op->state == mscclppProxyOpNone) return mscclppInternalError; -// TIME_START(0); TIME_START(1); -// MSCCLPPCHECK(op->progress(comm, op)); -// if (op->idle) { TIME_STOP(1); TIME_CANCEL(0); } else { TIME_CANCEL(1); TIME_STOP(0); } -// *idle &= op->idle; -// if (op->state == mscclppProxyOpNone) { -// TIME_START(2); -// MSCCLPPCHECK(removeOp(state, &op, &prevOp)); -// TIME_STOP(2); -// } else { -// prevOp = op; -// op = op->next; -// } -// 
} -// return mscclppSuccess; -// } - -// MSCCLPP_PARAM(ProxyAppendBatchSize, "PROXY_APPEND_BATCH_SIZE", 16); - -// static mscclppResult_t mscclppProxyGetPostedOps(struct mscclppComm* comm, int* added) { -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// if (state->opsPool == NULL) return mscclppInternalError; -// struct mscclppProxyOpsPool* pool = state->opsPool; - -// struct mscclppProxyArgs profArgs; // Only used for profiling purposes -// if (state->nextOps != -1) goto process_nextops; - -// // If we have ops to progress, no need to block waiting for something to arrive or even wait for the lock -// // to be available. Exit, continue progress, and come back later. -// if (state->active != NULL && (pool->nextOps == -1 || pthread_mutex_trylock(&pool->mutex) != 0)) return mscclppSuccess; - -// if (state->active == NULL) { -// pthread_mutex_lock(&pool->mutex); -// while (pool->nextOps == -1 && !state->stop) { -// struct mscclppProxyArgs profArgs; // Only used for profiling purposes -// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileSleep); -// pthread_cond_wait(&pool->cond, &pool->mutex); -// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileWakeup); -// } -// if (state->stop) { // We might have been woken up to stop. 
-// pthread_mutex_unlock(&pool->mutex); -// return mscclppSuccess; -// } -// } - -// state->nextOps = pool->nextOps; -// pool->nextOps = pool->nextOpsEnd = -1; -// pthread_mutex_unlock(&pool->mutex); -// if (state->nextOps == -1) return mscclppInternalError; - -// process_nextops: -// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileAppend); -// TIME_START(2); -// int freeOp[MSCCLPP_MAX_LOCAL_RANKS]; -// int freeOpEnd[MSCCLPP_MAX_LOCAL_RANKS]; -// for (int i=0; ilocalRanks; i++) freeOp[i] = -1; - -// uint64_t lastOpCount = 0; -// int lastPeer = -1; -// int count = 0; -// for (int opIndex = state->nextOps; opIndex != -1;) { -// struct mscclppProxyOp* peerOp = pool->ops+opIndex; -// int peer = opIndex / MAX_OPS_PER_PEER; -// if ((lastOpCount && peerOp->opCount != lastOpCount) || ((lastPeer != -1) && peer != lastPeer)) count++; -// if (count == mscclppParamProxyAppendBatchSize()+1) break; -// lastOpCount = peerOp->opCount; -// lastPeer = peer; -// if (peerOp->connection == NULL) return mscclppInternalError; -// if (peerOp->next != -1) __builtin_prefetch(pool->ops+peerOp->next); -// MSCCLPPCHECK(ProxyAppend(state, peerOp)); -// (*added)++; -// int lastOpIndex = opIndex; -// opIndex = peerOp->next; -// // Return op to peer pool -// if (freeOp[peer] == -1) { -// freeOpEnd[peer] = lastOpIndex; -// } else { -// peerOp->next = freeOp[peer]; -// } -// freeOp[peer] = lastOpIndex; -// state->nextOps = opIndex; -// } - -// for (int i=0; ilocalRanks; i++) { -// if (freeOp[i] == -1) continue; -// int newFree = freeOp[i]; -// int oldFree = pool->freeOps[i]; -// pool->ops[freeOpEnd[i]].next = oldFree; -// if (oldFree == -1) { -// // Nothing for the main thread to consume, we can set it. -// pool->freeOps[i] = newFree; -// } else { -// // The main thread may recycle free ops at any time, replace the freeOps value atomically and check it worked. 
-// int swap = __sync_val_compare_and_swap(pool->freeOps+i, oldFree, newFree); -// if (swap != oldFree) { -// if (swap != -1) return mscclppInternalError; -// // Ops were recycled while we were trying to swap, just set the value directly now. -// pool->ops[freeOpEnd[i]].next = -1; -// pool->freeOps[i] = newFree; -// } -// } -// } -// profArgs.opCount = *added; -// mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileAppendEnd); -// TIME_STOP(2); -// return mscclppSuccess; -// } - -// #include -// static mscclppProxyProgressState* mscclppLastProxyState; -// void mscclppDumpProxyState(int signal) { -// dumpProxyState(mscclppLastProxyState); -// } - -// MSCCLPP_PARAM(CreateThreadContext, "CREATE_THREAD_CONTEXT", 0); -// mscclppResult_t mscclppSetThreadContext(struct mscclppComm* comm) { -// #if CUDART_VERSION >= 11030 -// static int createThreadContext = -1; - -// if (createThreadContext == -1) { -// createThreadContext = mscclppParamCreateThreadContext(); -// if (createThreadContext) { -// if (CUPFN(cuCtxCreate) == nullptr || CUPFN(cuCtxDestroy) == nullptr || CUPFN(cuCtxSetCurrent) == nullptr) { -// WARN("Unable to create thread context due to old driver, disabling."); -// createThreadContext = 0; -// } -// } -// } -// if (createThreadContext) { -// if (comm->proxyState.cudaCtx == NULL) { -// if (CUPFN(cuCtxCreate(&comm->proxyState.cudaCtx, -// CU_CTX_SCHED_SPIN|CU_CTX_MAP_HOST, comm->cudaDev)) != CUDA_SUCCESS) { -// WARN("Failed to create CUDA context on device %d", comm->cudaDev); -// createThreadContext = 0; -// return mscclppSuccess; -// } -// } else { -// if (CUPFN(cuCtxSetCurrent(comm->proxyState.cudaCtx)) != CUDA_SUCCESS) { -// WARN("Failed to set CUDA context on device %d", comm->cudaDev); -// return mscclppUnhandledCudaError; -// } -// } -// } -// #endif -// return mscclppSuccess; -// } - -// // Set to SIGUSR1 or SIGUSR2 to help debug proxy state during hangs -// MSCCLPP_PARAM(ProxyDumpSignal, "PROXY_DUMP_SIGNAL", -1); - -// void* 
mscclppProxyProgress(void *comm_) { -// struct mscclppComm* comm = (struct mscclppComm*)comm_; -// if (mscclppSetThreadContext(comm) != mscclppSuccess) { -// WARN("[Proxy Progress] Failed to set CUDA context on device %d", comm->cudaDev); -// } else if (cudaSetDevice(comm->cudaDev) != cudaSuccess) { -// WARN("[Proxy Progress] Failed to set CUDA device %d", comm->cudaDev); -// } -// if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); - -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// state->nextOps = -1; -// const int sig = mscclppParamProxyDumpSignal(); -// if (sig != -1) signal(sig, mscclppDumpProxyState); -// mscclppLastProxyState = state; -// char threadName[MSCCLPP_THREAD_NAMELEN]; -// snprintf(threadName, MSCCLPP_THREAD_NAMELEN, "MSCCLPP Progress%2d", comm->cudaDev); -// nvtxNameOsThreadA(syscall(SYS_gettid), threadName); - -// int lastIdle = 0; -// struct mscclppProxyArgs profArgs; // Only used for profiling purposes -// while ((state->stop == false || (state->stop == true && state->active)) && *comm->abortFlag == 0) { -// int idle = 1; -// mscclppResult_t ret = progressOps(comm, state, state->active, &idle); -// if (ret != mscclppSuccess) { -// (void) mscclppCommSetAsyncError(comm, ret); -// INFO(MSCCLPP_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); -// return NULL; -// } -// if (lastIdle == 0 && idle == 1) mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileIdle); -// if (lastIdle == 1 && idle == 0) mscclppProfilingRecord(&profArgs, 0, 0, mscclppProxyProfileActive); -// int added = 0; -// TIME_START(3); -// if (state->stop == false) -// ret = mscclppProxyGetPostedOps(comm, &added); -// if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } -// if (ret != mscclppSuccess) { -// (void) mscclppCommSetAsyncError(comm, ret); -// INFO(MSCCLPP_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); -// } -// if (added == 0) { -// sched_yield(); // No request 
progressed. Let others run. -// } -// lastIdle = idle; -// } -// return NULL; -// } - -// mscclppResult_t mscclppProxyStart(struct mscclppComm* comm) { -// struct mscclppProxyOps* proxyOps = comm->proxyState.proxyOps; -// if (proxyOps == NULL) return mscclppSuccess; -// TIME_START(1); -// for (int r=0; rlocalRanks; r++) { -// struct mscclppProxyOps* ops = proxyOps+r; -// if (ops->pool == NULL || ops->nextOps == -1) continue; -// MSCCLPPCHECK(mscclppProxyPost(ops->pool, ops->nextOps, ops->nextOpsEnd)); -// ops->nextOps = ops->nextOpsEnd = -1; -// ops->count = 0; -// } -// comm->opCount++; -// TIME_STOP(1); -// return mscclppSuccess; -// } - -// mscclppResult_t mscclppProxyProgressCreate(struct mscclppComm* comm) { -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// if (!state->thread) { -// pthread_create(&state->thread, NULL, mscclppProxyProgress, comm); -// mscclppSetThreadName(state->thread, "MSCCLPP Progress%2d", comm->cudaDev); -// } -// return mscclppSuccess; -// } - -// mscclppResult_t mscclppProxyProgressDestroy(struct mscclppComm* comm) { -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; - -// // Request the proxy to stop and then wake it -// if (state->opsPool) { -// pthread_mutex_lock(&state->opsPool->mutex); -// state->stop = true; -// pthread_cond_signal(&state->opsPool->cond); -// pthread_mutex_unlock(&state->opsPool->mutex); -// pthread_join(state->thread, NULL); -// } - -// // Free off any memory allocated for the proxy arg pools -// while (state->pools != NULL) { -// struct mscclppProxyPool *next = state->pools->next; -// free(state->pools); -// state->pools = next; -// } - -// mscclppProfilingDump(); -// TIME_PRINT("Proxy"); -// return mscclppSuccess; -// } - -// struct mscclppProxyAsyncOp { -// int type; -// struct mscclppProxyConnection* connection; -// int reqSize, respSize; -// char *reqBuff, *respBuff; -// }; - -// struct mscclppProxyLocalPeer { -// struct mscclppSocket sock; -// int 
localRank; -// struct mscclppProxyAsyncOp asyncOps; -// }; - -// #define MSCCLPP_PROXY_CONN_POOL_SIZE_POW2 7 -// #define MSCCLPP_PROXY_CONN_POOL_SIZE (1<<(MSCCLPP_PROXY_CONN_POOL_SIZE_POW2)) -// #define MSCCLPP_PROXY_CONN_POOL_MASK ((MSCCLPP_PROXY_CONN_POOL_SIZE)-1) -// struct mscclppProxyConnectionPool { -// struct mscclppProxyConnection** pools; -// int banks; -// int offset; -// struct mscclppProxyAsyncOp* ops; -// }; - -// static mscclppResult_t mscclppProxyNewConnection(struct mscclppProxyConnectionPool* pool, int* id) { -// if (pool->offset == MSCCLPP_PROXY_CONN_POOL_SIZE) { -// MSCCLPPCHECK(mscclppRealloc(&pool->pools, pool->banks, pool->banks+1)); -// MSCCLPPCHECK(mscclppCalloc(pool->pools+pool->banks, MSCCLPP_PROXY_CONN_POOL_SIZE)); -// pool->banks++; -// pool->offset = 0; -// } -// *id = ((pool->banks-1) << MSCCLPP_PROXY_CONN_POOL_SIZE_POW2) + pool->offset; -// pool->offset++; -// return mscclppSuccess; -// } - -// static mscclppResult_t mscclppProxyGetConnection(struct mscclppProxyConnectionPool* pool, int id, struct mscclppProxyConnection** conn) { -// int bank = id>>MSCCLPP_PROXY_CONN_POOL_SIZE_POW2; -// int offset = id&MSCCLPP_PROXY_CONN_POOL_MASK; -// if ((pool->pools == NULL) || (bank > pool->banks) || (pool->pools[bank] == NULL)) return mscclppInternalError; -// *conn = pool->pools[bank]+offset; -// return mscclppSuccess; -// } - -// static mscclppResult_t proxyFree(struct mscclppProxyConnection* connection, struct mscclppComm* comm) { -// if (connection->send) { -// if (mscclppTransports[connection->transport]->send.proxyFree) { -// MSCCLPPCHECK(mscclppTransports[connection->transport]->send.proxyFree(connection, comm)); -// } -// } else { -// if (mscclppTransports[connection->transport]->recv.proxyFree) { -// MSCCLPPCHECK(mscclppTransports[connection->transport]->recv.proxyFree(connection, comm)); -// } -// } -// return mscclppSuccess; -// } - -// static mscclppResult_t mscclppProxyFreeConnections(struct mscclppProxyConnectionPool* pool, struct 
mscclppComm* comm) { -// for (int b=0; bbanks; b++) { -// int max = b == pool->banks-1 ? pool->offset : MSCCLPP_PROXY_CONN_POOL_SIZE; -// for (int i=0; ipools[b]+i; -// if (connection->state != connUninitialized) { -// MSCCLPPCHECK(proxyFree(connection, comm)); -// } -// } -// free(pool->pools[b]); -// } -// free(pool->pools); -// return mscclppSuccess; -// } - -// #include "transport.h" - -// mscclppResult_t mscclppProxyConnect(struct mscclppComm* comm, int transport, int send, int rank, struct mscclppProxyConnector* proxyConn) { -// struct mscclppSocket* sock; -// int ready; -// int type = mscclppProxyMsgInit; - -// // Keep one connection per mlocal rank -// proxyConn->connection = NULL; -// proxyConn->rank = rank; -// if (comm->proxyState.peerSocks == NULL) { -// MSCCLPPCHECK(mscclppCalloc(&comm->proxyState.peerSocks, comm->localRanks)); -// MSCCLPPCHECK(mscclppCalloc(&comm->proxyState.proxyOps, comm->localRanks)); -// MSCCLPPCHECK(mscclppCalloc(&comm->proxyState.sharedDevMems, comm->localRanks)); -// for (int i = 0; i < comm->localRanks; ++i) { -// MSCCLPPCHECK(mscclppSocketSetFd(-1, &comm->proxyState.peerSocks[i])); -// } -// } - -// MSCCLPPCHECK(mscclppTopoGetLocalRank(comm->topo, rank, &proxyConn->localRank)); -// sock = comm->proxyState.peerSocks + proxyConn->localRank; -// MSCCLPPCHECK(mscclppSocketReady(sock, &ready)); -// if (!ready) { -// MSCCLPPCHECK(mscclppSocketInit(sock, comm->proxyState.peerAddresses+rank, comm->magic, mscclppSocketTypeProxy, comm->abortFlag)); -// MSCCLPPCHECK(mscclppSocketConnect(sock)); -// } -// MSCCLPPCHECK(mscclppSocketSend(sock, &type, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketSend(sock, &transport, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketSend(sock, &send, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketSend(sock, &comm->localRank, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketRecv(sock, &proxyConn->connection, sizeof(void*))); -// struct mscclppTransportComm* tcomm = send ? 
&mscclppTransports[transport]->send : &mscclppTransports[transport]->recv; -// // If we need proxy progress, map progress ops -// if (tcomm->proxyProgress) { -// char poolPath[] = "/dev/shm/mscclpp-XXXXXX"; -// MSCCLPPCHECK(mscclppSocketRecv(sock, poolPath+sizeof("/dev/shm/mscclpp-")-1, sizeof("XXXXXX")-1)); -// struct mscclppProxyOps* proxyOps = comm->proxyState.proxyOps+proxyConn->localRank; -// if (proxyOps->pool == NULL) { -// MSCCLPPCHECK(mscclppShmOpen(poolPath, sizeof(struct mscclppProxyOpsPool), (void**)(&proxyOps->pool), NULL, -1, &proxyOps->handle)); -// proxyOps->nextOps = proxyOps->nextOpsEnd = proxyOps->freeOp = -1; -// } -// } -// INFO(MSCCLPP_NET, "Connection to proxy localRank %d -> connection %p", proxyConn->localRank, proxyConn->connection); -// proxyConn->comm = comm; -// return mscclppSuccess; -// } - -// const char* mscclppProxyMsgTypeStr[] = { "Unknown", "Init", "SharedInit", "Setup", "Connect", "Start", "Close", "Abort", "Stop" }; -// mscclppResult_t mscclppProxyCall(struct mscclppProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize) { -// struct mscclppSocket* sock; -// mscclppResult_t ret = mscclppSuccess; - -// if (proxyConn->comm->proxyState.peerSocks == NULL) return mscclppInternalError; -// sock = proxyConn->comm->proxyState.peerSocks + proxyConn->localRank; -// if (sock == NULL) return mscclppInternalError; -// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &type, sizeof(int)), ret, error); -// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &proxyConn->connection, sizeof(void*)), ret, error); -// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &reqSize, sizeof(int)), ret, error); -// MSCCLPPCHECKGOTO(mscclppSocketSend(sock, &respSize, sizeof(int)), ret, error); -// if (reqSize) MSCCLPPCHECKGOTO(mscclppSocketSend(sock, reqBuff, reqSize), ret, error); -// if (respSize) MSCCLPPCHECKGOTO(mscclppSocketRecv(sock, respBuff, respSize), ret, error); -// return mscclppSuccess; -// error: -// WARN("Proxy Call to rank %d 
failed (%s)", proxyConn->comm->localRankToRank[proxyConn->localRank], mscclppProxyMsgTypeStr[type]); -// return ret; -// } - -// static mscclppResult_t proxyProgressInit(struct mscclppComm* comm) { -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// if (state->opsPool == NULL) { -// int size = sizeof(struct mscclppProxyOpsPool); -// struct mscclppProxyOpsPool* pool = NULL; - -// // The service thread may be launched already but localRanks may not be set yet. -// while (comm->localRanks == 0) sched_yield(); - -// char shmPath[sizeof("/dev/shm/mscclpp-XXXXXX")]; -// shmPath[0] = '\0'; -// MSCCLPPCHECK(mscclppShmOpen(shmPath, size, (void**)&pool, NULL, comm->localRanks + 1, &state->handle)); -// // Init pool -// pool->nextOps = -1; - -// for (int r=0; rlocalRanks; r++) { -// pool->freeOps[r] = r*MAX_OPS_PER_PEER; -// for (int i=0; iops[r*MAX_OPS_PER_PEER+i].next = r*MAX_OPS_PER_PEER+i+1; -// pool->ops[(r+1)*MAX_OPS_PER_PEER-1].next = -1; -// } - -// // Setup mutex/cond to work inter-process -// pthread_mutexattr_t mutexAttr; -// pthread_mutexattr_init(&mutexAttr); -// pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED); -// pthread_mutex_init(&pool->mutex, &mutexAttr); -// pthread_condattr_t condAttr; -// pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED); -// pthread_cond_init(&pool->cond, &condAttr); -// state->opsPool = pool; - -// memcpy(state->opsPoolShmSuffix, shmPath+sizeof("/dev/shm/mscclpp-")-1, sizeof("XXXXXX")-1); - -// // All ops structures are created, we can start the progress thread -// MSCCLPPCHECK(mscclppProxyProgressCreate(comm)); -// } -// return mscclppSuccess; -// } - -// static void proxyOpsFree(struct mscclppComm* comm) { -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// if (mscclppShmClose(state->handle) != mscclppSuccess) { -// WARN("[Service thread] shm close failed"); -// } -// } - -// mscclppResult_t mscclppProxyShmUnlink(struct mscclppComm* comm) { -// 
struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// if (state->opsPool == NULL) return mscclppSuccess; - -// if (mscclppShmUnlink(state->handle) != mscclppSuccess) { -// WARN("[Service thread] proxy ops shm unlink failed"); -// } -// return mscclppSuccess; -// } - -// static mscclppResult_t proxyConnInit(struct mscclppProxyLocalPeer* peer, struct mscclppProxyConnectionPool* connectionPool, struct mscclppComm* comm) { -// struct mscclppSocket* sock = &peer->sock; -// int id; -// struct mscclppProxyConnection* connection; -// MSCCLPPCHECK(mscclppProxyNewConnection(connectionPool, &id)); -// MSCCLPPCHECK(mscclppProxyGetConnection(connectionPool, id, &connection)); -// connection->sock = sock; -// MSCCLPPCHECK(mscclppSocketRecv(sock, &connection->transport, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketRecv(sock, &connection->send, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketRecv(sock, &peer->localRank, sizeof(int))); -// connection->localRank = peer->localRank; -// MSCCLPPCHECK(mscclppSocketSend(sock, &connection, sizeof(void*))); -// connection->tcomm = connection->send ? &mscclppTransports[connection->transport]->send : &mscclppTransports[connection->transport]->recv; -// // If we need proxy progress, let's allocate ops and start the thread -// if (connection->tcomm->proxyProgress) { -// MSCCLPPCHECK(proxyProgressInit(comm)); -// struct mscclppProxyProgressState* state = &comm->proxyState.progressState; -// MSCCLPPCHECK(mscclppSocketSend(sock, state->opsPoolShmSuffix, sizeof("XXXXXX")-1)); -// } -// INFO(MSCCLPP_NET, "New proxy %s connection %d from local rank %d, transport %d", connection->send ? 
"send":"recv", id, connection->localRank, connection->transport); -// __atomic_store_n(&connection->state, connInitialized, __ATOMIC_RELEASE); -// return mscclppSuccess; -// } - -// static mscclppResult_t proxyConnSharedInit(struct mscclppProxyLocalPeer* peer, struct mscclppProxyConnectionPool* connectionPool, struct mscclppComm* comm) { -// struct mscclppSocket* sock = &peer->sock; -// struct mscclppProxyConnection* connection; -// MSCCLPPCHECK(mscclppSocketRecv(sock, &connection, sizeof(void*))); -// int reqSize, respSize; -// MSCCLPPCHECK(mscclppSocketRecv(sock, &reqSize, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketRecv(sock, &respSize, sizeof(int))); -// if (reqSize != sizeof(int) || respSize != 0) return mscclppInternalError; -// int nChannels; -// MSCCLPPCHECK(mscclppSocketRecv(sock, &nChannels, sizeof(int))); -// if (connection->tcomm->proxySharedInit) MSCCLPPCHECK(connection->tcomm->proxySharedInit(connection, comm, nChannels)); -// __atomic_store_n(&connection->state, connSharedInitialized, __ATOMIC_RELEASE); -// return mscclppSuccess; -// } - -// static mscclppResult_t proxyProgressAsync(struct mscclppProxyAsyncOp* op, struct mscclppComm* comm, int* asyncOpCount) { -// int done = 1; -// if (op->type == mscclppProxyMsgSetup) { -// MSCCLPPCHECK(op->connection->tcomm->proxySetup(op->connection, comm, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done)); -// } else if (op->type == mscclppProxyMsgConnect) { -// MSCCLPPCHECK(op->connection->tcomm->proxyConnect(op->connection, comm, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done)); -// } else return mscclppInternalError; -// if (done) { -// if (op->type == mscclppProxyMsgSetup) -// __atomic_store_n(&op->connection->state, connSetupDone, __ATOMIC_RELEASE); -// else if (op->type == mscclppProxyMsgConnect) -// __atomic_store_n(&op->connection->state, connConnected, __ATOMIC_RELEASE); -// /* if setup or connect is done, we should not return any error at this point since -// * mscclppSocketSend 
might already send the respBuff to the requester. If we still choose -// * to abort and close the connection, it can cause segfault if the requester is using -// * the respBuff. */ -// if (op->respSize) mscclppSocketSend(op->connection->sock, op->respBuff, op->respSize); -// if (op->reqBuff) { -// free(op->reqBuff); -// op->reqBuff = NULL; -// } -// if (op->respBuff) { -// free(op->respBuff); -// op->respBuff = NULL; -// } -// op->type = 0; -// (*asyncOpCount)--; -// } else if (*comm->abortFlag != 0) { -// return mscclppInternalError; -// } - -// return mscclppSuccess; -// } - -// static mscclppResult_t proxyConnSetupConnect(int type, struct mscclppProxyLocalPeer* peer, struct mscclppProxyConnectionPool* connectionPool, struct mscclppComm* comm, int* asyncOpCount) { -// struct mscclppSocket* sock = &peer->sock; -// struct mscclppProxyAsyncOp* asyncOp = &peer->asyncOps; -// asyncOp->type = type; -// MSCCLPPCHECK(mscclppSocketRecv(sock, &asyncOp->connection, sizeof(void*))); - -// MSCCLPPCHECK(mscclppSocketRecv(sock, &asyncOp->reqSize, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketRecv(sock, &asyncOp->respSize, sizeof(int))); -// if (asyncOp->reqSize) { -// MSCCLPPCHECK(mscclppCalloc(&asyncOp->reqBuff, asyncOp->reqSize)); -// MSCCLPPCHECK(mscclppSocketRecv(sock, asyncOp->reqBuff, asyncOp->reqSize)); -// } -// if (asyncOp->respSize) MSCCLPPCHECK(mscclppCalloc(&asyncOp->respBuff, asyncOp->respSize)); -// (*asyncOpCount)++; -// MSCCLPPCHECK(proxyProgressAsync(asyncOp, comm, asyncOpCount)); -// return mscclppSuccess; -// } - -// #include - -// void* mscclppProxyService(void* _args) { -// struct mscclppComm* comm = (struct mscclppComm *) _args; -// if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); -// if (mscclppSetThreadContext(comm) != mscclppSuccess) { -// WARN("[Proxy Service] Failed to set CUDA context on device %d", comm->cudaDev); -// } else if (cudaSetDevice(comm->cudaDev) != cudaSuccess) { -// WARN("[Proxy Service] 
Failed to set CUDA device %d", comm->cudaDev); -// } -// if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); - -// // Prepare poll descriptor -// struct mscclppProxyConnectionPool connectionPool; -// connectionPool.pools = NULL; -// connectionPool.banks = 0; -// connectionPool.offset = MSCCLPP_PROXY_CONN_POOL_SIZE; - -// struct pollfd pollfds[MSCCLPP_MAX_LOCAL_RANKS+1]; -// struct mscclppProxyLocalPeer peers[MSCCLPP_MAX_LOCAL_RANKS]; -// memset(&peers, 0, sizeof(struct mscclppProxyLocalPeer)*MSCCLPP_MAX_LOCAL_RANKS); -// for (int s=0; sproxyState.listenSock, &pollfds[MSCCLPP_MAX_LOCAL_RANKS].fd) != mscclppSuccess) { -// WARN("[Proxy Service] Get listenSock fd fails\n"); -// return NULL; -// }; -// pollfds[MSCCLPP_MAX_LOCAL_RANKS].events = POLLIN; - -// int maxnpeers = 0; -// int npeers = 0; -// int stop = 0; -// int asyncOpCount = 0; -// while (stop == 0 || (stop == 1 && npeers > 0)) { -// /* Even if local comm aborts, we cannot let proxy thread exit if we still have peer -// * connections. Need to wait until all other related comms call abort and safely exit -// * together, or we could face segmentation fault. */ -// if (*comm->abortFlag != 0) stop = 1; -// /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */ -// int ret; -// do { -// ret = poll(pollfds, MSCCLPP_MAX_LOCAL_RANKS+1, asyncOpCount ? 
0 : 500); -// } while (ret < 0 && errno == EINTR); -// if (ret < 0) { -// WARN("[Proxy Service] Poll failed: %s", strerror(errno)); -// return NULL; -// } -// if (pollfds[MSCCLPP_MAX_LOCAL_RANKS].revents) { -// int s = 0; -// while (s < MSCCLPP_MAX_LOCAL_RANKS && pollfds[s].fd >= 0) s++; -// if (s == MSCCLPP_MAX_LOCAL_RANKS) { -// WARN("[Proxy service] Too many connections (%d max)", MSCCLPP_MAX_LOCAL_RANKS); -// return NULL; -// } -// if (maxnpeers < s+1) maxnpeers = s+1; -// if (mscclppSocketInit(&peers[s].sock) != mscclppSuccess) { -// WARN("[Service thread] Initialize peers[%d].sock fails\n", s); -// return NULL; -// } -// if (mscclppSocketAccept(&peers[s].sock, comm->proxyState.listenSock) != mscclppSuccess) { -// WARN("[Service thread] Accept failed %s", strerror(errno)); -// } else { -// if (mscclppSocketGetFd(&peers[s].sock, &pollfds[s].fd) != mscclppSuccess) { -// WARN("[Service thread] Get peers[%d].sock fd fails\n", s); -// return NULL; -// } -// npeers++; -// peers[s].localRank = -1; -// } -// } -// for (int s=0; ssock; -// struct mscclppProxyAsyncOp* op = &peer->asyncOps; -// int closeConn = 0; -// int type = 0; -// mscclppResult_t res = mscclppSuccess; - -// if (pollfds[s].fd == -1) continue; -// if (op->type != 0) { -// res = proxyProgressAsync(op, comm, &asyncOpCount); -// type = op->type; -// if (res != mscclppSuccess) closeConn = 1; -// } else if (pollfds[s].revents & POLLIN) { -// int closed; -// if (mscclppSocketTryRecv(sock, &type, sizeof(int), &closed) != mscclppSuccess) { -// WARN("[Service thread] Could not receive type from localRank %d", peer->localRank); -// closeConn = 1; -// } else if (closed) { -// INFO(MSCCLPP_INIT|MSCCLPP_NET, "[Service thread] Connection closed by localRank %d", peer->localRank); -// closeConn = 1; -// } else { -// if (type == mscclppProxyMsgStop) { -// stop = 1; -// closeConn = 1; -// } else if (type == mscclppProxyMsgClose) { -// closeConn = 1; -// } else if (type == mscclppProxyMsgInit) { -// res = 
proxyConnInit(peers+s, &connectionPool, comm); -// } else if (type == mscclppProxyMsgSharedInit) { -// res = proxyConnSharedInit(peers+s, &connectionPool, comm); -// } else if (type == mscclppProxyMsgSetup || type == mscclppProxyMsgConnect) { -// res = proxyConnSetupConnect(type, peers+s, &connectionPool, comm, &asyncOpCount); -// } else { -// WARN("[Service thread] Unknown command %d from localRank %d\n", type, peer->localRank); -// closeConn = 1; -// } -// } -// } else if (pollfds[s].revents & POLLHUP) { -// closeConn = 1; -// } -// if (res != mscclppSuccess) { -// WARN("[Proxy Service %d] Failed to execute operation %s from rank %d, retcode %d", comm->rank, mscclppProxyMsgTypeStr[type], comm->localRankToRank[peer->localRank], res); -// closeConn = 1; -// } -// if (closeConn) { -// mscclppSocketClose(sock); -// if (op->reqBuff) { -// free(op->reqBuff); -// op->reqBuff = NULL; -// } -// if (op->respBuff) { -// free(op->respBuff); -// op->respBuff = NULL; -// } -// op->type = 0; -// pollfds[s].fd = -1; -// npeers--; -// } -// } -// } - -// // Wait for all operations to complete and stop progress thread before freeing any resource -// if (mscclppProxyProgressDestroy(comm) != mscclppSuccess) { -// WARN("[Proxy Service] proxyDestroy failed"); -// } -// for (int s=0; sproxyState.listenSock); -// proxyOpsFree(comm); -// return NULL; -// } - -mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses) { - comm->proxyState.listenSock = sock; - comm->proxyState.peerAddresses = peerAddresses; +mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { + // comm->proxyState.thread is pthread_join()'d by commFree() in init.cc + for (int i = 0; i < MSCCLPP_IB_MAX_DEVS; ++i) { + if (comm->ibContext[i] != NULL) { + struct proxyArgs *args; + MSCCLPPCHECK(mscclppCalloc(&args, 1)); + args->comm = comm; + args->ibCtx = comm->ibContext[i]; + args->stop = &comm->proxyState[i].stop; + 
pthread_create(&comm->proxyState[i].thread, NULL, mscclppProxyService, args); + mscclppSetThreadName(comm->proxyState[i].thread, "MSCCLPP Service %2d", i); + } + } return mscclppSuccess; } -// mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm) { -// // comm->proxyState.thread is pthread_join()'d by commFree() in init.cc -// pthread_create(&comm->proxyState.thread, NULL, mscclppProxyService, comm); -// mscclppSetThreadName(comm->proxyState.thread, "MSCCLPP Service %2d", comm->cudaDev); -// return mscclppSuccess; -// } - -// mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { -// struct mscclppProxyState* state = &comm->proxyState; - -// if (state == NULL) return mscclppSuccess; -// if (state->peerAddresses) { -// if (*comm->abortFlag == 0) { -// struct mscclppSocket sock; -// int type = mscclppProxyMsgStop; -// MSCCLPPCHECK(mscclppSocketInit(&sock, comm->proxyState.peerAddresses + comm->rank, comm->magic, mscclppSocketTypeProxy, comm->abortFlag)); -// MSCCLPPCHECK(mscclppSocketConnect(&sock)); -// MSCCLPPCHECK(mscclppSocketSend(&sock, &type, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketClose(&sock)); -// } -// free(state->peerAddresses); -// } - -// if (state->peerSocks) { -// for (int i=0; ilocalRanks; i++) { -// int fd; -// MSCCLPPCHECK(mscclppSocketGetFd(state->peerSocks + i, &fd)); -// if (fd >= 0) { -// if (state->proxyOps[i].pool) { -// MSCCLPPCHECK(mscclppShmClose(state->proxyOps[i].handle)); -// } -// if (state->sharedDevMems[i]) { -// CUDACHECK(cudaIpcCloseMemHandle(state->sharedDevMems[i])); -// } -// int type = mscclppProxyMsgClose; -// if (*comm->abortFlag == 0) MSCCLPPCHECK(mscclppSocketSend(state->peerSocks + i, &type, sizeof(int))); -// MSCCLPPCHECK(mscclppSocketClose(state->peerSocks + i)); -// } -// } -// free(state->peerSocks); -// free(state->proxyOps); -// free(state->sharedDevMems); -// } -// return mscclppSuccess; -// } +mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm) { + for (int i = 0; i < 
MSCCLPP_IB_MAX_DEVS; ++i) { + if (comm->ibContext[i] != NULL) { + volatile int *stop = (volatile int *)&comm->proxyState[i].stop; + *stop = 1; + while (*stop != 0 && *comm->abortFlag == 0) { + usleep(1000); + } + } + } + return mscclppSuccess; +} diff --git a/src/bootstrap/shmutils.cc b/src/bootstrap/shmutils.cc deleted file mode 100644 index eebe4e52..00000000 --- a/src/bootstrap/shmutils.cc +++ /dev/null @@ -1,80 +0,0 @@ -#include "shmutils.h" -#include "debug.h" -#include -#include -#include -#include - -#define SHM_MODE 0666 - -// Open a shme file and create an mmap. -static mscclppResult_t shmutilsMapOpen(const char *name, size_t size, int *fd, void **map, int flag) -{ - int _fd = shm_open(name, flag, SHM_MODE); - if (_fd == -1) { - WARN("Failed to open shm file %s (flag: %d)", name, flag); - return mscclppInternalError; - } - void *_map = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0); - if (_map == MAP_FAILED) { - WARN("Failed to mmap shm file %s", name); - goto fail; - } - if (flag & O_CREAT) { - if (ftruncate(_fd, 0) == -1) { - WARN("Failed to ftruncate shm file %s", name); - goto fail; - } - } - if (ftruncate(_fd, size) == -1) { - WARN("Failed to ftruncate shm file %s", name); - goto fail; - } - *fd = _fd; - *map = _map; - return mscclppSuccess; -fail: - close(_fd); - shm_unlink(name); - return mscclppInternalError; -} - -// Open or create a shm file. -mscclppResult_t mscclppShmutilsMapCreate(const char *name, size_t size, int *fd, void **map) -{ - return shmutilsMapOpen(name, size, fd, map, O_CREAT | O_RDWR); -} - -// Open an existing shm file. -mscclppResult_t mscclppShmutilsMapOpen(const char *name, size_t size, int *fd, void **map) -{ - return shmutilsMapOpen(name, size, fd, map, O_RDWR); -} - -// Close a shm mmap. 
-mscclppResult_t mscclppShmutilsMapClose(const char *name, size_t size, int fd, void *map) -{ - int err = 0; - if (munmap(map, size) == -1) { - WARN("Failed to munmap shm file %s", name); - err = 1; - } - close(fd); - return err ? mscclppInternalError : mscclppSuccess; -} - -// Close a shm mmap and destroy a shm file. -mscclppResult_t mscclppShmutilsMapDestroy(const char *name, size_t size, int fd, void *map) -{ - int err = 0; - if (munmap(map, size) == -1) { - WARN("Failed to munmap shm file %s", name); - err = 1; - } - close(fd); - if (shm_unlink(name) == -1) { - WARN("Failed to unlink shm file %s: errno %d", name, errno); - err = 1; - } - return err ? mscclppInternalError : mscclppSuccess; -} diff --git a/src/include/comm.h b/src/include/comm.h index 0d374ab9..1bbb9491 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -12,6 +12,7 @@ // #include "collectives.h" #include "proxy.h" // #include "strongstream.h" +#include "ib.h" // #if CUDART_VERSION < 9000 // struct cudaLaunchParams { @@ -159,13 +160,17 @@ struct mscclppConn { mscclppTransport_t transport; - int localRank; - int remoteRank; - const char* ibDev; + int rankSend; + int rankRecv; int tag; void* buff; + int buffSize; int* flag; struct mscclppDevConn *devConn; + struct mscclppIbContext *ibCtx; + struct mscclppIbQp *ibQp; + struct mscclppIbMr *ibMr; + struct mscclppIbMrInfo ibRemoteMrInfo; }; struct mscclppComm { @@ -268,7 +273,9 @@ struct mscclppComm { // char intraPad2[64 - sizeof(uint64_t)]; // uint64_t intraBarrierGate; // only used if this is intraComm0 - struct mscclppProxyState proxyState; + struct mscclppIbContext *ibContext[MSCCLPP_IB_MAX_DEVS]; + + struct mscclppProxyState proxyState[MSCCLPP_IB_MAX_DEVS]; // // Whether this communicator uses collNet // int collNetSupport; diff --git a/src/include/ib.h b/src/include/ib.h new file mode 100644 index 00000000..77212fa3 --- /dev/null +++ b/src/include/ib.h @@ -0,0 +1,78 @@ +#ifndef MSCCLPP_IB_H_ +#define MSCCLPP_IB_H_ + +#include "mscclpp.h" 
+#include +#include +#include +#include + +#define MSCCLPP_IB_CQ_SIZE 1024 +#define MSCCLPP_IB_CQ_POLL_NUM 4 +#define MSCCLPP_IB_MAX_SENDS 64 +#define MSCCLPP_IB_MAX_DEVS 8 + +// MR info to be shared with the remote peer +struct mscclppIbMrInfo { + uint64_t addr; + uint32_t rkey; +}; + +// IB memory region +struct mscclppIbMr { + struct ibv_mr *mr; + void *buff; + struct mscclppIbMrInfo info; +}; + +// QP info to be shared with the remote peer +struct mscclppIbQpInfo { + uint16_t lid; + uint8_t port; + uint8_t linkLayer; + uint32_t qpn; + uint64_t spn; + int mtu; +}; + +// IB queue pair +struct mscclppIbQp { + struct ibv_qp *qp; + struct mscclppIbQpInfo info; + struct ibv_send_wr *wrs; + struct ibv_sge *sges; + int wrn; + + int rtr(const mscclppIbQpInfo *info); + int rts(); + int stageSend(struct mscclppIbMr *ibMr, const mscclppIbMrInfo *info, int size, + uint64_t wrId, unsigned int immData, int offset = 0); + int postSend(); + int postRecv(uint64_t wrId); +}; + +// Holds resources of a single IB device. 
+struct mscclppIbContext { + int numa_node; + struct ibv_context *ctx; + struct ibv_cq *cq; + struct ibv_pd *pd; + struct ibv_wc *wcs; + int wcn; + int *ports; + int nPorts; + struct mscclppIbQp *qps; + int nQps; + int maxQps; + struct mscclppIbMr *mrs; + int nMrs; + int maxMrs; +}; + +mscclppResult_t mscclppIbContextCreate(struct mscclppIbContext **ctx, const char *ibDevName); +mscclppResult_t mscclppIbContextDestroy(struct mscclppIbContext *ctx); +mscclppResult_t mscclppIbContextCreateQp(struct mscclppIbContext *ctx, struct mscclppIbQp **ibQp, int port = -1); +mscclppResult_t mscclppIbContextRegisterMr(struct mscclppIbContext *ctx, void *buff, size_t size, struct mscclppIbMr **ibMr); +mscclppResult_t mscclppIbContextPollCq(struct mscclppIbContext *ctx, int *wcNum); + +#endif diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index 860d7cf3..3d338fdf 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -102,11 +102,17 @@ mscclppResult_t mscclppBootStrapAllGather(mscclppComm_t comm, void* data, int si mscclppResult_t mscclppCommDestroy(mscclppComm_t comm); -mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, int *flag, int tag, +mscclppResult_t mscclppConnect(mscclppComm_t comm, int rankRecv, int rankSend, void *buff, size_t buffSize, int *flag, int tag, mscclppTransport_t transportType, const char *ibDev=NULL); mscclppResult_t mscclppConnectionSetup(mscclppComm_t comm); +mscclppResult_t mscclppProxyLaunch(mscclppComm_t comm); + +mscclppResult_t mscclppProxyStop(mscclppComm_t comm); + +mscclppResult_t mscclppGetLocalRank(mscclppComm_t comm, int *localRank); +mscclppResult_t mscclppGetNodeFromRank(mscclppComm_t comm, int rank, int *node); mscclppResult_t mscclppGetDevConns(mscclppComm_t comm, mscclppDevConn_t* devConns); #ifdef __cplusplus diff --git a/src/include/proxy.h b/src/include/proxy.h index e47dcf0a..fe997709 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -1,229 +1,16 @@ 
-/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ - #ifndef MSCCLPP_PROXY_H_ #define MSCCLPP_PROXY_H_ -// #include "devcomm.h" -// #include "info.h" -#include "socket.h" -// #include -// #include "shm.h" - -// enum mscclppProxyOpState { mscclppProxyOpNone, mscclppProxyOpReady, mscclppProxyOpProgress }; - -// struct mscclppProxyArgs; -// typedef mscclppResult_t (*proxyProgressFunc_t)(struct mscclppComm*, struct mscclppProxyArgs*); - -// #define MSCCLPP_PROXY_MAX_SUBS MAXCHANNELS -// static_assert(MSCCLPP_MAX_WORK_ELEMENTS <= MAXCHANNELS, "Not enough sub space for max work elements"); - -// struct mscclppProxyOp { -// struct mscclppProxyConnection* connection; -// int channelId; -// int nsteps; -// ssize_t nbytes; -// int root; -// int next; - -// uint64_t opCount; -// int sliceSteps; -// int chunkSteps; -// int chunkSize; -// uint8_t /*mscclppDataType_t*/ dtype; -// uint8_t /*mscclppDevRedOp_t*/ redOp; -// uint8_t /*mscclppPattern_t*/ pattern; -// uint8_t protocol; - -// union { -// uint64_t unused; -// // For use by enqueue.cc -// struct mscclppProxyOp *enqNext; -// }; -// }; -// static_assert(sizeof(struct mscclppProxyOp) == 64, "Keep ProxyOp aligned with cache lines for effective prefetch"); - -// struct mscclppProxySubArgs { -// struct mscclppProxyConnection* connection; -// int channelId; -// int nsteps; -// ssize_t nbytes; -// int peer; - -// int groupSize; // Number of consecutive sub operations sharing the same recvComm -// uint64_t base; -// uint64_t posted; -// uint64_t received; -// uint64_t flushed; -// uint64_t transmitted; -// uint64_t done; -// uint64_t end; -// void* requests[MSCCLPP_STEPS]; -// void* profilingEvents[MSCCLPP_STEPS]; -// }; - -// struct mscclppProxyArgs { -// struct mscclppProxySubArgs subs[MSCCLPP_PROXY_MAX_SUBS]; -// 
proxyProgressFunc_t progress; -// int nsubs; -// int done; -// uint64_t opCount; -// int sliceSteps; -// int chunkSteps; -// int chunkSize; -// uint8_t /*mscclppDataType_t*/ dtype; -// uint8_t /*mscclppDevRedOp_t*/ redOp; -// uint8_t /*mscclppPattern_t*/ pattern; -// uint8_t protocol; -// int state; -// char* sharedBuff[MSCCLPP_STEPS]; -// int sharedSize[MSCCLPP_STEPS]; - -// int idle; - -// // Element linking -// struct mscclppProxyArgs* next; -// struct mscclppProxyArgs* nextPeer; -// struct mscclppProxyArgs** proxyAppendPtr; -// }; -// #define MSCCLPP_MAX_NETDEVS 128 - -// // ProxyOps are used to communicate between main thread and service thread -// // Make sure we have enough to store two full rounds of operations on all channels. -// // Otherwise we'd be unable to post half of them to free new elements. -// #define MAX_OPS_PER_PEER (2*MAXCHANNELS*MSCCLPP_MAX_WORK_ELEMENTS_P2P) -// #define MSCCLPP_MAX_LOCAL_RANKS 64 -// struct mscclppProxyOpsPool { -// struct mscclppProxyOp ops[MAX_OPS_PER_PEER*MSCCLPP_MAX_LOCAL_RANKS]; -// volatile int nextOps; -// volatile int nextOpsEnd; -// volatile int freeOps[MSCCLPP_MAX_LOCAL_RANKS]; -// pthread_mutex_t mutex; -// pthread_cond_t cond; -// }; - -// struct mscclppProxyOps { -// mscclppProxyOpsPool* pool; -// mscclppShmHandle_t handle; -// int count; -// int freeOp; -// int nextOps; -// int nextOpsEnd; -// }; - -// struct mscclppProxySharedP2p { -// int refcount; -// int size; -// char* cudaBuff; -// char* hostBuff; -// cudaIpcMemHandle_t ipc; -// struct mscclppProxyArgs* proxyAppend[MAXCHANNELS]; // Separate send and recv -// }; - -// struct mscclppProxySharedCollNet { -// int size; -// char* cudaBuff; -// char* hostBuff; -// struct mscclppProxyArgs* proxyAppend[2*MSCCLPP_MAX_NETDEVS]; -// void* resources; -// }; - -// struct mscclppProxyPeer { -// struct mscclppProxySharedP2p send; -// struct mscclppProxySharedP2p recv; -// }; - -// struct mscclppSharedNetComms { -// void* sendComm[MAXCHANNELS]; -// void* 
recvComm[MAXCHANNELS]; -// int sendRefCount[MAXCHANNELS]; -// int recvRefCount[MAXCHANNELS]; -// }; - -// struct mscclppProxyPool; -// struct mscclppProxyProgressState { -// // Used by main threads to send work to progress thread -// struct mscclppProxyOpsPool* opsPool; -// mscclppShmHandle_t handle; -// char opsPoolShmSuffix[6]; - -// pthread_t thread; -// bool stop; -// struct mscclppProxyPeer** localPeers; -// struct mscclppSharedNetComms* netComms[MSCCLPP_MAX_NETDEVS]; -// struct mscclppProxySharedCollNet collNet; -// struct mscclppProxyArgs* active; -// struct mscclppProxyArgs* pool; -// struct mscclppProxyPool* pools; -// int nextOps; -// }; +#include "mscclpp.h" +#include "comm.h" +#include struct mscclppProxyState { - // Service thread pthread_t thread; - struct mscclppSocket* listenSock; int stop; -// CUcontext cudaCtx; - - // Used by main thread - union mscclppSocketAddress* peerAddresses; - struct mscclppSocket* peerSocks; -// struct mscclppProxyOps* proxyOps; -// void** sharedDevMems; - - // Progress thread -// struct mscclppProxyProgressState progressState; }; -// enum proxyConnectState { -// connUninitialized = 0, -// connInitialized = 1, -// connSharedInitialized = 2, -// connSetupDone = 3, -// connConnected = 4, -// numConnStates = 5 -// }; +mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); +mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm); -// struct mscclppProxyConnection { -// int send, transport, shared; -// int localRank; -// struct mscclppSocket* sock; -// struct mscclppTransportComm* tcomm; -// struct mscclppProxyArgs *proxyAppend; -// struct mscclppProxyArgs **proxyAppendPtr; -// void* transportResources; -// proxyConnectState state; -// }; - -// typedef mscclppResult_t (*threadFunc_t)(struct mscclppProxyArgs*); - -// enum proxyMode { -// proxyRing = 0, -// proxyFrom = 1, -// proxyTo = 2 -// }; - -// mscclppResult_t mscclppProxySaveOp(struct mscclppComm* comm, struct mscclppProxyOp* proxyOp, bool *justInquire); -// 
mscclppResult_t mscclppProxyComputeP2p(struct mscclppInfo* info, struct mscclppProxyOp* proxyOp); -// mscclppResult_t mscclppProxyStart(struct mscclppComm* comm); -mscclppResult_t mscclppProxyInit(struct mscclppComm* comm, struct mscclppSocket* sock, union mscclppSocketAddress* peerAddresses); -// mscclppResult_t mscclppProxyCreate(struct mscclppComm* comm); -// mscclppResult_t mscclppProxyConnect(struct mscclppComm* comm, int transport, int send, int rank, struct mscclppProxyConnector* proxyConn); -// enum mscclppProxyMsgType { -// mscclppProxyMsgInit = 1, -// mscclppProxyMsgSharedInit = 2, -// mscclppProxyMsgSetup = 3, -// mscclppProxyMsgConnect = 4, -// mscclppProxyMsgStart = 5, -// mscclppProxyMsgClose = 6, -// mscclppProxyMsgAbort = 7, -// mscclppProxyMsgStop = 8 -// }; - -// mscclppResult_t mscclppProxyCall(struct mscclppProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize); -// mscclppResult_t mscclppProxyDestroy(struct mscclppComm* comm); -// mscclppResult_t mscclppProxyShmUnlink(struct mscclppComm* comm); #endif diff --git a/src/include/shmutils.h b/src/include/shmutils.h deleted file mode 100644 index 21dfa011..00000000 --- a/src/include/shmutils.h +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef MSCCLPP_SHMUTILS_H_ -#define MSCCLPP_SHMUTILS_H_ - -#include "mscclpp.h" - -mscclppResult_t mscclppShmutilsMapCreate(const char *name, size_t size, int *fd, void **map); -mscclppResult_t mscclppShmutilsMapOpen(const char *name, size_t size, int *fd, void **map); -mscclppResult_t mscclppShmutilsMapClose(const char *name, size_t size, int fd, void *map); -mscclppResult_t mscclppShmutilsMapDestroy(const char *name, size_t size, int fd, void *map); - -#endif diff --git a/tests/bootstrap_test.cc b/tests/bootstrap_test.cc index 502f5c0d..029cc405 100644 --- a/tests/bootstrap_test.cc +++ b/tests/bootstrap_test.cc @@ -1,24 +1,43 @@ #include "mscclpp.h" +#ifdef MSCCLPP_USE_MPI_FOR_TESTS +#include "mpi.h" +#endif // MSCCLPP_USE_MPI_FOR_TESTS 
#include #include void print_usage(const char *prog) { +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + printf("usage: %s IP:PORT\n", prog); +#else printf("usage: %s IP:PORT rank nranks\n", prog); +#endif } int main(int argc, const char *argv[]) { +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + if (argc != 2) { + print_usage(argv[0]); + return -1; + } + MPI_Init(NULL, NULL); + const char *ip_port = argv[1]; + int rank; + int world_size; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); +#else if (argc != 4) { print_usage(argv[0]); return -1; } - - mscclppComm_t comm; const char *ip_port = argv[1]; int rank = atoi(argv[2]); int world_size = atoi(argv[3]); +#endif + mscclppComm_t comm; mscclppCommInitRank(&comm, world_size, rank, ip_port); int *buf = (int *)calloc(world_size, sizeof(int)); @@ -46,6 +65,10 @@ int main(int argc, const char *argv[]) return -1; } +#ifdef MSCCLPP_USE_MPI_FOR_TESTS + MPI_Finalize(); +#endif + printf("Succeeded! %d\n", rank); return 0; } diff --git a/tests/bootstrap_test_mpi.cc b/tests/bootstrap_test_mpi.cc deleted file mode 100644 index 3ebb064d..00000000 --- a/tests/bootstrap_test_mpi.cc +++ /dev/null @@ -1,58 +0,0 @@ -#include "mscclpp.h" -#include "mpi.h" -#include -#include - -void print_usage(const char *prog) -{ - printf("usage: %s IP:PORT\n", prog); -} - -int main(int argc, const char *argv[]) -{ - if (argc != 2) { - print_usage(argv[0]); - return -1; - } - - MPI_Init(NULL, NULL); - - int rank; - int world_size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - - mscclppComm_t comm; - const char *ip_port = argv[1]; - mscclppCommInitRank(&comm, world_size, rank, ip_port); - - int *buf = (int *)calloc(world_size, sizeof(int)); - if (buf == nullptr) { - printf("calloc failed\n"); - return -1; - } - buf[rank] = rank; - mscclppResult_t res = mscclppBootStrapAllGather(comm, buf, sizeof(int)); - if (res != mscclppSuccess) { - printf("bootstrapAllGather failed\n"); - return -1; - } - - for 
(int i = 0; i < world_size; ++i) { - if (buf[i] != i) { - printf("wrong data: %d, expected %d\n", buf[i], i); - return -1; - } - } - - res = mscclppCommDestroy(comm); - if (res != mscclppSuccess) { - printf("mscclppDestroy failed\n"); - return -1; - } - - MPI_Finalize(); - - printf("Succeeded! %d\n", rank); - return 0; -}
diff --git a/tests/p2p_test.cu b/tests/p2p_test.cu
new file mode 100644
index 00000000..e4b15e8c
--- /dev/null
+++ b/tests/p2p_test.cu
@@ -0,0 +1,299 @@
+#include "mscclpp.h"
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+#include "mpi.h"
+#endif // MSCCLPP_USE_MPI_FOR_TESTS
+// NOTE(review): the four system header names were missing from this patch text; they are restored
+// from the symbols used below (printf, strtol/calloc/exit, memset, std::string) - confirm.
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <string>
+
+#define RANKS_PER_NODE 8
+#define TEST_CONN_TYPE 1 // 0: P2P(for local)+IB(for remote), 1: IB-Only
+
+// Check CUDA RT calls: on failure print the call site and the error string, then exit.
+#define CUDACHECK(cmd) do { \
+    cudaError_t err = (cmd); \
+    if( err != cudaSuccess ) { \
+        printf("Cuda failure %s:%d '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
+        exit(EXIT_FAILURE); \
+    } \
+} while(false)
+
+// Single-thread exchange kernel. A connection's tag is rankRecv * world_size + rankSend and
+// devConns holds (world_size - 1) * 2 connections. Thread 0 writes rank + 1 into this rank's
+// send buffer, raises the flag of every connection it sends on, waits on every connection it
+// receives on, and finally spins until each raised send flag is no longer 1.
+__global__ void kernel(mscclppDevConn_t devConns, int rank, int world_size)
+{
+  int tid = blockIdx.x * blockDim.x + threadIdx.x;
+  if (tid != 0) return;
+
+  const int nConns = (world_size - 1) * 2;
+
+  // Get sending data and send flag
+  volatile int *data = NULL;
+  for (int i = 0; i < nConns; ++i) {
+    mscclppDevConn_t devConn = &devConns[i];
+    int rankSend = devConn->tag % world_size;
+    if (rankSend == rank) { // I am a sender
+      data = (volatile int *)devConn->localBuff;
+      // We are sending the same data to all peers, so just break here
+      break;
+    }
+  }
+  // No connection to send on: bail out instead of writing through an unset pointer
+  if (data == NULL) return;
+
+  // Set my data
+  *data = rank + 1;
+
+  // Set send flags to inform all peers that the data is ready
+  for (int i = 0; i < nConns; ++i) {
+    mscclppDevConn_t devConn = &devConns[i];
+    int rankSend = devConn->tag % world_size;
+    if (rankSend == rank) { // I am a sender
+      *((volatile int *)devConn->localFlag) = 1;
+    }
+  }
+
+  // Read data from all other peers
+  for (int i = 0; i < nConns; ++i) {
+    mscclppDevConn_t devConn = &devConns[i];
+    int tag = devConn->tag;
+    int rankSend = tag % world_size;
+    int rankRecv = tag / world_size;
+    if (rankRecv != rank) continue; // I am not the receiver on this connection
+    if (devConn->remoteBuff == NULL) { // IB
+      volatile int *localFlag = (volatile int *)devConn->localFlag;
+
+      // Wait until the data comes in via proxy
+      while (*localFlag != 1) {}
+    } else { // P2P
+      volatile int *remoteData = (volatile int *)devConn->remoteBuff;
+      volatile int *remoteFlag = (volatile int *)devConn->remoteFlag;
+
+      // Wait until the remote data is set
+      while (*remoteFlag != 1) {}
+
+      // Read remote data
+      // NOTE(review): data is this rank's send buffer, so data[rankSend] lands rankSend ints past
+      // it rather than in this connection's localBuff - confirm the indexing before using
+      // TEST_CONN_TYPE 0.
+      data[rankSend] = remoteData[rankSend];
+    }
+  }
+
+  // Wait until the proxy has sent my data to all peers
+  for (int i = 0; i < nConns; ++i) {
+    mscclppDevConn_t devConn = &devConns[i];
+    int rankSend = devConn->tag % world_size;
+    if (rankSend == rank) { // I am a sender
+      volatile int *flag = (volatile int *)devConn->localFlag;
+      while (*flag == 1) {}
+    }
+  }
+}
+
+// Index of a rank within its node (RANKS_PER_NODE ranks per node).
+int rankToLocalRank(int rank)
+{
+  return rank % RANKS_PER_NODE;
+}
+
+// Index of the node a rank belongs to (RANKS_PER_NODE ranks per node).
+int rankToNode(int rank)
+{
+  return rank / RANKS_PER_NODE;
+}
+
+// Print the command line synopsis; rank and nranks are optional only in MPI builds.
+void print_usage(const char *prog)
+{
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  printf("usage: %s IP:PORT [rank nranks]\n", prog);
+#else
+  printf("usage: %s IP:PORT rank nranks\n", prog);
+#endif
+}
+
+// Parse a whole-string, non-negative decimal int. Returns false (leaving *out untouched) on
+// empty input, trailing characters, a negative value, or a value that does not fit in an int.
+static bool parseInt(const char *s, int *out)
+{
+  char *end = NULL;
+  long v = strtol(s, &end, 10);
+  if (end == s || *end != '\0' || v < 0 || v > 0x7fffffffL) return false; // 0x7fffffff: INT_MAX
+  *out = (int)v;
+  return true;
+}
+
+// Test driver: each rank registers a receive slot per peer and its own slot for sending, publishes
+// rank + 1 through the kernel, then checks that slot i of its device buffer holds i + 1.
+// Returns 0 on success and -1 on any usage, setup or verification failure.
+int main(int argc, const char *argv[])
+{
+  // Only an MPI build may omit rank/nranks; they are then queried from MPI.
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  const bool useMpi = (argc == 2);
+#else
+  const bool useMpi = false;
+#endif
+  if (!useMpi && argc != 4) {
+    print_usage(argv[0]);
+    return -1;
+  }
+  const char *ip_port = argv[1];
+  int rank = -1;
+  int world_size = 0;
+  if (useMpi) {
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+    MPI_Init(NULL, NULL);
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
+#endif
+  } else if (!parseInt(argv[2], &rank) || !parseInt(argv[3], &world_size)) {
+    print_usage(argv[0]);
+    return -1;
+  }
+  // The flag arrays are sized by world_size - 1 and the kernel needs at least one peer
+  if (world_size < 2 || rank < 0 || rank >= world_size) {
+    printf("invalid rank %d / nranks %d: need nranks >= 2 and 0 <= rank < nranks\n", rank, world_size);
+    return -1;
+  }
+
+  mscclppComm_t comm;
+  mscclppResult_t res = mscclppCommInitRank(&comm, world_size, rank, ip_port);
+  if (res != mscclppSuccess) {
+    printf("mscclppCommInitRank failed\n");
+    return -1;
+  }
+
+  // data_d: one int per rank on the device; flag arrays: one int per peer in mapped host memory
+  int *data_d;
+  int *send_flags_d;
+  int *recv_flags_d;
+  CUDACHECK(cudaMalloc(&data_d, sizeof(int) * world_size));
+  CUDACHECK(cudaHostAlloc(&send_flags_d, sizeof(int) * (world_size - 1), cudaHostAllocMapped));
+  CUDACHECK(cudaHostAlloc(&recv_flags_d, sizeof(int) * (world_size - 1), cudaHostAllocMapped));
+
+  CUDACHECK(cudaMemset(data_d, 0, sizeof(int) * world_size));
+  memset(send_flags_d, 0, sizeof(int) * (world_size - 1));
+  memset(recv_flags_d, 0, sizeof(int) * (world_size - 1));
+
+  int localRank = rankToLocalRank(rank);
+#if (TEST_CONN_TYPE == 0) // only the P2P+IB setup needs to tell local peers from remote ones
+  int thisNode = rankToNode(rank);
+#endif
+  std::string ibDev = "mlx5_ib" + std::to_string(localRank);
+
+  // Read from all other ranks
+  int idx = 0;
+  for (int r = 0; r < world_size; ++r) {
+    if (r == rank) continue;
+    int tag = rank * world_size + r;
+#if (TEST_CONN_TYPE == 0) // P2P+IB
+    int node = rankToNode(r);
+    if (node == thisNode) {
+      res = mscclppConnect(comm, rank, r, data_d + r, sizeof(int), recv_flags_d + idx, tag, mscclppTransportP2P);
+    } else {
+      res = mscclppConnect(comm, rank, r, data_d + r, sizeof(int), recv_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
+    }
+#else // (TEST_CONN_TYPE == 1) // IB-Only
+    res = mscclppConnect(comm, rank, r, data_d + r, sizeof(int), recv_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
+#endif
+    if (res != mscclppSuccess) {
+      printf("mscclppConnect failed\n");
+      return -1;
+    }
+    ++idx;
+  }
+  // Let others read from me
+  idx = 0;
+  for (int r = 0; r < world_size; ++r) {
+    if (r == rank) continue;
+    int tag = r * world_size + rank;
+#if (TEST_CONN_TYPE == 0) // P2P+IB
+    int node = rankToNode(r);
+    if (node == thisNode) {
+      res = mscclppConnect(comm, r, rank, data_d + rank, sizeof(int), send_flags_d + idx, tag, mscclppTransportP2P);
+    } else {
+      res = mscclppConnect(comm, r, rank, data_d + rank, sizeof(int), send_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
+    }
+#else // (TEST_CONN_TYPE == 1) // IB-Only
+    res = mscclppConnect(comm, r, rank, data_d + rank, sizeof(int), send_flags_d + idx, tag, mscclppTransportIB, ibDev.c_str());
+#endif
+    if (res != mscclppSuccess) {
+      printf("mscclppConnect failed\n");
+      return -1;
+    }
+    ++idx;
+  }
+
+  res = mscclppConnectionSetup(comm);
+  if (res != mscclppSuccess) {
+    printf("mscclppConnectionSetup failed\n");
+    return -1;
+  }
+
+  res = mscclppProxyLaunch(comm);
+  if (res != mscclppSuccess) {
+    printf("mscclppProxyLaunch failed\n");
+    return -1;
+  }
+
+  mscclppDevConn_t devConns;
+  // NOTE(review): the result of mscclppGetDevConns is not checked - confirm its return type
+  mscclppGetDevConns(comm, &devConns);
+
+  kernel<<<1, 1>>>(devConns, rank, world_size);
+  CUDACHECK(cudaGetLastError()); // surface launch errors before blocking on the kernel
+  CUDACHECK(cudaDeviceSynchronize());
+
+  res = mscclppProxyStop(comm);
+  if (res != mscclppSuccess) {
+    printf("mscclppProxyStop failed\n");
+    return -1;
+  }
+
+  // Read results from GPU
+  int *buf = (int *)calloc(world_size, sizeof(int));
+  if (buf == nullptr) {
+    printf("calloc failed\n");
+    return -1;
+  }
+  CUDACHECK(cudaMemcpy(buf, data_d, sizeof(int) * world_size, cudaMemcpyDeviceToHost));
+
+  // Report every wrong slot, not just the first one
+  bool failed = false;
+  for (int i = 0; i < world_size; ++i) {
+    if (buf[i] != i + 1) {
+      printf("rank: %d, wrong data: %d, expected %d\n", rank, buf[i], i + 1);
+      failed = true;
+    }
+  }
+  free(buf);
+  if (failed) {
+    return -1;
+  }
+
+  res = mscclppCommDestroy(comm);
+  if (res != mscclppSuccess) {
+    printf("mscclppDestroy failed\n");
+    return -1;
+  }
+
+  // Release the test buffers only after the communicator that references them is gone
+  CUDACHECK(cudaFree(data_d));
+  CUDACHECK(cudaFreeHost(send_flags_d));
+  CUDACHECK(cudaFreeHost(recv_flags_d));
+
+#ifdef MSCCLPP_USE_MPI_FOR_TESTS
+  if (useMpi) {
+    MPI_Finalize();
+  }
+#endif
+  printf("Succeeded! %d\n", rank);
+  return 0;
+}