Merge branch 'main' into binyli/mnnvl

This commit is contained in:
Binyang Li
2026-05-12 20:17:02 -07:00
committed by GitHub
8 changed files with 223 additions and 35 deletions

View File

@@ -54,6 +54,7 @@ option(MSCCLPP_BUILD_EXT_COLLECTIVES "Build collective algorithms" ON)
option(MSCCLPP_USE_CUDA "Use NVIDIA/CUDA." OFF)
option(MSCCLPP_USE_ROCM "Use AMD/ROCm." OFF)
option(MSCCLPP_USE_IB "Use InfiniBand." ON)
option(MSCCLPP_USE_MRC "Enable MRC support" OFF)
option(MSCCLPP_BYPASS_GPU_CHECK "Bypass GPU check." OFF)
option(MSCCLPP_NPKIT_FLAGS "Set NPKIT flags" OFF)
option(MSCCLPP_ENABLE_COVERAGE "Enable code coverage" OFF)

View File

@@ -126,6 +126,45 @@ $ python -m pip install ".[cuda12,benchmark]"
$ python -m pip install ".[cuda12,benchmark,test]"
```
(mrc-support)=
## MRC Support
MSCCL++ supports execution over **Multi-path Reliable Connection (MRC)**, which enables the use of multiple network paths to improve bandwidth utilization and resilience.
To enable MRC support, you must configure both the **build-time** and **runtime** environments as described below.
---
### 1. Install MRC Verbs Shim
MSCCL++ relies on a custom verbs shim library that intercepts standard `libibverbs` calls and redirects them to an MRC-enabled implementation.
- Install the [MRC verbs shim library](https://github.com/microsoft/mrc-verbs-shim-lib) on all nodes in the cluster.
- Ensure that the underlying system has MRC support enabled.
---
### 2. Build MSCCL++ with MRC Enabled
Enable MRC support during the build by adding the following CMake option:
```bash
-DMSCCLPP_USE_MRC=ON
```
This configures MSCCL++ to use the MRC-enabled verbs layer at runtime.
### 3. Configure Runtime Environment
At runtime, you must configure environment variables to override the default RDMA libraries and link against the MRC-enabled stack:
```bash
-x MSCCLPP_IBV_SO=:$MRC-SHIM-HOME/libibverbs.so
-x LD_LIBRARY_PATH=$MRC-SHIM-HOME/mrc-header-lib:$LD_LIBRARY_PATH
-x VMRC_LIBMRC_SO=/opt/mellanox/doca/lib/aarch64-linux-gnu/libnv_mrc.so"
-x VMRC_LIBIBVERBS_SO=/lib/aarch64-linux-gnu/libibverbs.so.1
```
(vscode-dev-container)=
## VSCode Dev Container

View File

@@ -13,7 +13,7 @@ from mscclpp.language.utils import AlgoSpec
default_algo_configs = [
{
"filename": "allreduce_2nodes_1K_64K.json",
"function": def_algo.allreduce_2nodes,
"function": def_algo.allreduce_multi_nodes,
"spec": AlgoSpec(
name="allreduce_2nodes_1K_64K",
collective=AllReduce(16, 1, True),
@@ -34,7 +34,7 @@ default_algo_configs = [
},
{
"filename": "allreduce_2nodes_128K_2M.json",
"function": def_algo.allreduce_2nodes,
"function": def_algo.allreduce_multi_nodes,
"spec": AlgoSpec(
name="allreduce_2nodes_128K_2M",
collective=AllReduce(16, 1, True),
@@ -53,6 +53,48 @@ default_algo_configs = [
),
"additional_kwargs": {"thread_block_group_size": 4},
},
{
"filename": "allreduce_4nodes_1K_64K.json",
"function": def_algo.allreduce_multi_nodes,
"spec": AlgoSpec(
name="allreduce_4nodes_1K_64K",
collective=AllReduce(32, 1, True),
nranks_per_node=8,
world_size=32,
in_place=True,
instances=1,
protocol="LL",
auto_sync=False,
num_threads_per_block=1024,
reuse_resources=True,
use_double_scratch_buffer=True,
min_message_size=1 << 10,
max_message_size=64 << 10,
tags={"default": 1},
),
"additional_kwargs": {"thread_block_group_size": 1},
},
{
"filename": "allreduce_4nodes_128K_2M.json",
"function": def_algo.allreduce_multi_nodes,
"spec": AlgoSpec(
name="allreduce_4nodes_128K_2M",
collective=AllReduce(32, 1, True),
nranks_per_node=8,
world_size=32,
in_place=True,
instances=1,
protocol="LL",
auto_sync=False,
num_threads_per_block=1024,
reuse_resources=True,
use_double_scratch_buffer=True,
min_message_size=128 << 10,
max_message_size=2 << 20,
tags={"default": 1},
),
"additional_kwargs": {"thread_block_group_size": 4},
},
]

View File

@@ -1,6 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from mscclpp.default_algos.allreduce_2nodes import allreduce_2nodes
from mscclpp.default_algos.allreduce_multi_nodes import allreduce_multi_nodes
__all__ = ["allreduce_2nodes"]
__all__ = ["allreduce_multi_nodes"]

View File

@@ -2,9 +2,11 @@
# Licensed under the MIT License.
"""
Multi-node AllReduce implementation using packet-based communication.
This implements a hierarchical AllReduce: intra-node allreduce followed by
inter-node exchange and final intra-node allreduce.
Generalized multi-node AllReduce implementation using packet-based communication.
This implements a hierarchical AllReduce for N nodes:
1. Intra-node reduce-scatter (each GPU reduces its assigned chunk across the node)
2. Inter-node allreduce (exchange fully intra-reduced chunks across all nodes)
3. Intra-node broadcast (distribute the fully reduced chunks back to all GPUs in the node)
"""
from mscclpp.language.utils import AlgoSpec
@@ -15,7 +17,7 @@ from mscclpp.language.program import *
from mscclpp.language.collectives import *
def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> CollectiveProgram:
def allreduce_multi_nodes(spec: AlgoSpec, thread_block_group_size: int) -> CollectiveProgram:
"""
Implements a multi-node AllReduce using a hierarchical approach:
1. Intra-node allreduce
@@ -23,10 +25,10 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
3. Intra-node allreduce
"""
# Configuration constants
num_nodes = 2
num_nodes = spec.world_size // spec.nranks_per_node
gpus_per_node = spec.nranks_per_node
total_gpus = num_nodes * gpus_per_node
packets_per_gpu = 2
packets_per_gpu = num_nodes
with CollectiveProgram.from_spec(spec) as prog:
# Initialize communication channels and buffers
@@ -54,11 +56,21 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
)
)
scratch_buffer_size = packets_per_gpu * (total_gpus + 1)
# Scratch buffer layout (3 contiguous regions):
# Region 1 [0, total_gpus):
# Intra-node reduce-scatter. Each GPU receives chunks from gpus_per_node peers,
# packets_per_gpu each → gpus_per_node * packets_per_gpu = total_gpus slots.
# Region 2 [total_gpus, total_gpus + num_nodes * packets_per_gpu):
# Inter-node exchange. Each GPU receives reduced chunks from num_nodes nodes,
# packets_per_gpu each → num_nodes * packets_per_gpu slots.
# Region 3 [total_gpus + num_nodes * packets_per_gpu, end):
# Intra-node broadcast. Each GPU receives final reduced data from gpus_per_node peers,
# packets_per_gpu each → gpus_per_node * packets_per_gpu = total_gpus slots.
# Total = 2 * total_gpus + num_nodes * packets_per_gpu
scratch_buffer_size = 2 * total_gpus + packets_per_gpu * num_nodes
for node_id in range(num_nodes):
for local_gpu_id in range(gpus_per_node):
current_rank_id = local_gpu_id + gpus_per_node * node_id
next_node_rank_id = (local_gpu_id + gpus_per_node * (node_id + 1)) % total_gpus
scratch_buffers.append(Buffer(current_rank_id, scratch_buffer_size))
for peer_gpu_id in range(gpus_per_node):
if peer_gpu_id != local_gpu_id:
@@ -66,7 +78,12 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
intra_node_memory_channels[(peer_rank_id, current_rank_id)] = MemoryChannel(
peer_rank_id, current_rank_id
)
inter_node_port_channels[current_rank_id] = PortChannel(next_node_rank_id, current_rank_id)
for peer_node_id in range(num_nodes):
if peer_node_id != node_id:
peer_node_rank_id = (local_gpu_id + gpus_per_node * peer_node_id) % total_gpus
inter_node_port_channels[(current_rank_id, peer_node_rank_id)] = PortChannel(
peer_node_rank_id, current_rank_id
)
# AllReduce
for node_id in range(num_nodes):
@@ -74,7 +91,6 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
current_rank_id = local_gpu_id + gpus_per_node * node_id
current_rank = Rank(current_rank_id)
input_buffer = current_rank.get_input_buffer()
next_node_rank_id = (local_gpu_id + gpus_per_node * (node_id + 1)) % total_gpus
# Intra Node Exchange Data
for peer_gpu_id in range(gpus_per_node):
@@ -118,27 +134,32 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
)
inter_node_offset = total_gpus
inter_node_port_channels[current_rank_id].put_packets(
scratch_buffers[next_node_rank_id][
inter_node_offset
+ local_gpu_id * packets_per_gpu : inter_node_offset
+ local_gpu_id * packets_per_gpu
+ packets_per_gpu
],
scratch_buffers[current_rank_id][
local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu
],
tb=0,
)
for peer_node_id in range(num_nodes):
if peer_node_id != node_id:
peer_node_rank_id = (local_gpu_id + gpus_per_node * peer_node_id) % total_gpus
inter_node_port_channels[(current_rank_id, peer_node_rank_id)].put_packets(
scratch_buffers[peer_node_rank_id][
inter_node_offset
+ node_id * packets_per_gpu : inter_node_offset
+ node_id * packets_per_gpu
+ packets_per_gpu
],
scratch_buffers[current_rank_id][
local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu
],
tb=0,
)
# Reduce Received Data from Remote Node
inter_node_data = [
scratch_buffers[current_rank_id][
inter_node_offset
+ local_gpu_id * packets_per_gpu : inter_node_offset
+ local_gpu_id * packets_per_gpu
+ peer_node_id * packets_per_gpu : inter_node_offset
+ peer_node_id * packets_per_gpu
+ packets_per_gpu
]
for peer_node_id in range(num_nodes)
if peer_node_id != node_id
]
current_rank.reduce(
input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu],
@@ -148,12 +169,18 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
)
current_rank.copy_packets(
scratch_buffers[current_rank_id][scratch_buffer_size - packets_per_gpu : scratch_buffer_size],
scratch_buffers[current_rank_id][
inter_node_offset
+ node_id * packets_per_gpu : inter_node_offset
+ node_id * packets_per_gpu
+ packets_per_gpu
],
input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu],
tb_group=global_intra_node_tbg,
)
# Broadcast Reduced Data
broadcast_offset = total_gpus + packets_per_gpu * num_nodes
for peer_gpu_id in range(gpus_per_node):
peer_rank_id = peer_gpu_id + gpus_per_node * node_id
@@ -161,13 +188,16 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
tbg_id = peer_gpu_id if peer_gpu_id < local_gpu_id else peer_gpu_id - 1
intra_node_memory_channels[(peer_rank_id, current_rank_id)].read_put_packets(
scratch_buffers[peer_rank_id][
inter_node_offset
+ local_gpu_id * packets_per_gpu : inter_node_offset
broadcast_offset
+ local_gpu_id * packets_per_gpu : broadcast_offset
+ local_gpu_id * packets_per_gpu
+ packets_per_gpu
],
scratch_buffers[current_rank_id][
scratch_buffer_size - packets_per_gpu : scratch_buffer_size
inter_node_offset
+ node_id * packets_per_gpu : inter_node_offset
+ node_id * packets_per_gpu
+ packets_per_gpu
],
tb_group=thread_block_groups[tbg_id],
)
@@ -181,8 +211,8 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
peer_gpu_id * packets_per_gpu : peer_gpu_id * packets_per_gpu + packets_per_gpu
],
scratch_buffers[current_rank_id][
inter_node_offset
+ peer_gpu_id * packets_per_gpu : inter_node_offset
broadcast_offset
+ peer_gpu_id * packets_per_gpu : broadcast_offset
+ peer_gpu_id * packets_per_gpu
+ packets_per_gpu
],
@@ -190,3 +220,37 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective
)
return prog
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--name", type=str, help="name of the program")
parser.add_argument("--num_gpus", type=int, help="total number of gpus")
parser.add_argument("--gpus_per_node", type=int, help="number of gpus per node")
parser.add_argument("--tbg", type=int, default=1, help="thread block group size")
parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")
args = parser.parse_args()
spec = AlgoSpec(
name=args.name,
collective=AllReduce(args.num_gpus, 1, True),
nranks_per_node=args.gpus_per_node,
world_size=args.num_gpus,
in_place=True,
instances=1,
protocol="LL",
auto_sync=False,
num_threads_per_block=args.num_threads_per_block,
reuse_resources=True,
use_double_scratch_buffer=True,
min_message_size=args.min_message_size,
max_message_size=args.max_message_size,
)
prog = allreduce_multi_nodes(spec, args.tbg)
print(prog.to_json())

View File

@@ -59,6 +59,10 @@ if(MSCCLPP_NPKIT_FLAGS)
target_compile_definitions(mscclpp_obj PRIVATE ${MSCCLPP_NPKIT_FLAGS})
endif()
if(MSCCLPP_USE_MRC)
target_compile_definitions(mscclpp_obj PRIVATE MSCCLPP_USE_MRC)
endif()
# libmscclpp
add_library(mscclpp SHARED)
target_link_libraries(mscclpp PUBLIC mscclpp_obj)

View File

@@ -10,11 +10,28 @@
#include "logger.hpp"
// Adding MSCCLPP_USE_MRC micro for MRC enablement.
// Non-MRC environments will not be affected by this macro as long as VMRC_LIBIBVERBS_SO
// environment variable is not set.
#if (MSCCLPP_USE_MRC)
#include <cstdlib>
#include <set>
#endif // (MSCCLPP_USE_MRC)
namespace mscclpp {
static std::unique_ptr<void, int (*)(void*)> globalIBVerbsHandle(nullptr, &::dlclose);
#if (MSCCLPP_USE_MRC)
static std::unique_ptr<void, int (*)(void*)> globalOrigIBVerbsHandle(nullptr, &::dlclose);
#endif // (MSCCLPP_USE_MRC)
void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) {
#if (MSCCLPP_USE_MRC)
static std::set<std::string> mrcSymbols = {
"ibv_get_device_list", "ibv_get_device_name", "ibv_open_device", "ibv_close_device", "ibv_query_qp",
"ibv_create_cq", "ibv_destroy_cq", "ibv_create_qp", "ibv_modify_qp", "ibv_destroy_qp",
};
#endif // (MSCCLPP_USE_MRC)
if (!globalIBVerbsHandle) {
if (mscclpp::env()->ibvSo != "") {
void* handle = ::dlopen(mscclpp::env()->ibvSo.c_str(), RTLD_NOW);
@@ -38,7 +55,26 @@ void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) {
THROW(NET, SysError, errno, "Failed to open libibverbs: ", std::string(::dlerror()));
}
}
#if (MSCCLPP_USE_MRC)
// In MRC mode, `VMRC_LIBIBVERBS_SO` should be set.
char* vmrcLibibverbsSo = ::getenv("VMRC_LIBIBVERBS_SO");
void* ptr;
if (vmrcLibibverbsSo != nullptr && mrcSymbols.find(symbol) == mrcSymbols.end()) {
// If we are in MRC mode and the symbol is not in the table, get it from the original libibverbs.
if (!globalOrigIBVerbsHandle) {
void* handle = ::dlopen(vmrcLibibverbsSo, RTLD_NOW);
if (!handle) {
THROW(NET, SysError, errno, "Failed to open ", std::string(vmrcLibibverbsSo));
}
globalOrigIBVerbsHandle.reset(handle);
}
ptr = ::dlsym(globalOrigIBVerbsHandle.get(), symbol.c_str());
} else {
ptr = ::dlsym(globalIBVerbsHandle.get(), symbol.c_str());
}
#else // !(MSCCLPP_USE_MRC)
void* ptr = ::dlsym(globalIBVerbsHandle.get(), symbol.c_str());
#endif // !(MSCCLPP_USE_MRC)
if (!ptr && !allowReturnNull) {
THROW(NET, SysError, errno, "Failed to load libibverbs symbol: ", symbol);
}

View File

@@ -113,7 +113,9 @@ AlgorithmCollection AlgorithmCollectionBuilder::buildDefaultDslAlgorithms(int ra
};
static const std::vector<DslAlgoConfig> defaultAlgoConfigs = {
{"allreduce_2nodes_1K_64K.json", "allreduce", 8, 16, {{"default", 1}}},
{"allreduce_2nodes_64K_2M.json", "allreduce", 8, 16, {{"default", 1}}}};
{"allreduce_2nodes_128K_2M.json", "allreduce", 8, 16, {{"default", 1}}},
{"allreduce_4nodes_1K_64K.json", "allreduce", 8, 32, {{"default", 1}}},
{"allreduce_4nodes_128K_2M.json", "allreduce", 4, 64, {{"default", 1}}}};
AlgorithmCollection collection;
static auto generateFileId = [](const std::string& input) {