Refactor algo selection logic and introduce symmetric_memory env (#741)

This PR refactors the algorithm selection logic in MSCCL++ and
introduces support for symmetric memory configuration through
environment variables.


1. Algorithm Selection Refactoring
Use a separate class for algorithm selection. This makes it possible to
introduce more complex selection logic based on message size, architecture,
whether a CUDA graph is enabled, and the memory allocation method.

2. Symmetric Memory Support
Introduced a symmetricMemory parameter in algorithm context key
generation. Removed the disableChannelCache env, as its meaning was ambiguous.

3. Add new args for build_default_algorithms
Added flag_buffer and flag_buffer_size args to build_default_algorithms.
This lets us use a unified flag buffer across different algorithms, avoiding
application hangs when switching algorithms for different message sizes.

---------

Co-authored-by: chhwang <8018170+chhwang@users.noreply.github.com>
Co-authored-by: Qinghua Zhou <qinghuazhou@microsoft.com>
Co-authored-by: Caio Rocha <caiorocha@microsoft.com>
This commit is contained in:
Binyang Li
2026-02-12 19:06:18 -08:00
committed by GitHub
parent dff3bc7bbb
commit bd68319e3e
43 changed files with 657 additions and 389 deletions

View File

@@ -68,16 +68,17 @@ void register_algorithm(nb::module_& m) {
"execute",
[](Algorithm& self, std::shared_ptr<Communicator> comm, uintptr_t input, uintptr_t output,
size_t inputSize, size_t outputSize, DataType dtype, ReduceOp op, uintptr_t stream,
std::shared_ptr<Executor> executor, int nBlocks, int nThreadsPerBlock,
std::shared_ptr<Executor> executor, int nBlocks, int nThreadsPerBlock, bool symmetricMemory,
std::unordered_map<std::string, uintptr_t> extras) {
return self.execute(comm, reinterpret_cast<const void*>(input), reinterpret_cast<void*>(output),
inputSize, outputSize, dtype, op, reinterpret_cast<cudaStream_t>(stream), executor,
nBlocks, nThreadsPerBlock, extras);
nBlocks, nThreadsPerBlock, symmetricMemory, extras);
},
nb::arg("comm"), nb::arg("input"), nb::arg("output"), nb::arg("input_size"), nb::arg("output_size"),
nb::arg("dtype"), nb::arg("op") = ReduceOp::NOP, nb::arg("stream") = 0, nb::arg("executor") = nullptr,
nb::arg("n_blocks") = 0, nb::arg("n_threads_per_block") = 0,
nb::arg("extras") = std::unordered_map<std::string, uintptr_t>());
nb::arg("n_blocks") = 0, nb::arg("n_threads_per_block") = 0, nb::arg("symmetric_memory") = false,
nb::arg("extras") = std::unordered_map<std::string, uintptr_t>())
.def("reset", &Algorithm::reset);
nb::class_<Algorithm::Constraint>(algorithmClass, "Constraint")
.def(nb::init<>())
@@ -108,8 +109,17 @@ void register_algorithm(nb::module_& m) {
.def_prop_ro("output_buffer",
[](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.outputBuffer); })
.def_ro("message_size", &CollectiveRequest::messageSize)
.def_prop_ro("stream", [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.stream); })
.def_prop_ro("collective", [](const CollectiveRequest& self) { return self.collective; })
.def_ro("dtype", &CollectiveRequest::dtype)
.def_prop_ro("hints", [](const CollectiveRequest& self) { return self.hints; })
.def("buffer_mode", &CollectiveRequest::bufferMode);
m.def(
"cpp_get_default_flag_buffer",
[]() {
auto [buffer, size] = getDefaultFlagBuffer();
return std::make_pair(reinterpret_cast<uintptr_t>(buffer.get()), size);
},
"Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size).");
}

View File

@@ -44,7 +44,9 @@ void register_core(nb::module_& m) {
.value("uint32", DataType::UINT32)
.value("float16", DataType::FLOAT16)
.value("float32", DataType::FLOAT32)
.value("bfloat16", DataType::BFLOAT16);
.value("bfloat16", DataType::BFLOAT16)
.value("float8_e4m3", DataType::FP8_E4M3)
.value("float8_e5m2", DataType::FP8_E5M2);
nb::class_<Bootstrap>(m, "CppBootstrap")
.def("get_rank", &Bootstrap::getRank)

View File

@@ -29,6 +29,6 @@ void register_algorithm_collection_builder(nb::module_& m) {
nb::arg("selector"))
.def("build", &AlgorithmCollectionBuilder::build)
.def("build_default_algorithms", &AlgorithmCollectionBuilder::buildDefaultAlgorithms, nb::arg("scratch_buffer"),
nb::arg("scratch_buffer_size"), nb::arg("rank"))
nb::arg("scratch_buffer_size"), nb::arg("flag_buffer"), nb::arg("flag_buffer_size"), nb::arg("rank"))
.def_static("reset", &AlgorithmCollectionBuilder::reset);
}

View File

@@ -4,6 +4,7 @@
from __future__ import annotations
from typing import Optional, Tuple, Dict
from functools import cached_property
import cupy as cp
from mscclpp._mscclpp import (
@@ -18,6 +19,7 @@ from mscclpp._mscclpp import (
CppReduceOp,
CppAlgorithmBuilder,
CppAlgorithmCollection,
cpp_get_default_flag_buffer,
)
__all__ = ["Algorithm", "AlgorithmBuilder", "AlgorithmCollection"]
@@ -160,6 +162,7 @@ class Algorithm:
executor: Optional[CppExecutor] = None,
nblocks=0,
nthreads_per_block=0,
symmetric_memory: bool = False,
extras: Optional[Dict[str, int]] = None,
) -> int:
"""Execute the collective algorithm.
@@ -176,6 +179,7 @@ class Algorithm:
executor: The executor for DSL algorithms (required for DSL, optional for native).
nblocks: Number of CUDA blocks (0 for auto-selection).
nthreads_per_block: Number of threads per block (0 for auto-selection).
symmetric_memory: Whether to use symmetric memory optimization (default: False).
extras: Additional algorithm-specific parameters.
Returns:
@@ -193,9 +197,14 @@ class Algorithm:
executor,
nblocks,
nthreads_per_block,
symmetric_memory,
extras if extras is not None else {},
)
def reset(self):
"""Reset the internal state of the algorithm, if applicable."""
self._algorithm.reset()
class AlgorithmBuilder:
def __init__(self, algorithm_builder: CppAlgorithmBuilder):
@@ -230,3 +239,17 @@ class AlgorithmCollection:
"""Register an algorithm for a collective operation."""
self._native_collection.register_algorithm(collective, algo_name, algorithm._algorithm)
self._algorithms.append(algorithm)
def get_default_flag_buffer() -> cp.ndarray:
"""Get the default flag buffer for algorithm selection.
This buffer is used internally by default algorithms to store selection flags.
It is allocated as a shared GPU buffer and can be accessed from Python.
Returns:
A CuPy array representing the flag buffer on the GPU.
"""
buffer_ptr, buffer_size = cpp_get_default_flag_buffer()
memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer_ptr, buffer_size, None), 0)
return cp.ndarray((buffer_size // 4,), dtype=cp.uint32, memptr=memptr)

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from typing import Union
from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection
from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection, get_default_flag_buffer
import atexit
from mscclpp._mscclpp import CppAlgorithmCollectionBuilder
@@ -29,6 +29,7 @@ class AlgorithmCollectionBuilder:
if not hasattr(self, "_initialized"):
self._builder = CppAlgorithmCollectionBuilder.get_instance()
self._initialized = True
self._flag_buffer = None
def add_algorithm_builder(self, algorithm_builder: Union[AlgorithmBuilder, Algorithm]):
if isinstance(algorithm_builder, AlgorithmBuilder):
@@ -50,8 +51,17 @@ class AlgorithmCollectionBuilder:
collection = self._builder.build()
return AlgorithmCollection(collection)
def build_default_algorithms(self, scratch_buffer: int, scratch_buffer_size: int, rank: int) -> AlgorithmCollection:
native_collection = self._builder.build_default_algorithms(int(scratch_buffer), scratch_buffer_size, rank)
def build_default_algorithms(
self,
scratch_buffer: int,
scratch_buffer_size: int,
rank: int,
) -> AlgorithmCollection:
if self._flag_buffer is None:
self._flag_buffer = get_default_flag_buffer()
native_collection = self._builder.build_default_algorithms(
int(scratch_buffer), scratch_buffer_size, self._flag_buffer.data.ptr, self._flag_buffer.nbytes, rank
)
return AlgorithmCollection(native_collection)

View File

@@ -192,5 +192,11 @@ def torch_dtype_to_mscclpp_dtype(dtype: "torch.dtype") -> DataType:
return DataType.int32
elif dtype == torch.bfloat16:
return DataType.bfloat16
# Hardware supports either OCP format or FNUZ format for float8.
# Mapping both to the same MSCCLPP data type.
elif dtype == torch.float8_e5m2 or dtype == torch.float8_e5m2fnuz:
return DataType.float8_e5m2
elif dtype == torch.float8_e4m3fn or dtype == torch.float8_e4m3fnuz:
return DataType.float8_e4m3
else:
raise ValueError(f"Unknown data type: {dtype}")