From 060982d25350ffd38d5ec03578f8476edccc243f Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Thu, 26 Feb 2026 12:40:58 -0800 Subject: [PATCH] updates --- src/core/connection.cc | 2 +- src/core/endpoint.cc | 2 +- src/core/ib.cc | 18 ++++++++++-------- src/core/include/ib.hpp | 7 ++++--- test/mp_unit/ib_tests.cu | 9 ++++++++- 5 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/core/connection.cc b/src/core/connection.cc index 1c528a01..e8672277 100644 --- a/src/core/connection.cc +++ b/src/core/connection.cc @@ -276,7 +276,7 @@ IBConnection::IBConnection(std::shared_ptr context, const Endpoint& loc if (ibNoAtomic_) { #if defined(MSCCLPP_USE_CUDA) if (!gdrEnabled()) { - const char* reason = "unknown"; + std::string reason = "unknown"; switch (gdrStatus()) { case GdrStatus::NotBuilt: reason = "mscclpp was not built with GDRCopy support (MSCCLPP_USE_GDRCOPY not set)"; diff --git a/src/core/endpoint.cc b/src/core/endpoint.cc index 05653885..6569a31e 100644 --- a/src/core/endpoint.cc +++ b/src/core/endpoint.cc @@ -51,7 +51,7 @@ Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl) ibQp_ = contextImpl.getIbContext(config_.transport) ->createQp(config_.ib.port, config_.ib.gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum, - config_.ib.maxSendWr, maxRecvWr, config_.ib.maxWrPerSend); + config_.ib.maxSendWr, maxRecvWr, config_.ib.maxWrPerSend, ibNoAtomic_); ibQpInfo_ = ibQp_->getInfo(); // Allocate a 64-bit signal GPU buffer for write-with-imm data payload (ibNoAtomic_ only). diff --git a/src/core/ib.cc b/src/core/ib.cc index 2e7b867d..baa01727 100644 --- a/src/core/ib.cc +++ b/src/core/ib.cc @@ -131,7 +131,7 @@ const void* IbMr::getBuff() const { return buff_; } uint32_t IbMr::getLkey() const { return mr_->lkey; } IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendCqSize, int maxSendCqPollNum, - int maxSendWr, int maxRecvWr, int maxWrPerSend) + int maxSendWr, int maxRecvWr, int maxWrPerSend, bool noAtomic) : portNum_(portNum), gidIndex_(gidIndex), info_(), @@ -151,7 +151,8 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendC maxSendCqPollNum_(maxSendCqPollNum), maxSendWr_(maxSendWr), maxWrPerSend_(maxWrPerSend), - maxRecvWr_(maxRecvWr) { + maxRecvWr_(maxRecvWr), + noAtomic_(noAtomic) { sendCq_ = IBVerbs::ibv_create_cq(ctx, maxSendCqSize, nullptr, nullptr, 0); if (sendCq_ == nullptr) { THROW(NET, IbError, errno, "ibv_create_cq failed (errno ", errno, ")"); @@ -211,7 +212,8 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendC qpAttr.qp_state = IBV_QPS_INIT; qpAttr.pkey_index = 0; qpAttr.port_num = portNum_; - qpAttr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC; + qpAttr.qp_access_flags = noAtomic_ ? IBV_ACCESS_REMOTE_WRITE + : (IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC); if (IBVerbs::ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) { THROW(NET, IbError, errno, "ibv_modify_qp failed (errno ", errno, ")"); } @@ -240,7 +242,7 @@ void IbQp::rtr(const IbQpInfo& info) { qp_attr.path_mtu = static_cast(info.mtu); qp_attr.dest_qp_num = info.qpn; qp_attr.rq_psn = 0; - qp_attr.max_dest_rd_atomic = 1; + qp_attr.max_dest_rd_atomic = noAtomic_ ? 0 : 1; qp_attr.min_rnr_timer = 0x12; if (info.linkLayer == IBV_LINK_LAYER_ETHERNET || info.isGrh) { qp_attr.ah_attr.is_global = 1; @@ -272,7 +274,7 @@ void IbQp::rts() { qp_attr.retry_cnt = 7; qp_attr.rnr_retry = 7; qp_attr.sq_psn = 0; - qp_attr.max_rd_atomic = 1; + qp_attr.max_rd_atomic = noAtomic_ ? 0 : 1; int ret = IBVerbs::ibv_modify_qp( qp_, &qp_attr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC); @@ -512,7 +514,7 @@ int IbCtx::getAnyUsablePort(int gidIndex) const { } std::shared_ptr IbCtx::createQp(int port, int gidIndex, int maxSendCqSize, int maxSendCqPollNum, int maxSendWr, - int maxRecvWr, int maxWrPerSend) { + int maxRecvWr, int maxWrPerSend, bool noAtomic) { if (port == -1) { port = this->getAnyUsablePort(gidIndex); if (port == -1) { @@ -521,8 +523,8 @@ std::shared_ptr IbCtx::createQp(int port, int gidIndex, int maxSendCqSize, } else if (!this->isPortUsable(port, gidIndex)) { THROW(NET, Error, ErrorCode::InvalidUsage, "invalid IB port: ", port); } - return std::shared_ptr( - new IbQp(ctx_, pd_, port, gidIndex, maxSendCqSize, maxSendCqPollNum, maxSendWr, maxRecvWr, maxWrPerSend)); + return std::shared_ptr(new IbQp(ctx_, pd_, port, gidIndex, maxSendCqSize, maxSendCqPollNum, maxSendWr, + maxRecvWr, maxWrPerSend, noAtomic)); } std::unique_ptr IbCtx::registerMr(void* buff, std::size_t size) { diff --git a/src/core/include/ib.hpp b/src/core/include/ib.hpp index e9363e9c..bfa6e314 100644 --- a/src/core/include/ib.hpp +++ b/src/core/include/ib.hpp @@ -101,7 +101,7 @@ class IbQp { }; IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendCqSize, int maxSendCqPollNum, int maxSendWr, - int maxRecvWr, int maxWrPerSend); + int maxRecvWr, int maxWrPerSend, bool noAtomic); SendWrInfo getNewSendWrInfo(); RecvWrInfo getNewRecvWrInfo(); @@ -128,6 +128,7 @@ class IbQp { const int maxSendWr_; const int maxWrPerSend_; const int maxRecvWr_; + const bool noAtomic_; friend class IbCtx; }; @@ -139,14 +140,14 @@ class IbCtx { ~IbCtx(); std::shared_ptr createQp(int port, int gidIndex, int maxSendCqSize, int maxSendCqPollNum, int maxSendWr, - int maxRecvWr, int maxWrPerSend); + int maxRecvWr, int maxWrPerSend, bool noAtomic); std::unique_ptr registerMr(void* buff, std::size_t size); bool supportsRdmaAtomics() const; #else IbCtx([[maybe_unused]] const std::string& devName) {} ~IbCtx() {} - std::shared_ptr createQp(int, int, int, int, int, int, int) { return nullptr; } + std::shared_ptr createQp(int, int, int, int, int, int, int, bool) { return nullptr; } std::unique_ptr registerMr([[maybe_unused]] void* buff, [[maybe_unused]] std::size_t size) { return nullptr; } diff --git a/test/mp_unit/ib_tests.cu b/test/mp_unit/ib_tests.cu index 051030ac..4397a04f 100644 --- a/test/mp_unit/ib_tests.cu +++ b/test/mp_unit/ib_tests.cu @@ -42,7 +42,8 @@ void IbPeerToPeerTest::SetUp() { int ib_gid_index = std::stoi(gEnv->args["ib_gid_index"]); ibCtx = std::make_shared(ibDevName); - qp = ibCtx->createQp(-1, ib_gid_index, 1024, 1, 8192, 0, 64); + bool noAtomic = !ibCtx->supportsRdmaAtomics(); + qp = ibCtx->createQp(-1, ib_gid_index, 1024, 1, 8192, 0, 64, noAtomic); qpInfo[gEnv->rank] = qp->getInfo(); bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo)); @@ -200,6 +201,9 @@ TEST_F(IbPeerToPeerTest, MemoryConsistency) { // This test needs only two ranks return; } + if (!ibCtx->supportsRdmaAtomics()) { + GTEST_SKIP() << "This test requires RDMA atomics support."; + } const uint64_t signalPeriod = 1024; const uint64_t maxIter = 10000; @@ -308,6 +312,9 @@ TEST_F(IbPeerToPeerTest, SimpleAtomicAdd) { // This test needs only two ranks return; } + if (!ibCtx->supportsRdmaAtomics()) { + GTEST_SKIP() << "This test requires RDMA atomics support."; + } mscclpp::Timer timeout(3);