This commit is contained in:
Ubuntu
2026-03-09 20:05:46 +00:00
committed by Ubuntu
parent 07d97f6f17
commit 8cecfee270
3 changed files with 23 additions and 9 deletions

View File

@@ -21,7 +21,7 @@ def send_recv_test(name, nnodes, gpus_per_node, split_mask):
use_double_scratch_buffer=False,
min_message_size=0,
max_message_size=2**64 - 1,
instances=1
instances=4
):
# Creating separate port channels for next and prev directions.
# When prev and next are the same peer (e.g., 2-node ring), both channels go to the same peer

View File

@@ -309,6 +309,14 @@ IBConnection::IBConnection(std::shared_ptr<Context> context, const Endpoint& loc
// Pre-post receive requests for incoming WRITE_WITH_IMM notifications.
// The recv CQE guarantees the preceding data WRITE has been committed to GPU memory.
auto qp = qp_.lock();
// dataDirectEnabled_ = localImpl.ibSignalGpuMr_ && localImpl.ibSignalGpuMr_->isDataDirect() &&
// localSignalGpuMap_ && localSignalGpuMap_->valid();
dataDirectEnabled_ = true;
if (dataDirectEnabled_) {
INFO(CONN, "IBConnection: Data Direct enabled");
}
// Pre-post receive requests for incoming write-with-imm
int maxRecvWr = localEndpoint.config().ib.maxRecvWr;
for (int i = 0; i < maxRecvWr; ++i) {
qp->stageRecv(/*wrId=*/0);

View File

@@ -96,6 +96,7 @@ namespace {
// True when mscclpp::getIBDeviceCount() reports at least one IB device.
auto hasIBDevices = []() { return mscclpp::getIBDeviceCount() > 0; };
// Decides whether the link between rank1 and rank2 should use the IB transport.
auto useIB = [](int rank1, int rank2, int nranksPerNode) {
// NOTE(review): this unconditional return answers "use IB" for every rank pair and makes the
// two statements below unreachable. It looks like a temporary override -- confirm intent.
return true;
// Unreachable as written: two ranks count as same-node when rank / nranksPerNode matches,
// and IB would be chosen only for cross-node pairs when an IB device is present.
bool inSameNode = rank1 / nranksPerNode == rank2 / nranksPerNode;
return hasIBDevices() && !inSameNode;
};
@@ -109,7 +110,7 @@ namespace mscclpp {
struct ExecutionContext {
std::shared_ptr<ProxyService> proxyService;
std::vector<Connection> connections;
std::vector<Connection> connections; // one connection (unique QP) per channel
std::vector<std::shared_ptr<NvlsConnection>> nvlsConnections;
MemoryId localMemoryIdBegin = MemoryId(0);
@@ -264,7 +265,10 @@ struct Executor::Impl {
}
};
std::unordered_map<int, int> peerTags;
// Create one connection (unique QP) per channel entry. Each channel gets its own
// QP — no shared connections. This is required for HostNoAtomic IB mode where each
// connection can only forward signals to one semaphore via setSignalForwardingDst.
int tag = 0;
Transport ibTransport = IBs[rank % this->nranksPerNode];
std::vector<std::shared_future<Connection>> connFutures;
for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) {
@@ -272,19 +276,20 @@ struct Executor::Impl {
for (const auto& info : channelInfos) {
for (int peer : info.connectedPeers) {
Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc;
connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++));
connFutures.push_back(this->comm->connect(transport, peer, tag++));
}
}
channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType);
for (const auto& info : channelInfos) {
for (int peer : info.connectedPeers) {
Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc;
connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++));
connFutures.push_back(this->comm->connect(transport, peer, tag++));
}
}
}
for (size_t i = 0; i < connectionFutures.size(); i++) {
context.connections[connectedPeers[i]] = connectionFutures[i].get();
for (auto& future : connFutures) {
context.connections.push_back(future.get());
}
std::vector<NvlsInfo> nvlsInfos = plan.impl_->nvlsInfos.at(rank);
@@ -338,10 +343,11 @@ struct Executor::Impl {
std::vector<std::shared_future<Semaphore>> futureProxySemaphores;
std::vector<std::shared_ptr<MemoryDevice2DeviceSemaphore>> memorySemaphores;
std::vector<mscclpp::SemaphoreId> proxySemaphores;
int connIdx = 0;
auto processChannelInfos = [&](std::vector<ChannelInfo>& channelInfos) {
for (ChannelInfo& info : channelInfos) {
for (int peer : info.connectedPeers) {
auto connection = context.connections.at(peer);
for (size_t i = 0; i < info.connectedPeers.size(); i++) {
auto& connection = context.connections[connIdx++];
if (info.channelType == ChannelType::MEMORY) {
futureMemorySemaphores.push_back(this->comm->buildSemaphore(
connection, this->comm->remoteRankOf(connection), this->comm->tagOf(connection)));