From 8cecfee270ebf7ac169f4c1a388dde5198c43b70 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 20:05:46 +0000 Subject: [PATCH] debug --- .../default_algos/mscclpp_send_recv.py | 2 +- src/core/connection.cc | 8 +++++++ src/core/executor/executor.cc | 22 ++++++++++++------- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/python/mscclpp/default_algos/mscclpp_send_recv.py b/python/mscclpp/default_algos/mscclpp_send_recv.py index ed7cc9b7..ef052210 100644 --- a/python/mscclpp/default_algos/mscclpp_send_recv.py +++ b/python/mscclpp/default_algos/mscclpp_send_recv.py @@ -21,7 +21,7 @@ def send_recv_test(name, nnodes, gpus_per_node, split_mask): use_double_scratch_buffer=False, min_message_size=0, max_message_size=2**64 - 1, - instances=1 + instances=4 ): # Creating separate port channels for next and prev directions. # When prev and next are the same peer (e.g., 2-node ring), both channels go to the same peer diff --git a/src/core/connection.cc b/src/core/connection.cc index 8b6c0afb..d0fb19e7 100644 --- a/src/core/connection.cc +++ b/src/core/connection.cc @@ -309,6 +309,14 @@ IBConnection::IBConnection(std::shared_ptr context, const Endpoint& loc // Pre-post receive requests for incoming WRITE_WITH_IMM notifications. // The recv CQE guarantees the preceding data WRITE has been committed to GPU memory. auto qp = qp_.lock(); + // dataDirectEnabled_ = localImpl.ibSignalGpuMr_ && localImpl.ibSignalGpuMr_->isDataDirect() && + // localSignalGpuMap_ && localSignalGpuMap_->valid(); + dataDirectEnabled_ = true; + if (dataDirectEnabled_) { + INFO(CONN, "IBConnection: Data Direct enabled"); + } + + // Pre-post receive requests for incoming write-with-imm int maxRecvWr = localEndpoint.config().ib.maxRecvWr; for (int i = 0; i < maxRecvWr; ++i) { qp->stageRecv(/*wrId=*/0); diff --git a/src/core/executor/executor.cc b/src/core/executor/executor.cc index 3020cbec..b5510b63 100644 --- a/src/core/executor/executor.cc +++ b/src/core/executor/executor.cc @@ -96,6 +96,7 @@ namespace { auto hasIBDevices = []() { return mscclpp::getIBDeviceCount() > 0; }; auto useIB = [](int rank1, int rank2, int nranksPerNode) { + return true; bool inSameNode = rank1 / nranksPerNode == rank2 / nranksPerNode; return hasIBDevices() && !inSameNode; }; @@ -109,7 +110,7 @@ namespace mscclpp { struct ExecutionContext { std::shared_ptr proxyService; - std::vector connections; + std::vector connections; // one connection (unique QP) per channel std::vector> nvlsConnections; MemoryId localMemoryIdBegin = MemoryId(0); @@ -264,7 +265,10 @@ struct Executor::Impl { } }; - std::unordered_map peerTags; + // Create one connection (unique QP) per channel entry. Each channel gets its own + // QP — no shared connections. This is required for HostNoAtomic IB mode where each + // connection can only forward signals to one semaphore via setSignalForwardingDst. + int tag = 0; Transport ibTransport = IBs[rank % this->nranksPerNode]; std::vector> connFutures; for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) { @@ -272,19 +276,20 @@ struct Executor::Impl { for (const auto& info : channelInfos) { for (int peer : info.connectedPeers) { Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; - connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++)); + connFutures.push_back(this->comm->connect(transport, peer, tag++)); } } channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType); for (const auto& info : channelInfos) { for (int peer : info.connectedPeers) { Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; - connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++)); + connFutures.push_back(this->comm->connect(transport, peer, tag++)); } } } - for (size_t i = 0; i < connectionFutures.size(); i++) { - context.connections[connectedPeers[i]] = connectionFutures[i].get(); + + for (auto& future : connFutures) { + context.connections.push_back(future.get()); } std::vector nvlsInfos = plan.impl_->nvlsInfos.at(rank); @@ -338,10 +343,11 @@ struct Executor::Impl { std::vector> futureProxySemaphores; std::vector> memorySemaphores; std::vector proxySemaphores; + int connIdx = 0; auto processChannelInfos = [&](std::vector& channelInfos) { for (ChannelInfo& info : channelInfos) { - for (int peer : info.connectedPeers) { - auto connection = context.connections.at(peer); + for (size_t i = 0; i < info.connectedPeers.size(); i++) { + auto& connection = context.connections[connIdx++]; if (info.channelType == ChannelType::MEMORY) { futureMemorySemaphores.push_back(this->comm->buildSemaphore( connection, this->comm->remoteRankOf(connection), this->comm->tagOf(connection)));