From 251873ca8eea007d659eee2c5dfdd553ab366133 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 9 Mar 2026 22:38:08 +0000 Subject: [PATCH] update --- include/mscclpp/core.hpp | 3 ++- src/core/executor/executor.cc | 35 +++++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 37bdbd51..5b184f0a 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -430,7 +431,7 @@ struct EndpointConfig { int maxWrPerSend = DefaultMaxWrPerSend, Mode mode = Mode::Default) : deviceIndex(deviceIndex), port(port), - gidIndex(gidIndex), + gidIndex(env()->ibGidIndex > 0 ? env()->ibGidIndex : gidIndex), maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), diff --git a/src/core/executor/executor.cc b/src/core/executor/executor.cc index bf2caf97..3020cbec 100644 --- a/src/core/executor/executor.cc +++ b/src/core/executor/executor.cc @@ -109,7 +109,7 @@ namespace mscclpp { struct ExecutionContext { std::shared_ptr proxyService; - std::unordered_map connections; + std::vector connections; std::vector> nvlsConnections; MemoryId localMemoryIdBegin = MemoryId(0); @@ -121,8 +121,6 @@ struct ExecutionContext { // local registered memories to keep resources alive std::vector localRegisteredMemories; - std::vector> memorySemaphores; - std::vector proxySemaphores; std::vector memoryChannels; std::vector portChannels; std::vector nvlsChannels; @@ -266,12 +264,24 @@ struct Executor::Impl { } }; - std::vector connectedPeers = plan.impl_->getConnectedPeers(); - std::vector> connectionFutures; - for (int peer : connectedPeers) { - Transport transport = - !useIB(rank, peer, this->nranksPerNode) ? Transport::CudaIpc : IBs[rank % this->nranksPerNode]; - connectionFutures.push_back(this->comm->connect(transport, peer)); + std::unordered_map peerTags; + Transport ibTransport = IBs[rank % this->nranksPerNode]; + std::vector> connFutures; + for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) { + std::vector channelInfos = plan.impl_->getChannelInfos(channelType); + for (const auto& info : channelInfos) { + for (int peer : info.connectedPeers) { + Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; + connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++)); + } + } + channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType); + for (const auto& info : channelInfos) { + for (int peer : info.connectedPeers) { + Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc; + connFutures.push_back(this->comm->connect(transport, peer, peerTags[peer]++)); + } + } } for (size_t i = 0; i < connectionFutures.size(); i++) { context.connections[connectedPeers[i]] = connectionFutures[i].get(); @@ -360,18 +370,15 @@ struct Executor::Impl { proxySemaphores.push_back(context.proxyService->addSemaphore(sem.get())); } - context.memorySemaphores = std::move(memorySemaphores); - context.proxySemaphores = std::move(proxySemaphores); - for (ChannelType channelType : channelTypes) { std::vector channelInfos = plan.impl_->getChannelInfos(channelType); int index = 0; for (ChannelInfo& info : channelInfos) { for (size_t i = 0; i < info.connectedPeers.size(); i++) { if (channelType == ChannelType::MEMORY) { - context.memoryChannels.emplace_back(context.memorySemaphores[index++]); + context.memoryChannels.emplace_back(memorySemaphores[index++]); } else if (channelType == ChannelType::PORT) { - context.portChannels.emplace_back(context.proxyService->basePortChannel(context.proxySemaphores[index++])); + context.portChannels.emplace_back(context.proxyService->basePortChannel(proxySemaphores[index++])); } } }