mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-04-19 22:39:11 +00:00
Run pass the multinode test
This commit is contained in:
@@ -137,6 +137,33 @@ def main():
|
||||
unique_id = pickle.loads(uid_bytes)
|
||||
|
||||
bootstrap.initialize(unique_id)
|
||||
|
||||
# ── Multi-node diagnostics ─────────────────────────────────────────
|
||||
import subprocess, platform
|
||||
hostname = platform.node()
|
||||
n_ranks_per_node = bootstrap.get_n_ranks_per_node()
|
||||
is_multi_node = (world_size > n_ranks_per_node)
|
||||
|
||||
# Check IB device availability
|
||||
try:
|
||||
ib_out = subprocess.check_output(["ibv_devinfo", "-l"], stderr=subprocess.DEVNULL, timeout=5).decode().strip()
|
||||
ib_devices = [l.strip() for l in ib_out.splitlines() if l.strip() and "device" not in l.lower()]
|
||||
except Exception:
|
||||
ib_devices = []
|
||||
|
||||
if rank == 0:
|
||||
print(f" Hostname: {hostname}")
|
||||
print(f" nRanksPerNode: {n_ranks_per_node}, isMultiNode: {is_multi_node}")
|
||||
print(f" IB devices: {ib_devices if ib_devices else 'NONE FOUND'}")
|
||||
print(f" MSCCLPP_SOCKET_IFNAME: {os.environ.get('MSCCLPP_SOCKET_IFNAME', '<not set>')}")
|
||||
if is_multi_node and not ib_devices:
|
||||
print(f" WARNING: Multi-node detected but no IB devices! Cross-node will fail.")
|
||||
# Also print from rank n_ranks_per_node (first rank on node 1) for comparison
|
||||
if is_multi_node and rank == n_ranks_per_node:
|
||||
print(f" [Node 1] Hostname: {hostname}, rank={rank}")
|
||||
print(f" [Node 1] IB devices: {ib_devices if ib_devices else 'NONE FOUND'}")
|
||||
# ── End diagnostics ────────────────────────────────────────────────
|
||||
|
||||
comm = Communicator(bootstrap)
|
||||
|
||||
# Create MscclppAlltoAllV with existing communicator
|
||||
|
||||
@@ -157,12 +157,15 @@ RegisteredMemory::Impl::Impl(const std::vector<char>::const_iterator& begin,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (transports.has(Transport::CudaIpc)) {
|
||||
} else if (transports.has(Transport::CudaIpc) && getHostHash() == this->hostHash) {
|
||||
auto entry = getTransportInfo(Transport::CudaIpc);
|
||||
auto gpuIpcMem = GpuIpcMem::create(entry.gpuIpcMemHandle);
|
||||
// Create a memory map for the remote GPU memory. The memory map will keep the GpuIpcMem instance alive.
|
||||
this->remoteMemMap = gpuIpcMem->map();
|
||||
this->data = this->remoteMemMap.get();
|
||||
} else if (transports.has(Transport::CudaIpc) && getHostHash() != this->hostHash) {
|
||||
WARN(GPU, "Skipping CudaIpc map for cross-node peer (local hostHash=", getHostHash(),
|
||||
", remote hostHash=", this->hostHash, ")");
|
||||
}
|
||||
if (this->data != nullptr) {
|
||||
INFO(GPU, "Opened CUDA IPC handle at pointer ", this->data);
|
||||
|
||||
@@ -33,11 +33,17 @@ std::vector<mscclpp::MemoryChannel> setupMemoryChannels(
|
||||
const std::vector<mscclpp::RegisteredMemory>& remoteMemories, mscclpp::RegisteredMemory localMemory,
|
||||
int nChannelsPerConnection) {
|
||||
std::vector<mscclpp::MemoryChannel> channels;
|
||||
size_t nConnections = connections.size();
|
||||
// Count number of CudaIpc connections for proper dense indexing into memorySemaphores
|
||||
size_t nCudaIpcConns = 0;
|
||||
for (size_t cid = 0; cid < connections.size(); ++cid) {
|
||||
if (connections[cid].transport() == mscclpp::Transport::CudaIpc) nCudaIpcConns++;
|
||||
}
|
||||
for (int idx = 0; idx < nChannelsPerConnection; ++idx) {
|
||||
for (size_t cid = 0; cid < nConnections; ++cid) {
|
||||
size_t semIdx = 0;
|
||||
for (size_t cid = 0; cid < connections.size(); ++cid) {
|
||||
if (connections[cid].transport() == mscclpp::Transport::CudaIpc) {
|
||||
channels.emplace_back(memorySemaphores[idx * nConnections + cid], remoteMemories[cid], localMemory, nullptr);
|
||||
channels.emplace_back(memorySemaphores[idx * nCudaIpcConns + semIdx], remoteMemories[cid], localMemory, nullptr);
|
||||
semIdx++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user