Torch integration (#692)

Reorganize the current native algorithm implementation and the DSL algorithm
implementation.
Provide a unified API covering both DSL and native algorithms, along with an
interface for tuning algorithms.
Provide an interface for PyTorch integration with both the native API and the DSL.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: chhwang <8018170+chhwang@users.noreply.github.com>
This commit is contained in:
Binyang Li
2026-01-21 20:32:24 -08:00
committed by GitHub
parent 78ce9fac8d
commit a707273701
156 changed files with 6107 additions and 4076 deletions

View File

@@ -6,10 +6,10 @@ add_subdirectory(test)
add_custom_target(pytest_lib_copy ALL
COMMAND ${CMAKE_COMMAND} -E copy_if_different
${CMAKE_CURRENT_BINARY_DIR}/csrc/_mscclpp.*.so
${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/_mscclpp.*.so
${CMAKE_CURRENT_SOURCE_DIR}/mscclpp
COMMAND ${CMAKE_COMMAND} -E copy_if_different
${CMAKE_CURRENT_BINARY_DIR}/test/_ext.*.so
${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/_ext.*.so
${CMAKE_CURRENT_SOURCE_DIR}/test/_cpp
DEPENDS mscclpp_py mscclpp_py_test
)

View File

@@ -3,7 +3,7 @@
find_package(Python 3.8 COMPONENTS Interpreter Development.Module REQUIRED)
include(FetchContent)
FetchContent_Declare(nanobind GIT_REPOSITORY https://github.com/wjakob/nanobind.git GIT_TAG v1.4.0)
FetchContent_Declare(nanobind GIT_REPOSITORY https://github.com/wjakob/nanobind.git GIT_TAG v1.9.2)
FetchContent_MakeAvailable(nanobind)
FetchContent_Declare(dlpack
@@ -21,6 +21,7 @@ endif()
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cpp)
nanobind_add_module(mscclpp_py ${SOURCES})
set_target_properties(mscclpp_py PROPERTIES OUTPUT_NAME _mscclpp)
target_link_libraries(mscclpp_py PRIVATE dlpack mscclpp_static ${GPU_LIBRARIES})
set_target_properties(mscclpp_py PROPERTIES INSTALL_RPATH "\$ORIGIN/lib")
target_link_libraries(mscclpp_py PRIVATE dlpack mscclpp mscclpp_collectives ${GPU_LIBRARIES})
target_include_directories(mscclpp_py SYSTEM PRIVATE ${GPU_INCLUDE_DIRS})
install(TARGETS mscclpp_py LIBRARY DESTINATION .)

113
python/csrc/algorithm.cpp Normal file
View File

@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <nanobind/nanobind.h>
#include <nanobind/stl/function.h>
#include <nanobind/stl/pair.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>
#include <nanobind/stl/unordered_map.h>
#include <nanobind/stl/vector.h>
#include <cstring>
#include <mscclpp/algorithm.hpp>
namespace nb = nanobind;
using namespace mscclpp;
// Registers the Python bindings for the mscclpp algorithm API on module `m`:
// the enums used for algorithm selection and execution, the Algorithm wrapper,
// DSL algorithms, and the AlgorithmCollection / CollectiveRequest types.
void register_algorithm(nb::module_& m) {
  // Whether an algorithm requires in-place buffers, out-of-place buffers,
  // or accepts either.
  nb::enum_<CollectiveBufferMode>(m, "CollectiveBufferMode")
      .value("ANY", CollectiveBufferMode::Any)
      .value("IN_PLACE", CollectiveBufferMode::InPlace)
      .value("OUT_OF_PLACE", CollectiveBufferMode::OutOfPlace);
  // Native (C++/CUDA) implementations vs. DSL (execution-plan based) ones.
  nb::enum_<AlgorithmType>(m, "AlgorithmType").value("NATIVE", AlgorithmType::Native).value("DSL", AlgorithmType::DSL);
  // Result codes surfaced to Python from Algorithm::execute.
  nb::enum_<CommResult>(m, "CommResult")
      .value("COMM_SUCCESS", CommResult::CommSuccess)
      .value("COMM_UNHANDLED_CUDA_ERROR", CommResult::CommUnhandledCudaError)
      .value("COMM_SYSTEM_ERROR", CommResult::CommSystemError)
      .value("COMM_INTERNAL_ERROR", CommResult::CommInternalError)
      .value("COMM_INVALID_ARGUMENT", CommResult::CommInvalidArgument)
      .value("COMM_INVALID_USAGE", CommResult::CommInvalidUsage)
      .value("COMM_REMOTE_ERROR", CommResult::CommRemoteError)
      .value("COMM_IN_PROGRESS", CommResult::CommInProgress)
      .value("COMM_NUM_RESULTS", CommResult::CommNumResults);
  // Reduction operators for reduce-type collectives; NOP for pure data movement.
  nb::enum_<ReduceOp>(m, "ReduceOp")
      .value("SUM", ReduceOp::SUM)
      .value("MIN", ReduceOp::MIN)
      .value("NOP", ReduceOp::NOP);
  auto algorithmClass =
      nb::class_<Algorithm>(m, "Algorithm")
          // Unwraps a shared_ptr<Algorithm> carried inside a PyCapsule.  The
          // capsule name is checked first so arbitrary capsules are rejected
          // instead of being blindly dereferenced.
          .def_static(
              "from_native_capsule",
              [](nb::capsule cap) {
                const char* name = cap.name();
                if (name == nullptr || std::strcmp(name, ALGORITHM_NATIVE_CAPSULE_NAME) != 0) {
                  throw nb::type_error("Invalid capsule: expected 'mscclpp::AlgorithmPtr'");
                }
                void* data = cap.data();
                if (data == nullptr) {
                  throw nb::value_error("Failed to get pointer from capsule");
                }
                // The capsule stores a pointer to a shared_ptr; copy it out so
                // the returned value shares ownership with the producer.
                return *static_cast<std::shared_ptr<Algorithm>*>(data);
              },
              nb::arg("capsule"))
          .def_prop_ro("name", &Algorithm::name)
          .def_prop_ro("collective", &Algorithm::collective)
          .def_prop_ro("message_range", &Algorithm::messageRange)
          .def_prop_ro("tags", &Algorithm::tags)
          .def_prop_ro("buffer_mode", &Algorithm::bufferMode)
          .def_prop_ro("constraint", &Algorithm::constraint)
          .def_prop_ro("type", &Algorithm::type)
          // Buffers and the CUDA stream arrive from Python as integer
          // addresses; they are reinterpreted as device pointers /
          // cudaStream_t before being forwarded to the C++ execute().
          .def(
              "execute",
              [](Algorithm& self, std::shared_ptr<Communicator> comm, uintptr_t input, uintptr_t output,
                 size_t inputSize, size_t outputSize, DataType dtype, ReduceOp op, uintptr_t stream,
                 std::shared_ptr<Executor> executor, int nBlocks, int nThreadsPerBlock,
                 std::unordered_map<std::string, uintptr_t> extras) {
                return self.execute(comm, reinterpret_cast<const void*>(input), reinterpret_cast<void*>(output),
                                    inputSize, outputSize, dtype, op, reinterpret_cast<cudaStream_t>(stream), executor,
                                    nBlocks, nThreadsPerBlock, extras);
              },
              nb::arg("comm"), nb::arg("input"), nb::arg("output"), nb::arg("input_size"), nb::arg("output_size"),
              nb::arg("dtype"), nb::arg("op") = ReduceOp::NOP, nb::arg("stream") = 0, nb::arg("executor") = nullptr,
              nb::arg("n_blocks") = 0, nb::arg("n_threads_per_block") = 0,
              nb::arg("extras") = std::unordered_map<std::string, uintptr_t>());
  // Nested Algorithm.Constraint: world size / ranks-per-node requirements.
  // Exposed with snake_case attribute names (world_size, n_ranks_per_node).
  nb::class_<Algorithm::Constraint>(algorithmClass, "Constraint")
      .def(nb::init<>())
      .def(nb::init<int, int>(), nb::arg("world_size"), nb::arg("n_ranks_per_node"))
      .def_rw("world_size", &Algorithm::Constraint::worldSize)
      .def_rw("n_ranks_per_node", &Algorithm::Constraint::nRanksPerNode);
  nb::class_<AlgorithmBuilder>(m, "AlgorithmBuilder").def("build", &AlgorithmBuilder::build);
  // DSL algorithm: constructed from an execution plan plus optional tags and
  // constraints; inherits the Algorithm interface above.
  nb::class_<DslAlgorithm, Algorithm>(m, "DslAlgorithm")
      .def(nb::init<std::string, ExecutionPlan, std::unordered_map<std::string, uint64_t>, Algorithm::Constraint>(),
           nb::arg("id"), nb::arg("plan"), nb::arg("tags") = std::unordered_map<std::string, uint64_t>(),
           nb::arg("constraint") = Algorithm::Constraint())
      .def("build", &DslAlgorithm::build);
  nb::class_<AlgorithmCollection>(m, "AlgorithmCollection")
      .def("register_algorithm", &AlgorithmCollection::registerAlgorithm, nb::arg("collective"), nb::arg("algo_name"),
           nb::arg("algorithm"))
      .def("get_algorithms_by_collective", &AlgorithmCollection::getAlgorithmsByCollective, nb::arg("collective"))
      .def("to_list", &AlgorithmCollection::getAllAlgorithms);
  // Read-only view of a collective request, used by Python-side selectors.
  // Buffer pointers are exposed as integer addresses.
  nb::class_<CollectiveRequest>(m, "CollectiveRequest")
      .def_ro("world_size", &CollectiveRequest::worldSize)
      .def_ro("n_ranks_per_node", &CollectiveRequest::nRanksPerNode)
      .def_ro("rank", &CollectiveRequest::rank)
      .def_prop_ro("input_buffer",
                   [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.inputBuffer); })
      .def_prop_ro("output_buffer",
                   [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.outputBuffer); })
      .def_ro("message_size", &CollectiveRequest::messageSize)
      .def_prop_ro("collective", [](const CollectiveRequest& self) { return self.collective; })
      .def_ro("dtype", &CollectiveRequest::dtype)
      .def_prop_ro("hints", [](const CollectiveRequest& self) { return self.hints; })
      .def("buffer_mode", &CollectiveRequest::bufferMode);
}

View File

@@ -3,7 +3,6 @@
#include <nanobind/nanobind.h>
#include <nanobind/operators.h>
#include <nanobind/stl/array.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>
#include <nanobind/stl/vector.h>
@@ -26,6 +25,10 @@ extern void register_nvls(nb::module_& m);
extern void register_executor(nb::module_& m);
extern void register_npkit(nb::module_& m);
extern void register_gpu_utils(nb::module_& m);
extern void register_algorithm(nb::module_& m);
// ext
extern void register_algorithm_collection_builder(nb::module_& m);
template <typename T>
void def_shared_future(nb::handle& m, const std::string& typestr) {
@@ -36,6 +39,13 @@ void def_shared_future(nb::handle& m, const std::string& typestr) {
void register_core(nb::module_& m) {
m.def("version", &version);
nb::enum_<DataType>(m, "DataType")
.value("int32", DataType::INT32)
.value("uint32", DataType::UINT32)
.value("float16", DataType::FLOAT16)
.value("float32", DataType::FLOAT32)
.value("bfloat16", DataType::BFLOAT16);
nb::class_<Bootstrap>(m, "Bootstrap")
.def("get_rank", &Bootstrap::getRank)
.def("get_n_ranks", &Bootstrap::getNranks)
@@ -61,7 +71,15 @@ void register_core(nb::module_& m) {
.def("recv", static_cast<void (Bootstrap::*)(std::vector<char>&, int, int)>(&Bootstrap::recv), nb::arg("data"),
nb::arg("peer"), nb::arg("tag"));
nb::class_<UniqueId>(m, "UniqueId");
nb::class_<UniqueId>(m, "UniqueId")
.def(nb::init<>())
.def("__setstate__",
[](UniqueId& self, nb::bytes b) {
if (nb::len(b) != UniqueIdBytes) throw std::runtime_error("Invalid UniqueId byte size");
::memcpy(self.data(), b.c_str(), UniqueIdBytes);
})
.def("__getstate__",
[](const UniqueId& self) { return nb::bytes(reinterpret_cast<const char*>(self.data()), UniqueIdBytes); });
nb::class_<TcpBootstrap, Bootstrap>(m, "TcpBootstrap")
.def(nb::init<int, int>(), "Do not use this constructor. Use create instead.")
@@ -284,4 +302,8 @@ NB_MODULE(_mscclpp, m) {
register_executor(m);
register_npkit(m);
register_gpu_utils(m);
register_algorithm(m);
// ext
register_algorithm_collection_builder(m);
}

View File

@@ -15,50 +15,8 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_executor(nb::module_& m) {
nb::enum_<DataType>(m, "DataType")
.value("int32", DataType::INT32)
.value("uint32", DataType::UINT32)
.value("float16", DataType::FLOAT16)
.value("float32", DataType::FLOAT32)
.value("bfloat16", DataType::BFLOAT16);
nb::enum_<PacketType>(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);
nb::class_<ExecutionRequest>(m, "ExecutionRequest")
.def_ro("world_size", &ExecutionRequest::worldSize)
.def_ro("n_ranks_per_node", &ExecutionRequest::nRanksPerNode)
.def_prop_ro(
"input_buffer",
[](const ExecutionRequest& self) -> uintptr_t { return reinterpret_cast<uintptr_t>(self.inputBuffer); })
.def_prop_ro(
"output_buffer",
[](const ExecutionRequest& self) -> uintptr_t { return reinterpret_cast<uintptr_t>(self.outputBuffer); })
.def_ro("message_size", &ExecutionRequest::messageSize)
.def_prop_ro("collective", [](ExecutionRequest& self) -> const std::string& { return self.collective; })
.def_prop_ro("hints", [](ExecutionRequest& self) { return self.hints; });
nb::class_<ExecutionPlanHandle>(m, "ExecutionPlanHandle")
.def_ro("id", &ExecutionPlanHandle::id)
.def_ro("constraint", &ExecutionPlanHandle::constraint)
.def_ro("plan", &ExecutionPlanHandle::plan)
.def_ro("tags", &ExecutionPlanHandle::tags)
.def_static("create", &ExecutionPlanHandle::create, nb::arg("id"), nb::arg("world_size"),
nb::arg("nranks_per_node"), nb::arg("plan"),
nb::arg("tags") = std::unordered_map<std::string, uint64_t>{});
nb::class_<ExecutionPlanHandle::Constraint>(m, "ExecutionPlanConstraint")
.def_ro("world_size", &ExecutionPlanHandle::Constraint::worldSize)
.def_ro("n_ranks_per_node", &ExecutionPlanHandle::Constraint::nRanksPerNode);
nb::class_<ExecutionPlanRegistry>(m, "ExecutionPlanRegistry")
.def_static("get_instance", &ExecutionPlanRegistry::getInstance)
.def("register_plan", &ExecutionPlanRegistry::registerPlan, nb::arg("planHandle"))
.def("get_plans", &ExecutionPlanRegistry::getPlans, nb::arg("collective"))
.def("get", &ExecutionPlanRegistry::get, nb::arg("id"))
.def("set_selector", &ExecutionPlanRegistry::setSelector, nb::arg("selector"))
.def("set_default_selector", &ExecutionPlanRegistry::setDefaultSelector, nb::arg("selector"))
.def("clear", &ExecutionPlanRegistry::clear);
nb::class_<ExecutionPlan>(m, "ExecutionPlan")
.def(nb::init<const std::string&, int>(), nb::arg("planPath"), nb::arg("rank"))
.def_prop_ro("name", [](const ExecutionPlan& self) -> std::string { return self.name(); })

View File

@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <nanobind/nanobind.h>
#include <nanobind/stl/function.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/unordered_map.h>
#include <nanobind/stl/vector.h>
#include <mscclpp/algorithm.hpp>
#include <mscclpp/ext/collectives/algorithm_collection_builder.hpp>
namespace nb = nanobind;
using namespace mscclpp;
using namespace mscclpp::collective;
// Registers bindings for the singleton AlgorithmCollectionBuilder, which
// accumulates algorithm builders and selectors and then produces an
// AlgorithmCollection.
void register_algorithm_collection_builder(nb::module_& m) {
  nb::class_<AlgorithmCollectionBuilder>(m, "AlgorithmCollectionBuilder")
      // Process-wide singleton accessor.
      .def_static("get_instance", &AlgorithmCollectionBuilder::getInstance)
      .def("add_algorithm_builder", &AlgorithmCollectionBuilder::addAlgorithmBuilder, nb::arg("builder"))
      // Convenience overload: a DslAlgorithm is accepted directly and routed
      // through the same addAlgorithmBuilder entry point.
      .def(
          "add_dsl_algorithm_builder",
          [](AlgorithmCollectionBuilder& self, std::shared_ptr<DslAlgorithm> algorithm) {
            self.addAlgorithmBuilder(algorithm);
          },
          nb::arg("algorithm"))
      .def("set_algorithm_selector", &AlgorithmCollectionBuilder::setAlgorithmSelector, nb::arg("selector"))
      .def("set_fallback_algorithm_selector", &AlgorithmCollectionBuilder::setFallbackAlgorithmSelector,
           nb::arg("selector"))
      .def("build", &AlgorithmCollectionBuilder::build)
      // Builds the built-in algorithm set; takes a device scratch buffer and
      // the caller's rank.
      .def("build_default_algorithms", &AlgorithmCollectionBuilder::buildDefaultAlgorithms, nb::arg("scratch_buffer"),
           nb::arg("scratch_buffer_size"), nb::arg("rank"))
      // Clears the singleton's registered builders/selectors.
      .def_static("reset", &AlgorithmCollectionBuilder::reset);
}

View File

@@ -3,43 +3,26 @@
"""MSCCL++ Python API."""
import atexit
from dataclasses import dataclass
from functools import cached_property, wraps
import inspect
import json
from functools import wraps
import os
from pathlib import Path
from typing import Any
import warnings
from blake3 import blake3
from mscclpp.language.program import CollectiveProgram
from mscclpp.language.utils import AlgoSpec
from functools import wraps
from mscclpp._version import __version__, __commit_id__
from ._version import __version__, __commit_id__
if os.environ.get("MSCCLPP_HOME", None) is None:
os.environ["MSCCLPP_HOME"] = os.path.abspath(os.path.dirname(__file__))
# Parse the version
version = {
"version": __version__,
"git_commit": __commit_id__,
}
from ._core import *
from ._mscclpp import (
Env,
ErrorCode,
BaseError,
Error,
SysError,
CudaError,
CuError,
IbError,
Device,
DeviceType,
Communicator,
@@ -60,16 +43,15 @@ from ._mscclpp import (
Transport,
TransportFlags,
DataType,
ErrorCode,
Executor,
ExecutionPlan,
ExecutionPlanConstraint,
PacketType,
RawGpuBuffer,
ReduceOp,
env,
is_nvls_supported,
npkit,
ExecutionPlanHandle as _ExecutionPlanHandle,
ExecutionPlanRegistry as _ExecutionPlanRegistry,
)
__all__ = [
@@ -79,6 +61,7 @@ __all__ = [
"Connection",
"connect_nvls_collective",
"EndpointConfig",
"ErrorCode",
"Fifo",
"Semaphore",
"Host2DeviceSemaphore",
@@ -97,6 +80,7 @@ __all__ = [
"ExecutionPlan",
"PacketType",
"RawGpuBuffer",
"ReduceOp",
"env",
"version",
"is_nvls_supported",
@@ -107,6 +91,11 @@ __all__ = [
"version",
"get_include",
"get_lib",
# Python API
"Algorithm",
"AlgorithmCollection",
"CommGroup",
"GpuBuffer",
]
@@ -135,193 +124,5 @@ def deprecated(new_cls):
return decorator
class ExecutionPlanHandle:
def __init__(self, handle: _ExecutionPlanHandle):
self._handle = handle
@cached_property
def id(self) -> int:
return self._handle.id
@cached_property
def tags(self) -> set:
return frozenset(self._handle.tags)
@cached_property
def plan(self) -> ExecutionPlan:
return self._handle.plan
@cached_property
def constraints(self) -> ExecutionPlanConstraint:
return self._handle.constraints
@dataclass(frozen=True)
class ExecutionRequest:
collective: str
world_size: int
n_ranks_per_node: int
send_buffer: int
recv_buffer: int
message_size: int
hints: dict
class ExecutionPlanRegistry:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(ExecutionPlanRegistry, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized"):
self._registry = _ExecutionPlanRegistry.get_instance()
self._id_map = {}
self._collective_map = {}
self._selector = None
self._initialized = True
def register_plan(self, plan: ExecutionPlanHandle):
self._id_map[plan.id] = plan
if plan.plan.collective not in self._collective_map:
self._collective_map[plan.plan.collective] = []
self._collective_map[plan.plan.collective].append(plan)
return self._instance._registry.register_plan(plan._handle)
def set_selector(self, selector):
self._selector = selector
self._instance._registry.set_selector(selector)
def set_default_selector(self, selector):
self._selector = selector
self._instance._registry.set_default_selector(selector)
def get(self, id: str) -> ExecutionPlanHandle:
return self._id_map.get(id, None)
def select(
self,
collective: str,
world_size: int,
n_ranks_per_node: int,
send_buffer: int,
recv_buffer: int,
message_size: int,
hints: dict = {},
) -> ExecutionPlanHandle:
if self._selector is None or collective not in self._collective_map:
return None
req = ExecutionRequest(
collective=collective,
world_size=world_size,
n_ranks_per_node=n_ranks_per_node,
send_buffer=send_buffer,
recv_buffer=recv_buffer,
message_size=message_size,
hints=hints,
)
return self._selector(self._collective_map[collective], req)
@classmethod
def reset_instance(cls):
if cls._instance is not None:
cls._instance._registry.clear()
cls._instance._id_map = {}
cls._instance._collective_map = {}
cls._instance._selector = None
cls._instance = None
atexit.register(ExecutionPlanRegistry.reset_instance)
_execution_plan_registry = ExecutionPlanRegistry()
def _stable_json_bytes(obj: Any) -> bytes:
return json.dumps(
obj,
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
).encode("utf-8")
def compile(
algo,
algo_spec: AlgoSpec,
rank: int,
**kwargs,
) -> ExecutionPlanHandle:
"""Compile a MSCCL++ program from a high-level algorithm description.
Args:
algo: The high-level algorithm description (e.g., a function or class).
algo_spec (AlgoSpec): Algorithm specification containing collective type,
world size, ranks per node, instances, protocol, and other configuration.
rank (int): The rank of the current process.
**kwargs: Additional keyword arguments passed to the algorithm function.
Returns:
ExecutionPlanHandle: The compiled execution plan handle.
Raises:
ValueError: If the 'algo' argument is not callable.
"""
if not callable(algo):
raise ValueError("The 'algo' argument must be a callable (e.g., a function or class).")
prog: CollectiveProgram = algo(
algo_spec,
**kwargs,
)
source = inspect.getsource(algo)
source_hash = blake3(source.encode("utf-8")).hexdigest()
plan_id = blake3(
_stable_json_bytes(
{
"version": __version__,
"algo_name": algo_spec.name,
"collective": algo_spec.collective.name,
"tags": sorted(algo_spec.tags.items()),
"source_hash": source_hash,
"envs": {
"nranks_per_node": algo_spec.nranks_per_node,
"world_size": algo_spec.world_size,
"instances": algo_spec.instances,
"protocol": algo_spec.protocol,
},
}
)
).hexdigest()
plan_handle = _execution_plan_registry.get(plan_id)
if plan_handle is not None:
return plan_handle
plan_dir = os.environ.get("MSCCLPP_EXECUTION_PLAN_DIR", Path.home() / ".cache/mscclpp")
os.makedirs(plan_dir, exist_ok=True)
filename = f"{plan_id}.json"
plan_path = os.path.join(plan_dir, filename)
tmp_path = plan_path + f".tmp.{os.getpid()}"
if not os.path.exists(plan_path):
try:
# TODO (binyli): Each rank could generate its own execution plan separately. Doesn't need to generate whole plan.
with open(tmp_path, "w") as f:
prog.post_process_operations()
f.write(prog.to_json(indent=None, separators=(",", ":"), ensure_ascii=False))
f.flush()
os.fsync(f.fileno())
if not os.path.exists(plan_path):
os.rename(tmp_path, plan_path)
else:
os.remove(tmp_path)
except Exception:
Path(plan_path).unlink(missing_ok=True)
execution_plan = ExecutionPlan(plan_path, rank)
handle = _ExecutionPlanHandle.create(
id=plan_id,
world_size=algo_spec.world_size,
nranks_per_node=algo_spec.nranks_per_node,
plan=execution_plan,
tags=algo_spec.tags,
)
return ExecutionPlanHandle(handle)
compile: DslCompiler = DslCompiler()
compile_native: NativeCodeCompiler = NativeCodeCompiler()

View File

@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Re-export the public API of each submodule at the package level.
from .algorithm import *
from .comm import *
from .compiler import *
from .buffer import *

# Aggregate the package-level __all__ from each submodule's __all__.
# NOTE(review): the bare submodule names (`algorithm`, `comm`, `compiler`,
# `buffer`) are in scope here only as a side effect of the star imports
# binding each submodule as an attribute of this package — confirm this is
# intentional rather than adding explicit `from . import ...` lines.
__all__ = []
__all__ += algorithm.__all__
__all__ += comm.__all__
__all__ += compiler.__all__
__all__ += buffer.__all__

View File

@@ -0,0 +1,230 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
from typing import Optional, Tuple, Dict
from functools import cached_property
from mscclpp._mscclpp import (
Algorithm as _Algorithm,
DslAlgorithm as _DslAlgorithm,
AlgorithmType as _AlgorithmType,
Communicator,
CollectiveBufferMode,
DataType,
Executor,
ExecutionPlan,
ReduceOp,
)
__all__ = ["Algorithm", "AlgorithmBuilder", "AlgorithmCollection"]
class Algorithm:
    """A wrapper for collective communication algorithms.

    This class provides a Python interface for collective communication algorithms
    such as allreduce, allgather, and reduce-scatter. Algorithms can be either
    DSL-based (defined using MSCCL++ execution plans) or native (implemented in C++/CUDA).

    Attributes:
        name: Human-readable name of the algorithm.
        collective: The collective operation this algorithm implements (e.g., "allreduce").
        message_size_range: Tuple of (min_size, max_size) in bytes for valid message sizes.
        tags: Dictionary of tag names to tag values for algorithm selection hints.
        buffer_mode: The buffer mode supported by this algorithm (IN_PLACE, OUT_OF_PLACE, or ANY).
    """

    class Constraint:
        """Constraints that define valid execution environments for the algorithm.

        Args:
            world_size: Required world size (number of ranks). 0 means any size.
            n_ranks_per_node: Required number of ranks per node. 0 means any.
        """

        def __init__(self, world_size: int = 0, n_ranks_per_node: int = 0):
            self._constraint = _Algorithm.Constraint(world_size, n_ranks_per_node)

        @property
        def world_size(self) -> int:
            """Required world size; 0 means any."""
            # The nanobind binding exposes snake_case attributes
            # (def_rw("world_size", ...)), not the C++ camelCase field names.
            return self._constraint.world_size

        @property
        def n_ranks_per_node(self) -> int:
            """Required ranks per node; 0 means any."""
            return self._constraint.n_ranks_per_node

    def __init__(
        self,
        id: Optional[str] = None,
        execution_plan: Optional[ExecutionPlan] = None,
        native_handle: Optional[_Algorithm] = None,
        tags: Optional[Dict[str, int]] = None,
        constraint: Optional[Constraint] = None,
    ):
        """Create an algorithm either from a DSL execution plan or a native handle.

        Args:
            id: Identifier for a DSL algorithm (required with execution_plan).
            execution_plan: An ExecutionPlan defining a DSL algorithm.
            native_handle: A native C++ algorithm handle.
            tags: Optional selection-hint tags for DSL algorithms.
            constraint: Optional execution-environment constraint for DSL algorithms.

        Raises:
            ValueError: If neither execution_plan nor native_handle is given.
        """
        if execution_plan is not None:
            self._algorithm = _DslAlgorithm(
                id,
                execution_plan,
                tags=tags if tags is not None else {},
                constraint=constraint._constraint if constraint is not None else _Algorithm.Constraint(),
            )
        elif native_handle is not None:
            self._algorithm = native_handle
        else:
            # Fail fast instead of leaving `_algorithm` unset, which would
            # surface later as a confusing AttributeError.
            raise ValueError("Either 'execution_plan' or 'native_handle' must be provided.")

    @classmethod
    def create_from_native_handle(cls, handle: _Algorithm):
        """Create an Algorithm instance from a native C++ algorithm handle.

        Args:
            handle: The native C++ algorithm handle.

        Returns:
            A new Algorithm instance wrapping the native handle.
        """
        return cls(
            native_handle=handle,
        )

    @classmethod
    def create_from_native_capsule(cls, obj):
        """Create an Algorithm instance from a PyCapsule object.

        Args:
            obj: A PyCapsule containing a native algorithm pointer.

        Returns:
            A new Algorithm instance wrapping the algorithm from the capsule.
        """
        handle = _Algorithm.from_native_capsule(obj)
        return cls(native_handle=handle)

    @cached_property
    def name(self) -> str:
        """The human-readable name of the algorithm."""
        return self._algorithm.name

    @cached_property
    def collective(self) -> str:
        """The collective operation this algorithm implements (e.g., "allreduce", "allgather")."""
        return self._algorithm.collective

    @cached_property
    def message_size_range(self) -> Tuple[int, int]:
        """The valid message size range (min_size, max_size) in bytes."""
        return (self._algorithm.message_range[0], self._algorithm.message_range[1])

    @cached_property
    def tags(self) -> Dict[str, int]:
        """Dictionary of tag names to tag values for algorithm selection hints."""
        return self._algorithm.tags

    @cached_property
    def buffer_mode(self) -> CollectiveBufferMode:
        """The buffer mode supported by this algorithm (IN_PLACE, OUT_OF_PLACE, or ANY)."""
        return self._algorithm.buffer_mode

    def is_dsl_algorithm(self) -> bool:
        """Check if this is a DSL-based algorithm.

        Returns:
            True if this algorithm is defined using DSL/execution plan, False otherwise.
        """
        return self._algorithm.type == _AlgorithmType.DSL

    def is_native_algorithm(self) -> bool:
        """Check if this is a native C++/CUDA algorithm.

        Returns:
            True if this algorithm is implemented natively, False otherwise.
        """
        return self._algorithm.type == _AlgorithmType.NATIVE

    def execute(
        self,
        comm: Communicator,
        input_buffer: int,
        output_buffer: int,
        input_size: int,
        output_size: int,
        dtype: DataType,
        op: ReduceOp = ReduceOp.NOP,
        stream: int = 0,
        executor: Optional[Executor] = None,
        nblocks=0,
        nthreads_per_block=0,
        extras: Optional[Dict[str, int]] = None,
    ) -> int:
        """Execute the collective algorithm.

        Args:
            comm: The communicator to use.
            input_buffer: Device pointer to the input buffer.
            output_buffer: Device pointer to the output buffer.
            input_size: Size of the input buffer in bytes.
            output_size: Size of the output buffer in bytes.
            dtype: Data type of the elements.
            op: Reduction operation for reduce-type collectives (default: NOP).
            stream: CUDA stream to execute on (default: 0).
            executor: The executor for DSL algorithms (required for DSL, optional for native).
            nblocks: Number of CUDA blocks (0 for auto-selection).
            nthreads_per_block: Number of threads per block (0 for auto-selection).
            extras: Additional algorithm-specific parameters.

        Returns:
            The result code (0 for success).
        """
        return self._algorithm.execute(
            comm,
            int(input_buffer),
            int(output_buffer),
            input_size,
            output_size,
            dtype,
            op,
            int(stream),
            executor,
            nblocks,
            nthreads_per_block,
            extras if extras is not None else {},
        )
class AlgorithmBuilder:
    """Thin wrapper around a native algorithm builder that yields wrapped Algorithms.

    NOTE(review): `_AlgorithmBuilder` is not among this module's imports; the
    annotation only avoids a NameError because `from __future__ import
    annotations` keeps annotations unevaluated — confirm the intended import.
    """

    def __init__(self, algorithm_builder: _AlgorithmBuilder):
        # Native builder object; build() delegates to it.
        self._algorithm_builder = algorithm_builder

    def build(self) -> Algorithm:
        """Build the native algorithm and wrap it in a Python Algorithm."""
        return Algorithm.create_from_native_handle(self._algorithm_builder.build())
class AlgorithmCollection:
    """A list-like, iterable view over the algorithms held by a native collection.

    Each native algorithm handle is wrapped in a Python `Algorithm` once at
    construction time; subsequent registrations keep the local list in sync
    with the native collection.
    """

    def __init__(self, native_collection: _AlgorithmCollection):
        self._native_collection = native_collection
        wrapped = []
        for native_algo in self._native_collection.to_list():
            wrapped.append(Algorithm.create_from_native_handle(native_algo))
        self._algorithms = wrapped

    def __iter__(self):
        """Iterate over all algorithms in the collection."""
        return iter(self._algorithms)

    def __len__(self):
        """Return the number of algorithms in the collection."""
        return len(self._algorithms)

    def __getitem__(self, index: int) -> Algorithm:
        """Get an algorithm by index."""
        return self._algorithms[index]

    def get_by_collective(self, collective: str):
        """Get all algorithms for a specific collective operation."""
        matches = []
        for candidate in self._algorithms:
            if candidate.collective == collective:
                matches.append(candidate)
        return matches

    def register_algorithm(self, collective: str, algo_name: str, algorithm: Algorithm):
        """Register an algorithm for a collective operation."""
        self._native_collection.register_algorithm(collective, algo_name, algorithm._algorithm)
        self._algorithms.append(algorithm)

View File

@@ -0,0 +1,30 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from typing import Union, Tuple
import cupy as cp
import numpy as np
from mscclpp._mscclpp import RawGpuBuffer
__all__ = ["GpuBuffer"]
class GpuBuffer(cp.ndarray):
    """A CuPy ndarray backed by GPU memory allocated via mscclpp's RawGpuBuffer.

    NOTE(review): `__new__` returns a plain `cp.ndarray` rather than an
    instance of `cls`, so `isinstance(GpuBuffer(...), GpuBuffer)` is False —
    confirm whether this is intentional.
    """

    def __new__(
        cls, shape: Union[int, Tuple[int]], dtype: cp.dtype = float, strides: Tuple[int] = None, order: str = "C"
    ):
        # Check if `shape` is valid
        if isinstance(shape, int):
            shape = (shape,)
        try:
            shape = tuple(shape)
        except TypeError:
            raise ValueError("Shape must be a tuple-like or an integer.")
        if any(s <= 0 for s in shape):
            raise ValueError("Shape must be positive.")
        # Create the buffer
        # Total byte count = element count * per-element size.
        buffer = RawGpuBuffer(np.prod(shape) * np.dtype(dtype).itemsize)
        # Wrap the raw device pointer; passing `buffer` as the owner ties the
        # allocation's lifetime to the resulting ndarray.
        memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer.data(), buffer.bytes(), buffer), 0)
        return cp.ndarray(shape, dtype=dtype, strides=strides, order=order, memptr=memptr)

View File

@@ -2,22 +2,19 @@
# Licensed under the MIT license.
from __future__ import annotations
from typing import Tuple, Type
from typing import Type
import cupy as cp
from ._mscclpp import (
from mscclpp._mscclpp import (
Communicator,
Connection,
connect_nvls_collective,
EndpointConfig,
Semaphore,
Host2DeviceSemaphore,
Host2HostSemaphore,
ProxyService,
RegisteredMemory,
PortChannel,
MemoryChannel,
MemoryDevice2DeviceSemaphore,
TcpBootstrap,
Transport,
TransportFlags,
@@ -27,6 +24,8 @@ import numpy as np
from mscclpp.utils import is_torch_tensor
__all__ = ["CommGroup"]
class CommGroup:
def __init__(

View File

@@ -0,0 +1,350 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
import importlib.util
import inspect
import logging
import json
import os
import subprocess
import fcntl
from typing import Any, Callable
from pathlib import Path
import pybind11
import sys
import sysconfig
from blake3 import blake3
import cupy as cp
from mscclpp._version import __version__
from .algorithm import Algorithm
from mscclpp.language.program import CollectiveProgram
from mscclpp.language.utils import AlgoSpec
from mscclpp.utils import get_device_arch
from mscclpp._mscclpp import (
ExecutionPlan,
)
logging.basicConfig(level=logging.INFO)
__all__ = ["DslCompiler", "NativeCodeCompiler"]
def _stable_json_bytes(obj: Any) -> bytes:
return json.dumps(
obj,
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
).encode("utf-8")
class DslCompiler:
    """Compiler for MSCCL++ DSL (Domain-Specific Language) algorithms.

    This compiler transforms high-level algorithm descriptions written in Python
    into execution plans that can be run on GPUs. The compiled plans are cached
    to disk for reuse.

    The cache location can be configured via the `MSCCLPP_EXECUTION_PLAN_DIR`
    environment variable (defaults to `~/.cache/mscclpp`).

    Example:
        >>> compiler = DslCompiler()
        >>> algo = compiler.compile(my_allreduce_algo, algo_spec, rank=0)
    """

    def __init__(self):
        pass

    def __call__(self, algo: Callable[..., CollectiveProgram], algo_spec: AlgoSpec, rank: int, **kwds) -> Algorithm:
        # Convenience alias so the compiler instance itself is callable.
        return self.compile(algo, algo_spec, rank, **kwds)

    def compile(
        self,
        algo: Callable[..., CollectiveProgram],
        algo_spec: AlgoSpec,
        rank: int,
        **kwargs,
    ) -> Algorithm:
        """Compile a MSCCL++ DSL program from a high-level algorithm description.

        This method takes a Python function that defines a collective communication
        algorithm and compiles it into an executable Algorithm. The compilation
        result is cached based on a hash of the source code and algorithm specification.

        Args:
            algo: A callable (function or class) that takes an AlgoSpec and returns
                a CollectiveProgram. This defines the communication pattern.
            algo_spec: Algorithm specification containing:
                - collective: The collective operation type (e.g., allreduce, allgather)
                - world_size: Total number of ranks
                - nranks_per_node: Number of ranks per node
                - instances: Number of algorithm instances
                - protocol: Communication protocol to use
                - name: Human-readable algorithm name
                - tags: Dictionary of tags for algorithm selection
            rank: The rank of the current process (0 to world_size-1).
            **kwargs: Additional keyword arguments passed to the algorithm function.

        Returns:
            Algorithm: The compiled algorithm ready for execution.

        Raises:
            ValueError: If the 'algo' argument is not callable.

        Note:
            Compiled execution plans are cached to disk. The cache key is computed
            from the algorithm source code, specification, and MSCCL++ version.
            Subsequent calls with the same inputs will reuse the cached plan.

        Example:
            >>> def my_ring_allreduce(spec: AlgoSpec) -> CollectiveProgram:
            ...     # Define algorithm using MSCCL++ DSL
            ...     ...
            >>> compiler = DslCompiler()
            >>> spec = AlgoSpec(collective=Collective.allreduce, world_size=8, ...)
            >>> algo = compiler.compile(my_ring_allreduce, spec, rank=0)
        """
        if not callable(algo):
            raise ValueError("The 'algo' argument must be a callable (e.g., a function or class).")
        prog: CollectiveProgram = algo(
            algo_spec,
            **kwargs,
        )
        # The cache key covers the algorithm source, its spec, and the MSCCL++
        # version, so changing any of them invalidates the cached plan.
        source = inspect.getsource(algo)
        source_hash = blake3(source.encode("utf-8")).hexdigest()
        plan_id = blake3(
            _stable_json_bytes(
                {
                    "version": __version__,
                    "algo_name": algo_spec.name,
                    "collective": algo_spec.collective.name,
                    "tags": sorted(algo_spec.tags.items()),
                    "source_hash": source_hash,
                    "envs": {
                        "nranks_per_node": algo_spec.nranks_per_node,
                        "world_size": algo_spec.world_size,
                        "instances": algo_spec.instances,
                        "protocol": algo_spec.protocol,
                    },
                }
            )
        ).hexdigest()
        plan_dir = os.environ.get("MSCCLPP_EXECUTION_PLAN_DIR", Path.home() / ".cache/mscclpp")
        os.makedirs(plan_dir, exist_ok=True)
        plan_path = os.path.join(plan_dir, f"{plan_id}.json")
        # Write to a pid-suffixed temp file and atomically rename it, so that
        # concurrent processes generating the same plan never see a partial file.
        tmp_path = plan_path + f".tmp.{os.getpid()}"
        if not os.path.exists(plan_path):
            try:
                # TODO (binyli): Each rank could generate its own execution plan separately. Doesn't need to generate whole plan.
                with open(tmp_path, "w") as f:
                    prog.post_process_operations()
                    f.write(prog.to_json(indent=None, separators=(",", ":"), ensure_ascii=False))
                    f.flush()
                    os.fsync(f.fileno())
                if not os.path.exists(plan_path):
                    os.rename(tmp_path, plan_path)
                else:
                    # Another process won the race; discard our copy.
                    os.remove(tmp_path)
            except Exception:
                # Clean up only OUR temp file. (A previous version removed
                # plan_path here, which could delete a valid plan written by a
                # concurrent process while leaking tmp_path.) Re-raise so the
                # failure surfaces now rather than as a confusing missing-file
                # error when the ExecutionPlan is loaded below.
                Path(tmp_path).unlink(missing_ok=True)
                raise
        execution_plan = ExecutionPlan(plan_path, rank)
        return Algorithm(
            id=plan_id,
            execution_plan=execution_plan,
            constraint=Algorithm.Constraint(
                world_size=algo_spec.world_size, n_ranks_per_node=algo_spec.nranks_per_node
            ),
            tags=algo_spec.tags,
        )
class NativeCodeCompiler:
    """Compiler for native CUDA/HIP algorithm implementations.

    This compiler takes CUDA or HIP source files containing custom collective
    algorithm kernels and compiles them into loadable Python modules using
    pybind11 bindings.

    The compiler automatically detects whether to use NVCC (CUDA) or HIPCC (ROCm)
    based on the runtime environment. Compiled modules are cached to avoid
    recompilation.

    The cache location can be configured via the `MSCCLPP_NATIVE_CACHE_DIR`
    environment variable (defaults to `~/.cache/mscclpp/native`).

    Attributes:
        _is_hip: True if running on AMD/ROCm, False for NVIDIA/CUDA.
        _device_arch: The GPU architecture string (e.g., "sm_90" or "gfx90a").
        _compiler: Path to the compiler executable (nvcc or hipcc).

    Example:
        >>> compiler = NativeCodeCompiler()
        >>> module = compiler.compile("my_kernel", "path/to/kernel.cu")
        >>> algo = module.create_algorithm()
    """

    def __init__(self):
        self._is_hip = cp.cuda.runtime.is_hip
        self._device_arch = get_device_arch()
        self._compiler = self._get_compiler()
        self._default_options = ["-std=c++17", "-O3", "--shared"]
        # The compiled module embeds the Python C API, so both the interpreter
        # headers and pybind11 headers must be on the include path.
        python_include = sysconfig.get_path("include")
        pybind11_include = pybind11.get_include()
        self._default_options += [f"-I{python_include}", f"-I{pybind11_include}"]
        python_lib = f"-lpython{sys.version_info.major}.{sys.version_info.minor}"
        self._default_options.append(python_lib)
        self._lib_home = os.path.abspath(os.path.dirname(__file__))
        if not self._is_hip:
            # Format: -gencode=arch=compute_90,code=sm_90
            compute_arch = self._device_arch.replace("sm_", "compute_")
            arch_flag = f"-gencode=arch={compute_arch},code={self._device_arch}"
            self._default_options.append(arch_flag)
            self._default_options += ["--compiler-options", "-fPIC"]
            # Embed an rpath so the module finds libmscclpp next to this package.
            self._default_options += ["--linker-options", f"-rpath,{self._lib_home}/lib"]
        else:
            # Format for HIP: --offload-arch=gfx90a
            arch_flag = f"--offload-arch={self._device_arch}"
            self._default_options.append(arch_flag)
            self._default_options += ["-fPIC"]
            self._default_options += ["-D__HIP_PLATFORM_AMD__"]
            self._default_options += [f"-Wl,-rpath,{self._lib_home}/lib"]
        self._default_options += [
            "-I" + os.path.join(self._lib_home, "include"),
            "-L" + os.path.join(self._lib_home, "lib"),
            "-lmscclpp",
        ]
        cache_root = os.environ.get("MSCCLPP_NATIVE_CACHE_DIR", Path.home() / ".cache/mscclpp/native")
        self._cache_dir = Path(cache_root)
        self._cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_compiler(self) -> str:
        """Get the path to the appropriate compiler.

        Returns:
            Path to nvcc (CUDA) or hipcc (ROCm) compiler. Falls back to the
            bare executable name (resolved via PATH) when CUDA_HOME/ROCM_HOME
            is not set.
        """
        if self._is_hip:
            rocm_home = os.environ.get("ROCM_HOME")
            return os.path.join(rocm_home, "bin/hipcc") if rocm_home else "hipcc"
        else:
            cuda_home = os.environ.get("CUDA_HOME")
            return os.path.join(cuda_home, "bin/nvcc") if cuda_home else "nvcc"

    def get_arch(self):
        """Get the target GPU architecture.

        Returns:
            str: The GPU architecture string (e.g., "sm_90" for NVIDIA or "gfx90a" for AMD).
        """
        return self._device_arch

    def __call__(self, name: str, file: str, **kwds):
        # Convenience alias so the compiler instance itself is callable.
        return self.compile(name, file, **kwds)

    def compile(self, name: str, file: str, **kwargs):
        """Compile a native CUDA/HIP source file into a Python module.

        This method compiles a CUDA (.cu) or HIP source file containing custom
        collective algorithm kernels into a dynamically loadable Python module.
        The module is expected to use pybind11 bindings to expose algorithm
        creation functions.

        Compilation results are cached based on a hash of the source code,
        compiler options, and GPU architecture. Subsequent calls with unchanged
        inputs will return the cached module.

        Args:
            name: The name of the Python module to create. This will be the
                module name used for importing (e.g., `import name`).
            file: Path to the CUDA/HIP source file to compile.
            **kwargs: Reserved for future options; currently ignored. Accepted
                here so that `__call__(..., **kwds)` forwarding does not raise
                a TypeError (previously this signature rejected any keywords).

        Returns:
            module: The compiled and loaded Python module containing the
                algorithm implementation.

        Raises:
            FileNotFoundError: If the specified source file does not exist.
            RuntimeError: If compilation fails (compiler not found, syntax errors, etc.).
            ImportError: If the compiled module cannot be loaded.

        Note:
            - The source file should include pybind11 bindings to expose functions.
            - MSCCLPP headers are automatically included in the compilation.
            - The module is cached in `MSCCLPP_NATIVE_CACHE_DIR` (default: ~/.cache/mscclpp/native).
            - File locking is used to prevent race conditions during parallel compilation.

        Example:
            >>> compiler = NativeCodeCompiler()
            >>> # Compile a custom allreduce kernel
            >>> module = compiler.compile("my_allreduce", "kernels/allreduce.cu")
            >>> # Use the module to create an algorithm
            >>> algo = module.create_allreduce_algorithm(comm, buffer, size)
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(f"The specified source file does not exist: {file}")
        with open(file, "rb") as source_file:
            source_bytes = source_file.read()
        # Cache key covers everything that can affect the produced binary.
        source_hash = blake3(source_bytes).hexdigest()
        cache_key = blake3(
            _stable_json_bytes(
                {
                    "version": __version__,
                    "source_hash": source_hash,
                    "compiler": self._compiler,
                    "options": self._default_options,
                    "arch": self._device_arch,
                }
            )
        ).hexdigest()
        output_file = self._cache_dir / f"{name}-{cache_key}.so"
        lock_file = output_file.with_suffix(output_file.suffix + ".lock")
        # Exclusive flock serializes parallel compilations of the same source.
        with open(lock_file, "w") as lock_handle:
            fcntl.flock(lock_handle, fcntl.LOCK_EX)
            if not output_file.exists():
                tmp_file = output_file.with_suffix(output_file.suffix + f".tmp.{os.getpid()}")
                compile_command = [self._compiler] + self._default_options + ["-o", str(tmp_file), file]
                try:
                    subprocess.run(compile_command, check=True, capture_output=True, text=True)
                    # Atomic publish: other processes only ever see a complete .so.
                    os.replace(tmp_file, output_file)
                except FileNotFoundError as e:
                    tmp_file.unlink(missing_ok=True)
                    raise RuntimeError(
                        f"Compiler '{self._compiler}' not found. Make sure it's installed and in PATH."
                    ) from e
                except subprocess.CalledProcessError as e:
                    tmp_file.unlink(missing_ok=True)
                    raise RuntimeError(
                        f"Compilation failed with return code {e.returncode}.\n"
                        f"Command: {' '.join(compile_command)}\n"
                        f"Stdout: {e.stdout}\n"
                        f"Stderr: {e.stderr}"
                    ) from e
        module_name = name
        # Reuse an already-loaded module only if it came from the same build.
        existing_module = sys.modules.get(module_name)
        if existing_module and getattr(existing_module, "__mscclpp_cache_key__", None) == cache_key:
            return existing_module
        spec = importlib.util.spec_from_file_location(module_name, output_file)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load module '{name}' from '{output_file}'")
        module = importlib.util.module_from_spec(spec)
        module.__mscclpp_cache_key__ = cache_key
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        logging.debug(f"Successfully compiled and loaded module '{name}' from '{output_file}'")
        return module

View File

@@ -1,6 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from mscclpp.language.default_algos.allreduce_2nodes import allreduce_2nodes
from mscclpp.default_algos.allreduce_2nodes import allreduce_2nodes
__all__ = ["allreduce_2nodes"]

View File

@@ -0,0 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from .algorithm_collection_builder import *
__all__ = algorithm_collection_builder.__all__

View File

@@ -0,0 +1,60 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
from typing import Union
from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection
import atexit
from mscclpp._mscclpp import (
AlgorithmCollectionBuilder as _AlgorithmCollectionBuilder,
)
__all__ = ["AlgorithmCollectionBuilder"]
class AlgorithmCollectionBuilder:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(AlgorithmCollectionBuilder, cls).__new__(cls)
return cls._instance
@classmethod
def reset(cls):
if cls._instance is not None:
_AlgorithmCollectionBuilder.reset()
cls._instance = None
def __init__(self):
if not hasattr(self, "_initialized"):
self._builder = _AlgorithmCollectionBuilder.get_instance()
self._initialized = True
def add_algorithm_builder(self, algorithm_builder: Union[AlgorithmBuilder, Algorithm]):
if isinstance(algorithm_builder, AlgorithmBuilder):
self._builder.add_algorithm_builder(algorithm_builder._algorithm_builder)
return
if isinstance(algorithm_builder, Algorithm):
if algorithm_builder.is_dsl_algorithm():
self._builder.add_dsl_algorithm_builder(algorithm_builder._algorithm)
return
raise ValueError("The 'algorithm_builder' argument must be an instance of AlgorithmBuilder or DSL Algorithm.")
def set_algorithm_selector(self, selector):
self._builder.set_algorithm_selector(selector)
def set_fallback_algorithm_selector(self, selector):
self._builder.set_fallback_algorithm_selector(selector)
def build(self) -> AlgorithmCollection:
collection = self._builder.build()
return AlgorithmCollection(collection)
def build_default_algorithms(self, scratch_buffer: int, scratch_buffer_size: int, rank: int) -> AlgorithmCollection:
native_collection = self._builder.build_default_algorithms(int(scratch_buffer), scratch_buffer_size, rank)
return AlgorithmCollection(native_collection)
atexit.register(AlgorithmCollectionBuilder.reset)

View File

@@ -2,3 +2,5 @@
# Licensed under the MIT License.
"""MSCCL++ DSL."""
from .utils import *

View File

@@ -5,6 +5,8 @@ from enum import Enum
from dataclasses import dataclass, field
from mscclpp.language.collectives import Collective
__all__ = ["AlgoSpec", "ReplicationPolicy"]
class ReplicationPolicy(Enum):
interleaved = "interleaved"

View File

@@ -6,11 +6,12 @@ import os
import struct
import subprocess
import tempfile
from typing import Any, Type, Union, Tuple
from typing import Any, Type, Union
import cupy as cp
import numpy as np
from ._mscclpp import RawGpuBuffer
from mscclpp._mscclpp import DataType
try:
import torch
@@ -22,6 +23,22 @@ except ImportError:
torchTensor = Type[Any]
__all__ = [
"Kernel",
"KernelBuilder",
"pack",
"get_device_arch",
"torch_dtype_to_mscclpp_dtype",
]
def get_device_arch() -> str:
if cp.cuda.runtime.is_hip:
return cp.cuda.runtime.getDeviceProperties(cp.cuda.Device().id)["gcnArchName"].decode("utf-8")
else:
return f"sm_{cp.cuda.Device().compute_capability}"
class Kernel:
CU_LAUNCH_PARAM_BUFFER_POINTER = 0x01
CU_LAUNCH_PARAM_BUFFER_SIZE = 0x02
@@ -86,7 +103,8 @@ class KernelBuilder:
mscclpp_home = os.environ.get("MSCCLPP_HOME", "/usr/local/mscclpp")
include_dir = os.path.join(mscclpp_home, "include")
if not cp.cuda.runtime.is_hip:
compute_capability = cp.cuda.Device().compute_capability
arch = get_device_arch()
compute_capability = arch.replace("sm_", "")
cuda_home = os.environ.get("CUDA_HOME")
nvcc = os.path.join(cuda_home, "bin/nvcc") if cuda_home else "nvcc"
command = [
@@ -104,9 +122,7 @@ class KernelBuilder:
]
else:
# the gcn arch name is like "gfx942:sramecc+:xnack-"
gcn_arch = (
cp.cuda.runtime.getDeviceProperties(cp.cuda.Device().id)["gcnArchName"].decode("utf-8").split(":")[0]
)
gcn_arch = get_device_arch()
rocm_home = os.environ.get("ROCM_HOME")
hipcc = os.path.join(rocm_home, "bin/hipcc") if rocm_home else "hipcc"
command = [
@@ -138,25 +154,6 @@ class KernelBuilder:
self._tempdir.cleanup()
class GpuBuffer(cp.ndarray):
def __new__(
cls, shape: Union[int, Tuple[int]], dtype: cp.dtype = float, strides: Tuple[int] = None, order: str = "C"
):
# Check if `shape` is valid
if isinstance(shape, int):
shape = (shape,)
try:
shape = tuple(shape)
except TypeError:
raise ValueError("Shape must be a tuple-like or an integer.")
if any(s <= 0 for s in shape):
raise ValueError("Shape must be positive.")
# Create the buffer
buffer = RawGpuBuffer(np.prod(shape) * np.dtype(dtype).itemsize)
memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer.data(), buffer.bytes(), buffer), 0)
return cp.ndarray(shape, dtype=dtype, strides=strides, order=order, memptr=memptr)
def pack(*args):
res = b""
for arg in list(args):
@@ -182,3 +179,18 @@ def pack(*args):
def is_torch_tensor(tensor: Any) -> bool:
return _use_torch and isinstance(tensor, torchTensor)
def torch_dtype_to_mscclpp_dtype(dtype: "torch.dtype") -> DataType:
if not _use_torch:
raise RuntimeError("PyTorch is not available.")
if dtype == torch.float16:
return DataType.float16
elif dtype == torch.float32:
return DataType.float32
elif dtype == torch.int32:
return DataType.int32
elif dtype == torch.bfloat16:
return DataType.bfloat16
else:
raise ValueError(f"Unknown data type: {dtype}")

View File

@@ -1 +1,4 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from .mscclpp_op import MscclppAllReduce1, MscclppAllReduce2, MscclppAllReduce3, MscclppAllReduce4, MscclppAllReduce5

View File

@@ -13,9 +13,7 @@ from mscclpp_op import (
from nccl_op import NcclAllReduce
from mpi4py import MPI
import cupy.cuda.nccl as nccl
import mscclpp.comm as mscclpp_comm
from mscclpp import ProxyService, is_nvls_supported
from mscclpp.utils import GpuBuffer
from mscclpp import ProxyService, is_nvls_supported, CommGroup, GpuBuffer
from prettytable import PrettyTable
import netifaces as ni
import ipaddress
@@ -160,9 +158,7 @@ def find_best_config(mscclpp_call, niter):
return best_config, best_time
def run_benchmark(
mscclpp_group: mscclpp_comm.CommGroup, nccl_op: nccl.NcclCommunicator, table: PrettyTable, niter: int, nelem: int
):
def run_benchmark(mscclpp_group: CommGroup, nccl_op: nccl.NcclCommunicator, table: PrettyTable, niter: int, nelem: int):
memory = GpuBuffer(nelem, dtype=data_type)
memory_out = GpuBuffer(nelem, dtype=data_type)
cp.cuda.runtime.deviceSynchronize()
@@ -259,9 +255,7 @@ if __name__ == "__main__":
network_interface, my_ip = get_netinterface_info()
root_ip = MPI.COMM_WORLD.bcast(my_ip, root=0)
ifIpPortTrio = network_interface + ":" + root_ip + ":50000" # some random port
mscclpp_group = mscclpp_comm.CommGroup(
interfaceIpPortTrio=ifIpPortTrio, rank=MPI.COMM_WORLD.rank, size=MPI.COMM_WORLD.size
)
mscclpp_group = CommGroup(interfaceIpPortTrio=ifIpPortTrio, rank=MPI.COMM_WORLD.rank, size=MPI.COMM_WORLD.size)
# create a NcclComm
if MPI.COMM_WORLD.rank == 0:

View File

@@ -1,9 +1,12 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
import cupy as cp
import ctypes
from mscclpp import Transport, ProxyService, MemoryDevice2DeviceSemaphore
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, GpuBuffer, pack
from mscclpp import CommGroup, GpuBuffer
from mscclpp.utils import KernelBuilder, pack
IB_TRANSPORTS = [
Transport.IB0,
@@ -31,7 +34,7 @@ def type_to_str(dtype):
class MscclppAllReduce1:
def __init__(
self,
group: mscclpp_comm.CommGroup,
group: CommGroup,
memory: cp.ndarray,
read_only: int = 1,
block_size: int = 1024,
@@ -97,7 +100,7 @@ class MscclppAllReduce1:
class MscclppAllReduce2:
def __init__(
self,
group: mscclpp_comm.CommGroup,
group: CommGroup,
memory: cp.ndarray,
memory_out: cp.ndarray,
block_size: int = 512,
@@ -164,7 +167,7 @@ class MscclppAllReduce2:
class MscclppAllReduce3:
def __init__(
self,
group: mscclpp_comm.CommGroup,
group: CommGroup,
memory: cp.ndarray,
proxy_service: ProxyService,
block_size: int = 1024,
@@ -234,7 +237,7 @@ class MscclppAllReduce3:
class MscclppAllReduce4:
def __init__(
self,
group: mscclpp_comm.CommGroup,
group: CommGroup,
memory: cp.ndarray,
nranks_per_node: int,
proxy_service: ProxyService,
@@ -335,7 +338,7 @@ class MscclppAllReduce4:
class MscclppAllReduce5:
def __init__(
self,
group: mscclpp_comm.CommGroup,
group: CommGroup,
memory: cp.ndarray,
memory_out: cp.ndarray,
nranks_per_node: int,
@@ -428,7 +431,7 @@ class MscclppAllReduce5:
class MscclppAllReduce6:
def __init__(
self,
group: mscclpp_comm.CommGroup,
group: CommGroup,
nelem: int,
memory_dtype: cp.dtype,
block_size: int = 1024,

View File

@@ -1,3 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import cupy.cuda.nccl as nccl
from mpi4py import MPI
import cupy as cp

View File

@@ -6,4 +6,5 @@ pytest
numpy
matplotlib
sortedcontainers @ git+https://github.com/grantjenks/python-sortedcontainers.git@3ac358631f58c1347f1d6d2d92784117db0f38ed
blake3
blake3
pybind11

View File

@@ -6,4 +6,5 @@ pytest
numpy
matplotlib
sortedcontainers @ git+https://github.com/grantjenks/python-sortedcontainers.git@3ac358631f58c1347f1d6d2d92784117db0f38ed
blake3
blake3
pybind11

View File

@@ -10,7 +10,7 @@ from mscclpp import (
npkit,
env,
)
import mscclpp.comm as mscclpp_comm
from mscclpp import CommGroup, GpuBuffer
from mscclpp.utils import KernelBuilder, GpuBuffer, pack
import os
import struct
@@ -180,7 +180,7 @@ def main(
n_iters: int = 10,
n_graph_iters: int = 10,
):
mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD)
mscclpp_group = CommGroup(MPI.COMM_WORLD)
cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use()
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = env().npkit_dump_dir

View File

@@ -13,7 +13,6 @@ import pytest
from mscclpp import (
ErrorCode,
Error,
DataType,
EndpointConfig,
ExecutionPlan,
@@ -31,8 +30,8 @@ from mscclpp import (
Device,
DeviceType,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, GpuBuffer, pack
from mscclpp import CommGroup, GpuBuffer
from mscclpp.utils import KernelBuilder, pack
from ._cpp import _ext
from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group
@@ -75,7 +74,7 @@ def test_group_with_ip(mpi_group: MpiGroup, ifIpPortTrio: str):
# ranks are on different nodes
pytest.skip("this case is not supported as localhost will be different for different nodes")
group = mscclpp_comm.CommGroup(mpi_group.comm, ifIpPortTrio)
group = CommGroup(mpi_group.comm, ifIpPortTrio)
nelem = 1024
memory = np.zeros(nelem, dtype=np.int32)
@@ -141,7 +140,7 @@ def test_bootstrap_init_gil_release(mpi_group: MpiGroup):
mpi_group.comm.barrier()
def create_connection(group: mscclpp_comm.CommGroup, connection_type: str):
def create_connection(group: CommGroup, connection_type: str):
if connection_type == "NVLS":
all_ranks = list(range(group.nranks))
tran = Transport.CudaIpc
@@ -163,7 +162,7 @@ def create_connection(group: mscclpp_comm.CommGroup, connection_type: str):
def create_group_and_connection(mpi_group: MpiGroup, connection_type: str):
if (connection_type == "NVLink" or connection_type == "NVLS") and all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("cannot use nvlink/nvls for cross node")
group = mscclpp_comm.CommGroup(mpi_group.comm)
group = CommGroup(mpi_group.comm)
try:
connection = create_connection(group, connection_type)
except Error as e:
@@ -282,7 +281,7 @@ def test_connection_write_and_signal(mpi_group: MpiGroup, connection_type: str,
@parametrize_mpi_groups(2, 4, 8, 16)
def test_h2h_semaphores(mpi_group: MpiGroup):
group = mscclpp_comm.CommGroup(mpi_group.comm)
group = CommGroup(mpi_group.comm)
tran = group.my_ib_device(group.my_rank % 8)
endpoint = EndpointConfig(tran, Device(DeviceType.CPU))
remote_nghrs = list(range(group.nranks))
@@ -302,7 +301,7 @@ def test_h2h_semaphores(mpi_group: MpiGroup):
@parametrize_mpi_groups(2, 4, 8, 16)
def test_h2h_semaphores_gil_release(mpi_group: MpiGroup):
group = mscclpp_comm.CommGroup(mpi_group.comm)
group = CommGroup(mpi_group.comm)
tran = group.my_ib_device(group.my_rank % 8)
endpoint = EndpointConfig(tran, Device(DeviceType.CPU))
remote_nghrs = list(range(group.nranks))
@@ -339,7 +338,7 @@ def test_h2h_semaphores_gil_release(mpi_group: MpiGroup):
def test_nvls_connection(mpi_group: MpiGroup):
if all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("cannot use nvls for cross node")
group = mscclpp_comm.CommGroup(mpi_group.comm)
group = CommGroup(mpi_group.comm)
all_ranks = list(range(group.nranks))
nvls_connection = group.make_connection(all_ranks, Transport.CudaIpc, use_switch=True)
memory1 = GpuBuffer(2**29, cp.int8)
@@ -659,7 +658,7 @@ def test_executor(mpi_group: MpiGroup, filename: str):
if all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("algo not support cross node")
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm)
mscclpp_group = CommGroup(mpi_group.comm)
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = env().npkit_dump_dir
if npkit_dump_dir != "":