diff --git a/CMakeLists.txt b/CMakeLists.txt index ef8b785a..49154e0b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,6 +54,7 @@ option(MSCCLPP_BUILD_EXT_COLLECTIVES "Build collective algorithms" ON) option(MSCCLPP_USE_CUDA "Use NVIDIA/CUDA." OFF) option(MSCCLPP_USE_ROCM "Use AMD/ROCm." OFF) option(MSCCLPP_USE_IB "Use InfiniBand." ON) +option(MSCCLPP_USE_MRC "Enable MRC support" OFF) option(MSCCLPP_BYPASS_GPU_CHECK "Bypass GPU check." OFF) option(MSCCLPP_NPKIT_FLAGS "Set NPKIT flags" OFF) option(MSCCLPP_ENABLE_COVERAGE "Enable code coverage" OFF) diff --git a/docs/quickstart.md b/docs/quickstart.md index 83a08d6a..716fcf61 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -126,6 +126,45 @@ $ python -m pip install ".[cuda12,benchmark]" $ python -m pip install ".[cuda12,benchmark,test]" ``` +(mrc-support)= +## MRC Support + +MSCCL++ supports execution over **Multi-path Reliable Connection (MRC)**, which enables the use of multiple network paths to improve bandwidth utilization and resilience. + +To enable MRC support, you must configure both the **build-time** and **runtime** environments as described below. + +--- + +### 1. Install MRC Verbs Shim + +MSCCL++ relies on a custom verbs shim library that intercepts standard `libibverbs` calls and redirects them to an MRC-enabled implementation. + +- Install the [MRC verbs shim library](https://github.com/microsoft/mrc-verbs-shim-lib) on all nodes in the cluster. +- Ensure that the underlying system has MRC support enabled. + +--- + +### 2. Build MSCCL++ with MRC Enabled + +Enable MRC support during the build by adding the following CMake option: + +```bash +-DMSCCLPP_USE_MRC=ON +``` + +This configures MSCCL++ to use the MRC-enabled verbs layer at runtime. + +### 3. Configure Runtime Environment + +At runtime, you must configure environment variables to override the default RDMA libraries and link against the MRC-enabled stack: + +```bash +-x MSCCLPP_IBV_SO=:$MRC-SHIM-HOME/libibverbs.so +-x LD_LIBRARY_PATH=$MRC-SHIM-HOME/mrc-header-lib:$LD_LIBRARY_PATH +-x VMRC_LIBMRC_SO=/opt/mellanox/doca/lib/aarch64-linux-gnu/libnv_mrc.so" +-x VMRC_LIBIBVERBS_SO=/lib/aarch64-linux-gnu/libibverbs.so.1 +``` + (vscode-dev-container)= ## VSCode Dev Container diff --git a/python/mscclpp/__main__.py b/python/mscclpp/__main__.py index 6a6f5f28..450ec748 100644 --- a/python/mscclpp/__main__.py +++ b/python/mscclpp/__main__.py @@ -13,7 +13,7 @@ from mscclpp.language.utils import AlgoSpec default_algo_configs = [ { "filename": "allreduce_2nodes_1K_64K.json", - "function": def_algo.allreduce_2nodes, + "function": def_algo.allreduce_multi_nodes, "spec": AlgoSpec( name="allreduce_2nodes_1K_64K", collective=AllReduce(16, 1, True), @@ -34,7 +34,7 @@ default_algo_configs = [ }, { "filename": "allreduce_2nodes_128K_2M.json", - "function": def_algo.allreduce_2nodes, + "function": def_algo.allreduce_multi_nodes, "spec": AlgoSpec( name="allreduce_2nodes_128K_2M", collective=AllReduce(16, 1, True), @@ -53,6 +53,48 @@ default_algo_configs = [ ), "additional_kwargs": {"thread_block_group_size": 4}, }, + { + "filename": "allreduce_4nodes_1K_64K.json", + "function": def_algo.allreduce_multi_nodes, + "spec": AlgoSpec( + name="allreduce_4nodes_1K_64K", + collective=AllReduce(32, 1, True), + nranks_per_node=8, + world_size=32, + in_place=True, + instances=1, + protocol="LL", + auto_sync=False, + num_threads_per_block=1024, + reuse_resources=True, + use_double_scratch_buffer=True, + min_message_size=1 << 10, + max_message_size=64 << 10, + tags={"default": 1}, + ), + "additional_kwargs": {"thread_block_group_size": 1}, + }, + { + "filename": "allreduce_4nodes_128K_2M.json", + "function": def_algo.allreduce_multi_nodes, + "spec": AlgoSpec( + name="allreduce_4nodes_128K_2M", + collective=AllReduce(32, 1, True), + nranks_per_node=8, + world_size=32, + in_place=True, + instances=1, + protocol="LL", + auto_sync=False, + num_threads_per_block=1024, + reuse_resources=True, + use_double_scratch_buffer=True, + min_message_size=128 << 10, + max_message_size=2 << 20, + tags={"default": 1}, + ), + "additional_kwargs": {"thread_block_group_size": 4}, + }, ] diff --git a/python/mscclpp/default_algos/__init__.py b/python/mscclpp/default_algos/__init__.py index a5cfa882..1767aab6 100644 --- a/python/mscclpp/default_algos/__init__.py +++ b/python/mscclpp/default_algos/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from mscclpp.default_algos.allreduce_2nodes import allreduce_2nodes +from mscclpp.default_algos.allreduce_multi_nodes import allreduce_multi_nodes -__all__ = ["allreduce_2nodes"] +__all__ = ["allreduce_multi_nodes"] diff --git a/python/mscclpp/default_algos/allreduce_2nodes.py b/python/mscclpp/default_algos/allreduce_multi_nodes.py similarity index 61% rename from python/mscclpp/default_algos/allreduce_2nodes.py rename to python/mscclpp/default_algos/allreduce_multi_nodes.py index 5a355887..5697a0e3 100644 --- a/python/mscclpp/default_algos/allreduce_2nodes.py +++ b/python/mscclpp/default_algos/allreduce_multi_nodes.py @@ -2,9 +2,11 @@ # Licensed under the MIT License. """ -Multi-node AllReduce implementation using packet-based communication. -This implements a hierarchical AllReduce: intra-node allreduce followed by -inter-node exchange and final intra-node allreduce. +Generalized multi-node AllReduce implementation using packet-based communication. +This implements a hierarchical AllReduce for N nodes: +1. Intra-node reduce-scatter (each GPU reduces its assigned chunk across the node) +2. Inter-node allreduce (exchange fully intra-reduced chunks across all nodes) +3. Intra-node broadcast (distribute the fully reduced chunks back to all GPUs in the node) """ from mscclpp.language.utils import AlgoSpec @@ -15,7 +17,7 @@ from mscclpp.language.program import * from mscclpp.language.collectives import * -def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> CollectiveProgram: +def allreduce_multi_nodes(spec: AlgoSpec, thread_block_group_size: int) -> CollectiveProgram: """ Implements a multi-node AllReduce using a hierarchical approach: 1. Intra-node allreduce @@ -23,10 +25,10 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective 3. Intra-node allreduce """ # Configuration constants - num_nodes = 2 + num_nodes = spec.world_size // spec.nranks_per_node gpus_per_node = spec.nranks_per_node total_gpus = num_nodes * gpus_per_node - packets_per_gpu = 2 + packets_per_gpu = num_nodes with CollectiveProgram.from_spec(spec) as prog: # Initialize communication channels and buffers @@ -54,11 +56,21 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective ) ) - scratch_buffer_size = packets_per_gpu * (total_gpus + 1) + # Scratch buffer layout (3 contiguous regions): + # Region 1 [0, total_gpus): + # Intra-node reduce-scatter. Each GPU receives chunks from gpus_per_node peers, + # packets_per_gpu each → gpus_per_node * packets_per_gpu = total_gpus slots. + # Region 2 [total_gpus, total_gpus + num_nodes * packets_per_gpu): + # Inter-node exchange. Each GPU receives reduced chunks from num_nodes nodes, + # packets_per_gpu each → num_nodes * packets_per_gpu slots. + # Region 3 [total_gpus + num_nodes * packets_per_gpu, end): + # Intra-node broadcast. Each GPU receives final reduced data from gpus_per_node peers, + # packets_per_gpu each → gpus_per_node * packets_per_gpu = total_gpus slots. + # Total = 2 * total_gpus + num_nodes * packets_per_gpu + scratch_buffer_size = 2 * total_gpus + packets_per_gpu * num_nodes for node_id in range(num_nodes): for local_gpu_id in range(gpus_per_node): current_rank_id = local_gpu_id + gpus_per_node * node_id - next_node_rank_id = (local_gpu_id + gpus_per_node * (node_id + 1)) % total_gpus scratch_buffers.append(Buffer(current_rank_id, scratch_buffer_size)) for peer_gpu_id in range(gpus_per_node): if peer_gpu_id != local_gpu_id: @@ -66,7 +78,12 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective intra_node_memory_channels[(peer_rank_id, current_rank_id)] = MemoryChannel( peer_rank_id, current_rank_id ) - inter_node_port_channels[current_rank_id] = PortChannel(next_node_rank_id, current_rank_id) + for peer_node_id in range(num_nodes): + if peer_node_id != node_id: + peer_node_rank_id = (local_gpu_id + gpus_per_node * peer_node_id) % total_gpus + inter_node_port_channels[(current_rank_id, peer_node_rank_id)] = PortChannel( + peer_node_rank_id, current_rank_id + ) # AllReduce for node_id in range(num_nodes): @@ -74,7 +91,6 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective current_rank_id = local_gpu_id + gpus_per_node * node_id current_rank = Rank(current_rank_id) input_buffer = current_rank.get_input_buffer() - next_node_rank_id = (local_gpu_id + gpus_per_node * (node_id + 1)) % total_gpus # Intra Node Exchange Data for peer_gpu_id in range(gpus_per_node): @@ -118,27 +134,32 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective ) inter_node_offset = total_gpus - inter_node_port_channels[current_rank_id].put_packets( - scratch_buffers[next_node_rank_id][ - inter_node_offset - + local_gpu_id * packets_per_gpu : inter_node_offset - + local_gpu_id * packets_per_gpu - + packets_per_gpu - ], - scratch_buffers[current_rank_id][ - local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu - ], - tb=0, - ) + for peer_node_id in range(num_nodes): + if peer_node_id != node_id: + peer_node_rank_id = (local_gpu_id + gpus_per_node * peer_node_id) % total_gpus + inter_node_port_channels[(current_rank_id, peer_node_rank_id)].put_packets( + scratch_buffers[peer_node_rank_id][ + inter_node_offset + + node_id * packets_per_gpu : inter_node_offset + + node_id * packets_per_gpu + + packets_per_gpu + ], + scratch_buffers[current_rank_id][ + local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu + ], + tb=0, + ) # Reduce Received Data from Remote Node inter_node_data = [ scratch_buffers[current_rank_id][ inter_node_offset - + local_gpu_id * packets_per_gpu : inter_node_offset - + local_gpu_id * packets_per_gpu + + peer_node_id * packets_per_gpu : inter_node_offset + + peer_node_id * packets_per_gpu + packets_per_gpu ] + for peer_node_id in range(num_nodes) + if peer_node_id != node_id ] current_rank.reduce( input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu], @@ -148,12 +169,18 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective ) current_rank.copy_packets( - scratch_buffers[current_rank_id][scratch_buffer_size - packets_per_gpu : scratch_buffer_size], + scratch_buffers[current_rank_id][ + inter_node_offset + + node_id * packets_per_gpu : inter_node_offset + + node_id * packets_per_gpu + + packets_per_gpu + ], input_buffer[local_gpu_id * packets_per_gpu : local_gpu_id * packets_per_gpu + packets_per_gpu], tb_group=global_intra_node_tbg, ) # Broadcast Reduced Data + broadcast_offset = total_gpus + packets_per_gpu * num_nodes for peer_gpu_id in range(gpus_per_node): peer_rank_id = peer_gpu_id + gpus_per_node * node_id @@ -161,13 +188,16 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective tbg_id = peer_gpu_id if peer_gpu_id < local_gpu_id else peer_gpu_id - 1 intra_node_memory_channels[(peer_rank_id, current_rank_id)].read_put_packets( scratch_buffers[peer_rank_id][ - inter_node_offset - + local_gpu_id * packets_per_gpu : inter_node_offset + broadcast_offset + + local_gpu_id * packets_per_gpu : broadcast_offset + local_gpu_id * packets_per_gpu + packets_per_gpu ], scratch_buffers[current_rank_id][ - scratch_buffer_size - packets_per_gpu : scratch_buffer_size + inter_node_offset + + node_id * packets_per_gpu : inter_node_offset + + node_id * packets_per_gpu + + packets_per_gpu ], tb_group=thread_block_groups[tbg_id], ) @@ -181,8 +211,8 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective peer_gpu_id * packets_per_gpu : peer_gpu_id * packets_per_gpu + packets_per_gpu ], scratch_buffers[current_rank_id][ - inter_node_offset - + peer_gpu_id * packets_per_gpu : inter_node_offset + broadcast_offset + + peer_gpu_id * packets_per_gpu : broadcast_offset + peer_gpu_id * packets_per_gpu + packets_per_gpu ], @@ -190,3 +220,37 @@ def allreduce_2nodes(spec: AlgoSpec, thread_block_group_size: int) -> Collective ) return prog + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--name", type=str, help="name of the program") + parser.add_argument("--num_gpus", type=int, help="total number of gpus") + parser.add_argument("--gpus_per_node", type=int, help="number of gpus per node") + parser.add_argument("--tbg", type=int, default=1, help="thread block group size") + parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block") + parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size") + parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size") + + args = parser.parse_args() + + spec = AlgoSpec( + name=args.name, + collective=AllReduce(args.num_gpus, 1, True), + nranks_per_node=args.gpus_per_node, + world_size=args.num_gpus, + in_place=True, + instances=1, + protocol="LL", + auto_sync=False, + num_threads_per_block=args.num_threads_per_block, + reuse_resources=True, + use_double_scratch_buffer=True, + min_message_size=args.min_message_size, + max_message_size=args.max_message_size, + ) + + prog = allreduce_multi_nodes(spec, args.tbg) + print(prog.to_json()) diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 9ca5fed3..5b89eedc 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -59,6 +59,10 @@ if(MSCCLPP_NPKIT_FLAGS) target_compile_definitions(mscclpp_obj PRIVATE ${MSCCLPP_NPKIT_FLAGS}) endif() +if(MSCCLPP_USE_MRC) + target_compile_definitions(mscclpp_obj PRIVATE MSCCLPP_USE_MRC) +endif() + # libmscclpp add_library(mscclpp SHARED) target_link_libraries(mscclpp PUBLIC mscclpp_obj) diff --git a/src/core/ibverbs_wrapper.cc b/src/core/ibverbs_wrapper.cc index 51f3f29c..60ee0694 100644 --- a/src/core/ibverbs_wrapper.cc +++ b/src/core/ibverbs_wrapper.cc @@ -10,11 +10,28 @@ #include "logger.hpp" +// Adding MSCCLPP_USE_MRC micro for MRC enablement. +// Non-MRC environments will not be affected by this macro as long as VMRC_LIBIBVERBS_SO +// environment variable is not set. +#if (MSCCLPP_USE_MRC) +#include +#include +#endif // (MSCCLPP_USE_MRC) + namespace mscclpp { static std::unique_ptr globalIBVerbsHandle(nullptr, &::dlclose); +#if (MSCCLPP_USE_MRC) +static std::unique_ptr globalOrigIBVerbsHandle(nullptr, &::dlclose); +#endif // (MSCCLPP_USE_MRC) void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) { +#if (MSCCLPP_USE_MRC) + static std::set mrcSymbols = { + "ibv_get_device_list", "ibv_get_device_name", "ibv_open_device", "ibv_close_device", "ibv_query_qp", + "ibv_create_cq", "ibv_destroy_cq", "ibv_create_qp", "ibv_modify_qp", "ibv_destroy_qp", + }; +#endif // (MSCCLPP_USE_MRC) if (!globalIBVerbsHandle) { if (mscclpp::env()->ibvSo != "") { void* handle = ::dlopen(mscclpp::env()->ibvSo.c_str(), RTLD_NOW); @@ -38,7 +55,26 @@ void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) { THROW(NET, SysError, errno, "Failed to open libibverbs: ", std::string(::dlerror())); } } +#if (MSCCLPP_USE_MRC) + // In MRC mode, `VMRC_LIBIBVERBS_SO` should be set. + char* vmrcLibibverbsSo = ::getenv("VMRC_LIBIBVERBS_SO"); + void* ptr; + if (vmrcLibibverbsSo != nullptr && mrcSymbols.find(symbol) == mrcSymbols.end()) { + // If we are in MRC mode and the symbol is not in the table, get it from the original libibverbs. + if (!globalOrigIBVerbsHandle) { + void* handle = ::dlopen(vmrcLibibverbsSo, RTLD_NOW); + if (!handle) { + THROW(NET, SysError, errno, "Failed to open ", std::string(vmrcLibibverbsSo)); + } + globalOrigIBVerbsHandle.reset(handle); + } + ptr = ::dlsym(globalOrigIBVerbsHandle.get(), symbol.c_str()); + } else { + ptr = ::dlsym(globalIBVerbsHandle.get(), symbol.c_str()); + } +#else // !(MSCCLPP_USE_MRC) void* ptr = ::dlsym(globalIBVerbsHandle.get(), symbol.c_str()); +#endif // !(MSCCLPP_USE_MRC) if (!ptr && !allowReturnNull) { THROW(NET, SysError, errno, "Failed to load libibverbs symbol: ", symbol); } diff --git a/src/ext/collectives/algorithm_collection_builder.cc b/src/ext/collectives/algorithm_collection_builder.cc index 2a7e6e91..5d196d12 100644 --- a/src/ext/collectives/algorithm_collection_builder.cc +++ b/src/ext/collectives/algorithm_collection_builder.cc @@ -113,7 +113,9 @@ AlgorithmCollection AlgorithmCollectionBuilder::buildDefaultDslAlgorithms(int ra }; static const std::vector defaultAlgoConfigs = { {"allreduce_2nodes_1K_64K.json", "allreduce", 8, 16, {{"default", 1}}}, - {"allreduce_2nodes_64K_2M.json", "allreduce", 8, 16, {{"default", 1}}}}; + {"allreduce_2nodes_128K_2M.json", "allreduce", 8, 16, {{"default", 1}}}, + {"allreduce_4nodes_1K_64K.json", "allreduce", 8, 32, {{"default", 1}}}, + {"allreduce_4nodes_128K_2M.json", "allreduce", 4, 64, {{"default", 1}}}}; AlgorithmCollection collection; static auto generateFileId = [](const std::string& input) {