Unique QP per channel and env-controlled GID index

- Change executor to create one connection (unique QP) per channel entry
  instead of sharing connections per peer. This is required for HostNoAtomic
  IB mode where each connection can only forward signals to one semaphore
  via setSignalForwardingDst.

- Add MSCCLPP_IB_GID_INDEX environment variable to override the default
  GID index (3) used for IB transport. Set it to the desired GID index,
  or leave it unset (or set to -1) to use the default.
This commit is contained in:
Ubuntu
2026-03-09 20:27:28 +00:00
parent bf946ea51e
commit 30777565ac
4 changed files with 42 additions and 13 deletions

View File

@@ -110,6 +110,10 @@ class Env {
/// Default is false.
const bool forceDisableNvls;
/// Env name: `MSCCLPP_IB_GID_INDEX`. The GID index to use for IB transport.
/// If unset or set to -1, it defaults to `EndpointConfig::Ib::DefaultGidIndex` (3).
const int ibGidIndex;
private:
Env();

View File

@@ -49,8 +49,14 @@ Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl)
int maxRecvWr = ibNoAtomic_ ? config_.ib.maxRecvWr : 0;
// Override GID index from environment variable if set
int gidIndex = config_.ib.gidIndex;
if (env()->ibGidIndex >= 0) {
gidIndex = env()->ibGidIndex;
}
ibQp_ = contextImpl.getIbContext(config_.transport)
->createQp(config_.ib.port, config_.ib.gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum,
->createQp(config_.ib.port, gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum,
config_.ib.maxSendWr, maxRecvWr, config_.ib.maxWrPerSend);
ibQpInfo_ = ibQp_->getInfo();
} else if (config_.transport == Transport::Ethernet) {

View File

@@ -65,7 +65,8 @@ Env::Env()
ncclSharedLibPath(readEnv<std::string>("MSCCLPP_NCCL_LIB_PATH", "")),
forceNcclFallbackOperation(readEnv<std::string>("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", "")),
ncclSymmetricMemory(readEnv<bool>("MSCCLPP_NCCL_SYMMETRIC_MEMORY", false)),
forceDisableNvls(readEnv<bool>("MSCCLPP_FORCE_DISABLE_NVLS", false)) {}
forceDisableNvls(readEnv<bool>("MSCCLPP_FORCE_DISABLE_NVLS", false)),
ibGidIndex(readEnv<int>("MSCCLPP_IB_GID_INDEX", -1)) {}
std::shared_ptr<Env> env() {
static std::shared_ptr<Env> globalEnv = std::shared_ptr<Env>(new Env());
@@ -93,6 +94,7 @@ std::shared_ptr<Env> env() {
logEnv("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", globalEnv->forceNcclFallbackOperation);
logEnv("MSCCLPP_NCCL_SYMMETRIC_MEMORY", globalEnv->ncclSymmetricMemory);
logEnv("MSCCLPP_FORCE_DISABLE_NVLS", globalEnv->forceDisableNvls);
logEnv("MSCCLPP_IB_GID_INDEX", globalEnv->ibGidIndex);
}
return globalEnv;
}

View File

@@ -109,7 +109,7 @@ namespace mscclpp {
struct ExecutionContext {
std::shared_ptr<ProxyService> proxyService;
std::unordered_map<int, Connection> connections;
std::vector<Connection> connections; // one connection (unique QP) per channel
std::vector<std::shared_ptr<NvlsConnection>> nvlsConnections;
MemoryId localMemoryIdBegin = MemoryId(0);
@@ -266,15 +266,31 @@ struct Executor::Impl {
}
};
std::vector<int> connectedPeers = plan.impl_->getConnectedPeers();
std::vector<std::shared_future<mscclpp::Connection>> connectionFutures;
for (int peer : connectedPeers) {
Transport transport =
!useIB(rank, peer, this->nranksPerNode) ? Transport::CudaIpc : IBs[rank % this->nranksPerNode];
connectionFutures.push_back(this->comm->connect(transport, peer));
// Create one connection (unique QP) per channel entry. Each channel gets its own
// QP — no shared connections. This is required for HostNoAtomic IB mode where each
// connection can only forward signals to one semaphore via setSignalForwardingDst.
int tag = 0;
Transport ibTransport = IBs[rank % this->nranksPerNode];
std::vector<std::shared_future<Connection>> connFutures;
for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) {
std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(channelType);
for (const auto& info : channelInfos) {
for (int peer : info.connectedPeers) {
Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc;
connFutures.push_back(this->comm->connect(transport, peer, tag++));
}
}
channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType);
for (const auto& info : channelInfos) {
for (int peer : info.connectedPeers) {
Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc;
connFutures.push_back(this->comm->connect(transport, peer, tag++));
}
}
}
for (size_t i = 0; i < connectionFutures.size(); i++) {
context.connections[connectedPeers[i]] = connectionFutures[i].get();
for (auto& future : connFutures) {
context.connections.push_back(future.get());
}
std::vector<NvlsInfo> nvlsInfos = plan.impl_->nvlsInfos.at(rank);
@@ -328,10 +344,11 @@ struct Executor::Impl {
std::vector<std::shared_future<Semaphore>> futureProxySemaphores;
std::vector<std::shared_ptr<MemoryDevice2DeviceSemaphore>> memorySemaphores;
std::vector<mscclpp::SemaphoreId> proxySemaphores;
int connIdx = 0;
auto processChannelInfos = [&](std::vector<ChannelInfo>& channelInfos) {
for (ChannelInfo& info : channelInfos) {
for (int peer : info.connectedPeers) {
auto connection = context.connections.at(peer);
for (size_t i = 0; i < info.connectedPeers.size(); i++) {
auto& connection = context.connections[connIdx++];
if (info.channelType == ChannelType::MEMORY) {
futureMemorySemaphores.push_back(this->comm->buildSemaphore(
connection, this->comm->remoteRankOf(connection), this->comm->tagOf(connection)));