This commit is contained in:
Ubuntu
2026-03-09 20:05:46 +00:00
parent 4892b4ebea
commit 5d9f7612f9
3 changed files with 34 additions and 16 deletions

View File

@@ -21,7 +21,7 @@ def send_recv_test(name, nnodes, gpus_per_node, split_mask):
use_double_scratch_buffer=False,
min_message_size=0,
max_message_size=2**64 - 1,
instances=1
instances=4
):
# Creating port channels
group_size = split_mask + 1
@@ -57,8 +57,7 @@ def send_recv_test(name, nnodes, gpus_per_node, split_mask):
dst_rank = Rank(next_global_rank_id)
dst_buffer = dst_rank.get_output_buffer()
port_channels[(next_global_rank_id, global_rank_id)].put(dst_buffer[:], src_buffer[:], tb=0)
port_channels[(next_global_rank_id, global_rank_id)].signal(tb=0, data_sync=SyncType.before)
port_channels[(next_global_rank_id, global_rank_id)].put_with_signal_and_flush(dst_buffer[:], src_buffer[:], tb=0)
port_channels[(prev_global_rank_id, global_rank_id)].wait(tb=0, data_sync=SyncType.none)
print(JSON())

View File

@@ -324,8 +324,9 @@ IBConnection::IBConnection(std::shared_ptr<Context> context, const Endpoint& loc
// When all conditions are met, RDMA data writes and GDRCopy token writes both go
// through the Data Direct engine, guaranteeing GPU memory visibility at CQE poll time.
auto qp = qp_.lock();
dataDirectEnabled_ = localImpl.ibSignalGpuMr_ && localImpl.ibSignalGpuMr_->isDataDirect() &&
localSignalGpuMap_ && localSignalGpuMap_->valid();
// dataDirectEnabled_ = localImpl.ibSignalGpuMr_ && localImpl.ibSignalGpuMr_->isDataDirect() &&
// localSignalGpuMap_ && localSignalGpuMap_->valid();
dataDirectEnabled_ = true;
if (dataDirectEnabled_) {
INFO(CONN, "IBConnection: Data Direct enabled");
}

View File

@@ -96,6 +96,7 @@ namespace {
// Returns true when mscclpp::getIBDeviceCount() reports more than zero devices.
auto hasIBDevices = []() {
  const auto ibDeviceCount = mscclpp::getIBDeviceCount();
  return ibDeviceCount > 0;
};
// Decides whether the link between two ranks should use an IB transport.
//
// rank1, rank2:   global ranks of the two endpoints.
// nranksPerNode:  number of ranks per node; a rank's node index is rank / nranksPerNode.
//
// Returns true only when hasIBDevices() is true and the two ranks map to different
// node indices; otherwise returns false.
auto useIB = [](int rank1, int rank2, int nranksPerNode) {
  // A leading unconditional `return true;` previously short-circuited this lambda,
  // leaving the topology check below unreachable and reporting IB for every pair,
  // including same-node peers and hosts without any IB device.
  const bool inSameNode = rank1 / nranksPerNode == rank2 / nranksPerNode;
  return hasIBDevices() && !inSameNode;
};
@@ -109,7 +110,7 @@ namespace mscclpp {
struct ExecutionContext {
std::shared_ptr<ProxyService> proxyService;
std::unordered_map<int, Connection> connections;
std::vector<Connection> connections; // one connection (unique QP) per channel
std::vector<std::shared_ptr<NvlsConnection>> nvlsConnections;
MemoryId localMemoryIdBegin = MemoryId(0);
@@ -266,15 +267,31 @@ struct Executor::Impl {
}
};
std::vector<int> connectedPeers = plan.impl_->getConnectedPeers();
std::vector<std::shared_future<mscclpp::Connection>> connectionFutures;
for (int peer : connectedPeers) {
Transport transport =
!useIB(rank, peer, this->nranksPerNode) ? Transport::CudaIpc : IBs[rank % this->nranksPerNode];
connectionFutures.push_back(this->comm->connect(transport, peer));
// Create one connection (unique QP) per channel entry. Each channel gets its own
// QP — no shared connections. This is required for HostNoAtomic IB mode where each
// connection can only forward signals to one semaphore via setSignalForwardingDst.
int tag = 0;
Transport ibTransport = IBs[rank % this->nranksPerNode];
std::vector<std::shared_future<Connection>> connFutures;
for (ChannelType channelType : {ChannelType::MEMORY, ChannelType::PORT}) {
std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(channelType);
for (const auto& info : channelInfos) {
for (int peer : info.connectedPeers) {
Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc;
connFutures.push_back(this->comm->connect(transport, peer, tag++));
}
}
channelInfos = plan.impl_->getUnpairedChannelInfos(nranks, channelType);
for (const auto& info : channelInfos) {
for (int peer : info.connectedPeers) {
Transport transport = useIB(rank, peer, this->nranksPerNode) ? ibTransport : Transport::CudaIpc;
connFutures.push_back(this->comm->connect(transport, peer, tag++));
}
}
}
for (size_t i = 0; i < connectionFutures.size(); i++) {
context.connections[connectedPeers[i]] = connectionFutures[i].get();
for (auto& future : connFutures) {
context.connections.push_back(future.get());
}
std::vector<NvlsInfo> nvlsInfos = plan.impl_->nvlsInfos.at(rank);
@@ -328,10 +345,11 @@ struct Executor::Impl {
std::vector<std::shared_future<Semaphore>> futureProxySemaphores;
std::vector<std::shared_ptr<MemoryDevice2DeviceSemaphore>> memorySemaphores;
std::vector<mscclpp::SemaphoreId> proxySemaphores;
int connIdx = 0;
auto processChannelInfos = [&](std::vector<ChannelInfo>& channelInfos) {
for (ChannelInfo& info : channelInfos) {
for (int peer : info.connectedPeers) {
auto connection = context.connections.at(peer);
for (size_t i = 0; i < info.connectedPeers.size(); i++) {
auto& connection = context.connections[connIdx++];
if (info.channelType == ChannelType::MEMORY) {
futureMemorySemaphores.push_back(this->comm->buildSemaphore(
connection, this->comm->remoteRankOf(connection), this->comm->tagOf(connection)));