Address comments for PR #692 (#733)

Rename nanobind-exposed C++ types to Cpp*
Replace MSCCLPP_EXECUTION_PLAN_DIR / MSCCLPP_NATIVE_CACHE_DIR with
MSCCLPP_CACHE_DIR across C++ and Python.
This commit is contained in:
Binyang Li
2026-02-03 10:13:20 -08:00
committed by GitHub
parent 03b1936ddb
commit e21513791a
31 changed files with 211 additions and 205 deletions

View File

@@ -11,6 +11,18 @@
import sys
import importlib.util
from pathlib import Path
from unittest.mock import MagicMock
class NamedMock(MagicMock):
    """A MagicMock whose auto-created child mocks carry a real name.

    Sphinx autodoc inspects ``__name__``/``__qualname__`` on the objects it
    documents; plain MagicMock children lack them, so we stamp each child
    with the attribute name it was accessed under.
    """

    def __getattr__(self, item):
        child = super().__getattr__(item)
        if isinstance(child, MagicMock):
            # Stamp the child so autodoc inspection finds a usable name.
            child.__name__ = item
            child.__qualname__ = item
        return child
# Add the python package to sys.path so Sphinx can find it
project_root = Path(__file__).parent.parent
@@ -63,7 +75,7 @@ autodoc_default_options = {
"show-inheritance": True,
}
# only mock the C-extension when using the source tree
autodoc_mock_imports = ["mscclpp._version", "mscclpp._mscclpp", "blake3", "cupy", "mpi4py", "numpy", "sortedcontainers"]
autodoc_mock_imports = ["mscclpp._version", "blake3", "cupy", "mpi4py", "numpy", "sortedcontainers"]
autodoc_typehints = "description"
napoleon_google_docstring = True
napoleon_numpy_docstring = True
@@ -71,6 +83,10 @@ intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"numpy": ("https://numpy.org/doc/stable/", None),
}
mock_mscclpp = NamedMock()
# Set attributes to satisfy Sphinx autodoc inspection.
mock_mscclpp.env.return_value.cache_dir = "_mscclpp"
sys.modules["mscclpp._mscclpp"] = mock_mscclpp
templates_path = ["_templates"]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]

View File

@@ -56,7 +56,7 @@ python3 -m mscclpp --install
After installation, the generated JSON execution plan can be found at:
```
~/.cache/mscclpp_default/
~/.cache/mscclpp/default/
```
**Performance Results:**

View File

@@ -7,6 +7,4 @@ This reference organizes the MSCCL++ Python API.
:toctree: py_api
:recursive:
mscclpp.comm
mscclpp.utils
mscclpp.language
mscclpp

View File

@@ -196,7 +196,7 @@ mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=$MSCCLPP_BUILD/lib
Example 2: ReduceScatter will still use the MSCCL++ implementation, since ReduceScatter is not in the fallback list.
```bash
export LD_LIBRARY_PATH=/root/mscclpp/build/lib:$LD_LIBRARY_PATH;
mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=$MSCCLPP_BUILD/lib/libmscclpp_nccl.so -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=$NCCL_BUILD/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="broadcast" -x MSCCLPP_EXECUTION_PLAN_DIR=/$PATH_TO_EXECUTION_PLANS/execution-files ./build/reduce_scatter_perf -b 1K -e 256M -f 2 -d half -G 20 -w 10 -n 50
mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=$MSCCLPP_BUILD/lib/libmscclpp_nccl.so -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=$NCCL_BUILD/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="broadcast" ./build/reduce_scatter_perf -b 1K -e 256M -f 2 -d half -G 20 -w 10 -n 50
```
On AMD platforms, you need to set `RCCL_MSCCL_ENABLE=0` to avoid conflicts with the fallback features.

View File

@@ -70,9 +70,9 @@ class Env {
/// Env name: `MSCCLPP_COMM_ID`. To be deprecated; don't use this.
const std::string commId;
/// Env name: `MSCCLPP_EXECUTION_PLAN_DIR`. The directory to find execution plans from. This should be set to
/// use execution plans for the NCCL API. Unset by default.
const std::string executionPlanDir;
/// Env name: `MSCCLPP_CACHE_DIR`. The directory to use for caching execution plans and other temporary files.
/// If unset, it defaults to `~/.cache/mscclpp`.
const std::string cacheDir;
/// Env name: `MSCCLPP_NPKIT_DUMP_DIR`. The directory to dump NPKIT traces to. If this is set, NPKIT will be
/// enabled and will dump traces to this directory. Unset by default.

View File

@@ -16,14 +16,16 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_algorithm(nb::module_& m) {
nb::enum_<CollectiveBufferMode>(m, "CollectiveBufferMode")
nb::enum_<CollectiveBufferMode>(m, "CppCollectiveBufferMode")
.value("ANY", CollectiveBufferMode::Any)
.value("IN_PLACE", CollectiveBufferMode::InPlace)
.value("OUT_OF_PLACE", CollectiveBufferMode::OutOfPlace);
nb::enum_<AlgorithmType>(m, "AlgorithmType").value("NATIVE", AlgorithmType::Native).value("DSL", AlgorithmType::DSL);
nb::enum_<AlgorithmType>(m, "CppAlgorithmType")
.value("NATIVE", AlgorithmType::Native)
.value("DSL", AlgorithmType::DSL);
nb::enum_<CommResult>(m, "CommResult")
nb::enum_<CommResult>(m, "CppCommResult")
.value("COMM_SUCCESS", CommResult::CommSuccess)
.value("COMM_UNHANDLED_CUDA_ERROR", CommResult::CommUnhandledCudaError)
.value("COMM_SYSTEM_ERROR", CommResult::CommSystemError)
@@ -34,13 +36,13 @@ void register_algorithm(nb::module_& m) {
.value("COMM_IN_PROGRESS", CommResult::CommInProgress)
.value("COMM_NUM_RESULTS", CommResult::CommNumResults);
nb::enum_<ReduceOp>(m, "ReduceOp")
nb::enum_<ReduceOp>(m, "CppReduceOp")
.value("SUM", ReduceOp::SUM)
.value("MIN", ReduceOp::MIN)
.value("NOP", ReduceOp::NOP);
auto algorithmClass =
nb::class_<Algorithm>(m, "Algorithm")
nb::class_<Algorithm>(m, "CppAlgorithm")
.def_static(
"from_native_capsule",
[](nb::capsule cap) {
@@ -83,21 +85,21 @@ void register_algorithm(nb::module_& m) {
.def_rw("world_size", &Algorithm::Constraint::worldSize)
.def_rw("n_ranks_per_node", &Algorithm::Constraint::nRanksPerNode);
nb::class_<AlgorithmBuilder>(m, "AlgorithmBuilder").def("build", &AlgorithmBuilder::build);
nb::class_<AlgorithmBuilder>(m, "CppAlgorithmBuilder").def("build", &AlgorithmBuilder::build);
nb::class_<DslAlgorithm, Algorithm>(m, "DslAlgorithm")
nb::class_<DslAlgorithm, Algorithm>(m, "CppDslAlgorithm")
.def(nb::init<std::string, ExecutionPlan, std::unordered_map<std::string, uint64_t>, Algorithm::Constraint>(),
nb::arg("id"), nb::arg("plan"), nb::arg("tags") = std::unordered_map<std::string, uint64_t>(),
nb::arg("constraint") = Algorithm::Constraint())
.def("build", &DslAlgorithm::build);
nb::class_<AlgorithmCollection>(m, "AlgorithmCollection")
nb::class_<AlgorithmCollection>(m, "CppAlgorithmCollection")
.def("register_algorithm", &AlgorithmCollection::registerAlgorithm, nb::arg("collective"), nb::arg("algo_name"),
nb::arg("algorithm"))
.def("get_algorithms_by_collective", &AlgorithmCollection::getAlgorithmsByCollective, nb::arg("collective"))
.def("to_list", &AlgorithmCollection::getAllAlgorithms);
nb::class_<CollectiveRequest>(m, "CollectiveRequest")
nb::class_<CollectiveRequest>(m, "CppCollectiveRequest")
.def_ro("world_size", &CollectiveRequest::worldSize)
.def_ro("n_ranks_per_node", &CollectiveRequest::nRanksPerNode)
.def_ro("rank", &CollectiveRequest::rank)

View File

@@ -32,21 +32,21 @@ extern void register_algorithm_collection_builder(nb::module_& m);
template <typename T>
void def_shared_future(nb::handle& m, const std::string& typestr) {
std::string pyclass_name = std::string("shared_future_") + typestr;
std::string pyclass_name = std::string("CppSharedFuture_") + typestr;
nb::class_<std::shared_future<T>>(m, pyclass_name.c_str()).def("get", &std::shared_future<T>::get);
}
void register_core(nb::module_& m) {
m.def("version", &version);
nb::enum_<DataType>(m, "DataType")
nb::enum_<DataType>(m, "CppDataType")
.value("int32", DataType::INT32)
.value("uint32", DataType::UINT32)
.value("float16", DataType::FLOAT16)
.value("float32", DataType::FLOAT32)
.value("bfloat16", DataType::BFLOAT16);
nb::class_<Bootstrap>(m, "Bootstrap")
nb::class_<Bootstrap>(m, "CppBootstrap")
.def("get_rank", &Bootstrap::getRank)
.def("get_n_ranks", &Bootstrap::getNranks)
.def("get_n_ranks_per_node", &Bootstrap::getNranksPerNode)
@@ -71,7 +71,7 @@ void register_core(nb::module_& m) {
.def("recv", static_cast<void (Bootstrap::*)(std::vector<char>&, int, int)>(&Bootstrap::recv), nb::arg("data"),
nb::arg("peer"), nb::arg("tag"));
nb::class_<UniqueId>(m, "UniqueId")
nb::class_<UniqueId>(m, "CppUniqueId")
.def(nb::init<>())
.def("__setstate__",
[](UniqueId& self, nb::bytes b) {
@@ -81,7 +81,7 @@ void register_core(nb::module_& m) {
.def("__getstate__",
[](const UniqueId& self) { return nb::bytes(reinterpret_cast<const char*>(self.data()), UniqueIdBytes); });
nb::class_<TcpBootstrap, Bootstrap>(m, "TcpBootstrap")
nb::class_<TcpBootstrap, Bootstrap>(m, "CppTcpBootstrap")
.def(nb::init<int, int>(), "Do not use this constructor. Use create instead.")
.def_static(
"create", [](int rank, int nRanks) { return std::make_shared<TcpBootstrap>(rank, nRanks); }, nb::arg("rank"),
@@ -93,7 +93,7 @@ void register_core(nb::module_& m) {
.def("initialize", static_cast<void (TcpBootstrap::*)(const std::string&, int64_t)>(&TcpBootstrap::initialize),
nb::call_guard<nb::gil_scoped_release>(), nb::arg("if_ip_port_trio"), nb::arg("timeout_sec") = 30);
nb::enum_<Transport>(m, "Transport")
nb::enum_<Transport>(m, "CppTransport")
.value("Unknown", Transport::Unknown)
.value("CudaIpc", Transport::CudaIpc)
.value("IB0", Transport::IB0)
@@ -106,7 +106,7 @@ void register_core(nb::module_& m) {
.value("IB7", Transport::IB7)
.value("NumTransports", Transport::NumTransports);
nb::class_<TransportFlags>(m, "TransportFlags")
nb::class_<TransportFlags>(m, "CppTransportFlags")
.def(nb::init<>())
.def(nb::init_implicit<Transport>(), nb::arg("transport"))
.def("has", &TransportFlags::has, nb::arg("transport"))
@@ -130,12 +130,12 @@ void register_core(nb::module_& m) {
.def(nb::self == nb::self)
.def(nb::self != nb::self);
nb::enum_<DeviceType>(m, "DeviceType")
nb::enum_<DeviceType>(m, "CppDeviceType")
.value("Unknown", DeviceType::Unknown)
.value("CPU", DeviceType::CPU)
.value("GPU", DeviceType::GPU);
nb::class_<Device>(m, "Device")
nb::class_<Device>(m, "CppDevice")
.def(nb::init<>())
.def(nb::init_implicit<DeviceType>(), nb::arg("type"))
.def(nb::init<DeviceType, int>(), nb::arg("type"), nb::arg("id") = -1)
@@ -147,7 +147,7 @@ void register_core(nb::module_& m) {
return ss.str();
});
nb::class_<EndpointConfig::Ib>(m, "EndpointConfigIb")
nb::class_<EndpointConfig::Ib>(m, "CppEndpointConfigIb")
.def(nb::init<>())
.def(nb::init<int, int, int, int, int, int, int>(), nb::arg("device_index") = -1,
nb::arg("port") = EndpointConfig::Ib::DefaultPort,
@@ -164,7 +164,7 @@ void register_core(nb::module_& m) {
.def_rw("max_send_wr", &EndpointConfig::Ib::maxSendWr)
.def_rw("max_wr_per_send", &EndpointConfig::Ib::maxWrPerSend);
nb::class_<RegisteredMemory>(m, "RegisteredMemory")
nb::class_<RegisteredMemory>(m, "CppRegisteredMemory")
.def(nb::init<>())
.def("data", [](RegisteredMemory& self) { return reinterpret_cast<uintptr_t>(self.data()); })
.def("size", &RegisteredMemory::size)
@@ -172,7 +172,7 @@ void register_core(nb::module_& m) {
.def("serialize", &RegisteredMemory::serialize)
.def_static("deserialize", &RegisteredMemory::deserialize, nb::arg("data"));
nb::class_<Endpoint>(m, "Endpoint")
nb::class_<Endpoint>(m, "CppEndpoint")
.def("config", &Endpoint::config)
.def("transport", &Endpoint::transport)
.def("device", &Endpoint::device)
@@ -180,7 +180,7 @@ void register_core(nb::module_& m) {
.def("serialize", &Endpoint::serialize)
.def_static("deserialize", &Endpoint::deserialize, nb::arg("data"));
nb::class_<Connection>(m, "Connection")
nb::class_<Connection>(m, "CppConnection")
.def("write", &Connection::write, nb::arg("dst"), nb::arg("dstOffset"), nb::arg("src"), nb::arg("srcOffset"),
nb::arg("size"))
.def(
@@ -197,7 +197,7 @@ void register_core(nb::module_& m) {
.def("local_device", &Connection::localDevice)
.def("get_max_write_queue_size", &Connection::getMaxWriteQueueSize);
nb::class_<EndpointConfig>(m, "EndpointConfig")
nb::class_<EndpointConfig>(m, "CppEndpointConfig")
.def(nb::init<>())
.def(nb::init_implicit<Transport>(), nb::arg("transport"))
.def(nb::init<Transport, Device, int, EndpointConfig::Ib>(), nb::arg("transport"), nb::arg("device"),
@@ -228,7 +228,7 @@ void register_core(nb::module_& m) {
[](EndpointConfig& self, int v) { self.ib.maxWrPerSend = v; })
.def_rw("max_write_queue_size", &EndpointConfig::maxWriteQueueSize);
nb::class_<Context>(m, "Context")
nb::class_<Context>(m, "CppContext")
.def_static("create", &Context::create)
.def(
"register_memory",
@@ -239,13 +239,13 @@ void register_core(nb::module_& m) {
.def("create_endpoint", &Context::createEndpoint, nb::arg("config"))
.def("connect", &Context::connect, nb::arg("local_endpoint"), nb::arg("remote_endpoint"));
nb::class_<SemaphoreStub>(m, "SemaphoreStub")
nb::class_<SemaphoreStub>(m, "CppSemaphoreStub")
.def(nb::init<const Connection&>(), nb::arg("connection"))
.def("memory", &SemaphoreStub::memory)
.def("serialize", &SemaphoreStub::serialize)
.def_static("deserialize", &SemaphoreStub::deserialize, nb::arg("data"));
nb::class_<Semaphore>(m, "Semaphore")
nb::class_<Semaphore>(m, "CppSemaphore")
.def(nb::init<>())
.def(nb::init<const SemaphoreStub&, const SemaphoreStub&>(), nb::arg("local_stub"), nb::arg("remote_stub"))
.def("connection", &Semaphore::connection)
@@ -256,7 +256,7 @@ void register_core(nb::module_& m) {
def_shared_future<Connection>(m, "Connection");
def_shared_future<Semaphore>(m, "Semaphore");
nb::class_<Communicator>(m, "Communicator")
nb::class_<Communicator>(m, "CppCommunicator")
.def(nb::init<std::shared_ptr<Bootstrap>, std::shared_ptr<Context>>(), nb::arg("bootstrap"),
nb::arg("context") = nullptr)
.def("bootstrap", &Communicator::bootstrap)

View File

@@ -11,7 +11,7 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_env(nb::module_& m) {
nb::class_<Env>(m, "Env")
nb::class_<Env>(m, "CppEnv")
.def_ro("debug", &Env::debug)
.def_ro("debug_subsys", &Env::debugSubsys)
.def_ro("debug_file", &Env::debugFile)
@@ -20,7 +20,7 @@ void register_env(nb::module_& m) {
.def_ro("socket_family", &Env::socketFamily)
.def_ro("socket_ifname", &Env::socketIfname)
.def_ro("comm_id", &Env::commId)
.def_ro("execution_plan_dir", &Env::executionPlanDir)
.def_ro("cache_dir", &Env::cacheDir)
.def_ro("npkit_dump_dir", &Env::npkitDumpDir)
.def_ro("cuda_ipc_use_default_stream", &Env::cudaIpcUseDefaultStream);

View File

@@ -22,7 +22,7 @@ using namespace mscclpp;
m.attr(#name_).ptr());
void register_error(nb::module_ &m) {
nb::enum_<ErrorCode>(m, "ErrorCode")
nb::enum_<ErrorCode>(m, "CppErrorCode")
.value("SystemError", ErrorCode::SystemError)
.value("InternalError", ErrorCode::InternalError)
.value("RemoteError", ErrorCode::RemoteError)

View File

@@ -15,16 +15,16 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_executor(nb::module_& m) {
nb::enum_<PacketType>(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);
nb::enum_<PacketType>(m, "CppPacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);
nb::class_<ExecutionPlan>(m, "ExecutionPlan")
nb::class_<ExecutionPlan>(m, "CppExecutionPlan")
.def(nb::init<const std::string&, int>(), nb::arg("planPath"), nb::arg("rank"))
.def_prop_ro("name", [](const ExecutionPlan& self) -> std::string { return self.name(); })
.def_prop_ro("collective", [](const ExecutionPlan& self) -> std::string { return self.collective(); })
.def_prop_ro("min_message_size", [](const ExecutionPlan& self) -> size_t { return self.minMessageSize(); })
.def_prop_ro("max_message_size", [](const ExecutionPlan& self) -> size_t { return self.maxMessageSize(); });
nb::class_<Executor>(m, "Executor")
nb::class_<Executor>(m, "CppExecutor")
.def(nb::init<std::shared_ptr<Communicator>>(), nb::arg("comm"))
.def(
"execute",

View File

@@ -15,7 +15,7 @@ using namespace mscclpp;
using namespace mscclpp::collective;
void register_algorithm_collection_builder(nb::module_& m) {
nb::class_<AlgorithmCollectionBuilder>(m, "AlgorithmCollectionBuilder")
nb::class_<AlgorithmCollectionBuilder>(m, "CppAlgorithmCollectionBuilder")
.def_static("get_instance", &AlgorithmCollectionBuilder::getInstance)
.def("add_algorithm_builder", &AlgorithmCollectionBuilder::addAlgorithmBuilder, nb::arg("builder"))
.def(

View File

@@ -9,7 +9,7 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_fifo(nb::module_& m) {
nb::class_<ProxyTrigger>(m, "ProxyTrigger")
nb::class_<ProxyTrigger>(m, "CppProxyTrigger")
.def_prop_rw(
"fst", [](const ProxyTrigger& self) { return self.fst; },
[](ProxyTrigger& self, uint64_t v) { self.fst = v; })
@@ -17,7 +17,7 @@ void register_fifo(nb::module_& m) {
"snd", [](const ProxyTrigger& self) { return self.snd; },
[](ProxyTrigger& self, uint64_t v) { self.snd = v; });
nb::class_<FifoDeviceHandle>(m, "FifoDeviceHandle")
nb::class_<FifoDeviceHandle>(m, "CppFifoDeviceHandle")
.def_rw("triggers", &FifoDeviceHandle::triggers)
.def_rw("tail", &FifoDeviceHandle::tail)
.def_rw("head", &FifoDeviceHandle::head)
@@ -26,7 +26,7 @@ void register_fifo(nb::module_& m) {
return nb::bytes(reinterpret_cast<const char*>(&self), sizeof(self));
});
nb::class_<Fifo>(m, "Fifo")
nb::class_<Fifo>(m, "CppFifo")
.def(nb::init<int>(), nb::arg("size") = DEFAULT_FIFO_SIZE)
.def("poll", &Fifo::poll)
.def("pop", &Fifo::pop)

View File

@@ -101,7 +101,7 @@ static nb::capsule toDlpack(GpuBuffer<char> buffer, std::string dataType, std::v
void register_gpu_utils(nb::module_& m) {
m.def("is_nvls_supported", &isNvlsSupported);
nb::class_<GpuBuffer<char>>(m, "RawGpuBuffer")
nb::class_<GpuBuffer<char>>(m, "CppRawGpuBuffer")
.def(nb::init<size_t>(), nb::arg("nelems"))
.def("nelems", &GpuBuffer<char>::nelems)
.def("bytes", &GpuBuffer<char>::bytes)

View File

@@ -11,20 +11,20 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_memory_channel(nb::module_& m) {
nb::class_<BaseMemoryChannel>(m, "BaseMemoryChannel")
nb::class_<BaseMemoryChannel>(m, "CppBaseMemoryChannel")
.def(nb::init<>())
.def(nb::init<std::shared_ptr<MemoryDevice2DeviceSemaphore>>(), nb::arg("semaphore"))
.def(nb::init<const Semaphore&>(), nb::arg("semaphore"))
.def("device_handle", &BaseMemoryChannel::deviceHandle);
nb::class_<BaseMemoryChannel::DeviceHandle>(m, "BaseMemoryChannelDeviceHandle")
nb::class_<BaseMemoryChannel::DeviceHandle>(m, "CppBaseMemoryChannelDeviceHandle")
.def(nb::init<>())
.def_rw("semaphore_", &BaseMemoryChannel::DeviceHandle::semaphore_)
.def_prop_ro("raw", [](const BaseMemoryChannel::DeviceHandle& self) -> nb::bytes {
return nb::bytes(reinterpret_cast<const char*>(&self), sizeof(self));
});
nb::class_<MemoryChannel>(m, "MemoryChannel")
nb::class_<MemoryChannel>(m, "CppMemoryChannel")
.def(nb::init<>())
.def(
"__init__",
@@ -42,7 +42,7 @@ void register_memory_channel(nb::module_& m) {
nb::arg("semaphore"), nb::arg("dst"), nb::arg("src"), nb::arg("packet_buffer") = 0)
.def("device_handle", &MemoryChannel::deviceHandle);
nb::class_<MemoryChannel::DeviceHandle>(m, "MemoryChannelDeviceHandle")
nb::class_<MemoryChannel::DeviceHandle>(m, "CppMemoryChannelDeviceHandle")
.def(nb::init<>())
.def_rw("semaphore_", &MemoryChannel::DeviceHandle::semaphore_)
.def_rw("dst_", &MemoryChannel::DeviceHandle::dst_)

View File

@@ -9,7 +9,7 @@
namespace nb = nanobind;
void register_npkit(nb::module_ &m) {
nb::module_ sub_m = m.def_submodule("npkit", "NPKit functions");
nb::module_ sub_m = m.def_submodule("cpp_npkit", "NPKit functions");
sub_m.def("init", &NpKit::Init);
sub_m.def("dump", &NpKit::Dump);
sub_m.def("shutdown", &NpKit::Shutdown);

View File

@@ -7,7 +7,7 @@ void numaBind(int node);
}; // namespace mscclpp
void register_numa(nb::module_ &m) {
nb::module_ sub_m = m.def_submodule("numa", "numa functions");
nb::module_ sub_m = m.def_submodule("cpp_numa", "numa functions");
sub_m.def("get_device_numa_node", &mscclpp::getDeviceNumaNode);
sub_m.def("numa_bind", &mscclpp::numaBind);
}

View File

@@ -11,11 +11,11 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_port_channel(nb::module_& m) {
nb::class_<BaseProxyService>(m, "BaseProxyService")
nb::class_<BaseProxyService>(m, "CppBaseProxyService")
.def("start_proxy", &BaseProxyService::startProxy, nb::arg("blocking") = false)
.def("stop_proxy", &BaseProxyService::stopProxy);
nb::class_<ProxyService, BaseProxyService>(m, "ProxyService")
nb::class_<ProxyService, BaseProxyService>(m, "CppProxyService")
.def(nb::init<int>(), nb::arg("fifo_size") = DEFAULT_FIFO_SIZE)
.def("start_proxy", &ProxyService::startProxy, nb::arg("blocking") = false)
.def("stop_proxy", &ProxyService::stopProxy)
@@ -31,13 +31,13 @@ void register_port_channel(nb::module_& m) {
.def("base_port_channel", &ProxyService::basePortChannel, nb::arg("id"))
.def("port_channel", &ProxyService::portChannel, nb::arg("id"), nb::arg("dst"), nb::arg("src"));
nb::class_<BasePortChannel>(m, "BasePortChannel")
nb::class_<BasePortChannel>(m, "CppBasePortChannel")
.def(nb::init<>())
.def(nb::init<SemaphoreId, std::shared_ptr<Host2DeviceSemaphore>, std::shared_ptr<Proxy>>(),
nb::arg("semaphore_id"), nb::arg("semaphore"), nb::arg("proxy"))
.def("device_handle", &BasePortChannel::deviceHandle);
nb::class_<BasePortChannel::DeviceHandle>(m, "BasePortChannelDeviceHandle")
nb::class_<BasePortChannel::DeviceHandle>(m, "CppBasePortChannelDeviceHandle")
.def(nb::init<>())
.def_rw("semaphore_id_", &BasePortChannel::DeviceHandle::semaphoreId_)
.def_rw("semaphore_", &BasePortChannel::DeviceHandle::semaphore_)
@@ -46,13 +46,13 @@ void register_port_channel(nb::module_& m) {
return nb::bytes(reinterpret_cast<const char*>(&self), sizeof(self));
});
nb::class_<PortChannel>(m, "PortChannel")
nb::class_<PortChannel>(m, "CppPortChannel")
.def(nb::init<>())
.def(nb::init<SemaphoreId, std::shared_ptr<Host2DeviceSemaphore>, std::shared_ptr<Proxy>, MemoryId, MemoryId>(),
nb::arg("semaphore_id"), nb::arg("semaphore"), nb::arg("proxy"), nb::arg("dst"), nb::arg("src"))
.def("device_handle", &PortChannel::deviceHandle);
nb::class_<PortChannel::DeviceHandle>(m, "PortChannelDeviceHandle")
nb::class_<PortChannel::DeviceHandle>(m, "CppPortChannelDeviceHandle")
.def(nb::init<>())
.def_rw("semaphore_id_", &PortChannel::DeviceHandle::semaphoreId_)
.def_rw("semaphore_", &PortChannel::DeviceHandle::semaphore_)

View File

@@ -10,7 +10,7 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_semaphore(nb::module_& m) {
nb::class_<Host2DeviceSemaphore> host2DeviceSemaphore(m, "Host2DeviceSemaphore");
nb::class_<Host2DeviceSemaphore> host2DeviceSemaphore(m, "CppHost2DeviceSemaphore");
host2DeviceSemaphore.def(nb::init<const Semaphore&>(), nb::arg("semaphore"))
.def(nb::init<Communicator&, const Connection&>(), nb::arg("communicator"), nb::arg("connection"))
.def("connection", &Host2DeviceSemaphore::connection)
@@ -25,7 +25,7 @@ void register_semaphore(nb::module_& m) {
return nb::bytes(reinterpret_cast<const char*>(&self), sizeof(self));
});
nb::class_<Host2HostSemaphore>(m, "Host2HostSemaphore")
nb::class_<Host2HostSemaphore>(m, "CppHost2HostSemaphore")
.def(nb::init<const Semaphore&>(), nb::arg("semaphore"))
.def(nb::init<Communicator&, const Connection&>(), nb::arg("communicator"), nb::arg("connection"))
.def("connection", &Host2HostSemaphore::connection)
@@ -34,7 +34,7 @@ void register_semaphore(nb::module_& m) {
.def("wait", &Host2HostSemaphore::wait, nb::call_guard<nb::gil_scoped_release>(),
nb::arg("max_spin_count") = 10000000);
nb::class_<MemoryDevice2DeviceSemaphore> memoryDevice2DeviceSemaphore(m, "MemoryDevice2DeviceSemaphore");
nb::class_<MemoryDevice2DeviceSemaphore> memoryDevice2DeviceSemaphore(m, "CppMemoryDevice2DeviceSemaphore");
memoryDevice2DeviceSemaphore.def(nb::init<const Semaphore&>(), nb::arg("semaphore"))
.def(nb::init<Communicator&, const Connection&>(), nb::arg("communicator"), nb::arg("connection"))
.def("connection", &MemoryDevice2DeviceSemaphore::connection)

View File

@@ -15,11 +15,11 @@ namespace nb = nanobind;
using namespace mscclpp;
void register_nvls(nb::module_& m) {
nb::class_<SwitchChannel>(m, "SwitchChannel")
nb::class_<SwitchChannel>(m, "CppSwitchChannel")
.def("get_device_ptr", [](SwitchChannel* self) { return (uintptr_t)self->getDevicePtr(); })
.def("device_handle", &SwitchChannel::deviceHandle);
nb::class_<SwitchChannel::DeviceHandle>(m, "DeviceHandle")
nb::class_<SwitchChannel::DeviceHandle>(m, "CppSwitchChannelDeviceHandle")
.def(nb::init<>())
.def_rw("device_ptr", &SwitchChannel::DeviceHandle::devicePtr)
.def_rw("mc_ptr", &SwitchChannel::DeviceHandle::mcPtr)
@@ -28,7 +28,7 @@ void register_nvls(nb::module_& m) {
return nb::bytes(reinterpret_cast<const char*>(&self), sizeof(self));
});
nb::class_<NvlsConnection>(m, "NvlsConnection")
nb::class_<NvlsConnection>(m, "CppNvlsConnection")
.def("bind_allocated_memory", &NvlsConnection::bindAllocatedMemory, nb::arg("device_ptr"), nb::arg("size"));
m.def("connect_nvls_collective", &connectNvlsCollective, nb::arg("communicator"), nb::arg("all_ranks"),

View File

@@ -23,35 +23,35 @@ version = {
from ._core import *
from ._mscclpp import (
Device,
DeviceType,
Communicator,
Connection,
CppDevice as Device,
CppDeviceType as DeviceType,
CppCommunicator as Communicator,
CppConnection as Connection,
connect_nvls_collective,
EndpointConfig,
Fifo,
Semaphore,
Host2DeviceSemaphore,
Host2HostSemaphore,
numa,
ProxyService,
RegisteredMemory,
PortChannel,
MemoryChannel,
MemoryDevice2DeviceSemaphore,
TcpBootstrap,
Transport,
TransportFlags,
DataType,
ErrorCode,
Executor,
ExecutionPlan,
PacketType,
RawGpuBuffer,
ReduceOp,
CppEndpointConfig as EndpointConfig,
CppFifo as Fifo,
CppSemaphore as Semaphore,
CppHost2DeviceSemaphore as Host2DeviceSemaphore,
CppHost2HostSemaphore as Host2HostSemaphore,
cpp_numa as numa,
CppProxyService as ProxyService,
CppRegisteredMemory as RegisteredMemory,
CppPortChannel as PortChannel,
CppMemoryChannel as MemoryChannel,
CppMemoryDevice2DeviceSemaphore as MemoryDevice2DeviceSemaphore,
CppTcpBootstrap as TcpBootstrap,
CppTransport as Transport,
CppTransportFlags as TransportFlags,
CppDataType as DataType,
CppErrorCode as ErrorCode,
CppExecutor as Executor,
CppExecutionPlan as ExecutionPlan,
CppPacketType as PacketType,
CppRawGpuBuffer as RawGpuBuffer,
CppReduceOp as ReduceOp,
env,
is_nvls_supported,
npkit,
cpp_npkit as npkit,
)
__all__ = [

View File

@@ -6,7 +6,7 @@ import shutil
import argparse
from pathlib import Path
from mscclpp.language import default_algos as def_algo
from mscclpp import default_algos as def_algo
from mscclpp.language.collectives import *
from mscclpp.language.utils import AlgoSpec
@@ -57,7 +57,7 @@ default_algo_configs = [
def create_default_plans():
plan_dir = os.environ.get("MSCCLPP_EXECUTION_PLAN_DIR", Path.home() / ".cache/mscclpp_default")
plan_dir = os.environ.get("MSCCLPP_CACHE_DIR", Path.home() / ".cache/mscclpp/default")
plan_path = Path(plan_dir)
if plan_path.exists():
shutil.rmtree(plan_path)

View File

@@ -5,9 +5,3 @@ from .algorithm import *
from .comm import *
from .compiler import *
from .buffer import *
__all__ = []
__all__ += algorithm.__all__
__all__ += comm.__all__
__all__ += compiler.__all__
__all__ += buffer.__all__

View File

@@ -7,15 +7,17 @@ from functools import cached_property
from mscclpp._mscclpp import (
Algorithm as _Algorithm,
DslAlgorithm as _DslAlgorithm,
AlgorithmType as _AlgorithmType,
Communicator,
CollectiveBufferMode,
DataType,
Executor,
ExecutionPlan,
ReduceOp,
CppAlgorithm,
CppDslAlgorithm,
CppAlgorithmType,
CppCommunicator,
CppCollectiveBufferMode,
CppDataType,
CppExecutor,
CppExecutionPlan,
CppReduceOp,
CppAlgorithmBuilder,
CppAlgorithmCollection,
)
__all__ = ["Algorithm", "AlgorithmBuilder", "AlgorithmCollection"]
@@ -45,7 +47,7 @@ class Algorithm:
"""
def __init__(self, world_size: int = 0, n_ranks_per_node: int = 0):
self._constraint = _Algorithm.Constraint(world_size, n_ranks_per_node)
self._constraint = CppAlgorithm.Constraint(world_size, n_ranks_per_node)
@property
def world_size(self) -> int:
@@ -58,23 +60,23 @@ class Algorithm:
def __init__(
self,
id: Optional[str] = None,
execution_plan: Optional[ExecutionPlan] = None,
native_handle: Optional[_Algorithm] = None,
execution_plan: Optional[CppExecutionPlan] = None,
native_handle: Optional[CppAlgorithm] = None,
tags: Optional[Dict[str, int]] = None,
constraint: Optional[Constraint] = None,
):
if execution_plan is not None:
self._algorithm = _DslAlgorithm(
self._algorithm = CppDslAlgorithm(
id,
execution_plan,
tags=tags if tags is not None else {},
constraint=constraint._constraint if constraint is not None else _Algorithm.Constraint(),
constraint=constraint._constraint if constraint is not None else CppAlgorithm.Constraint(),
)
elif native_handle is not None:
self._algorithm = native_handle
@classmethod
def create_from_native_handle(cls, handle: _Algorithm):
def create_from_native_handle(cls, handle: CppAlgorithm):
"""Create an Algorithm instance from a native C++ algorithm handle.
Args:
@@ -97,7 +99,7 @@ class Algorithm:
Returns:
A new Algorithm instance wrapping the algorithm from the capsule.
"""
handle = _Algorithm.from_native_capsule(obj)
handle = CppAlgorithm.from_native_capsule(obj)
return cls(native_handle=handle)
@cached_property
@@ -121,7 +123,7 @@ class Algorithm:
return self._algorithm.tags
@cached_property
def buffer_mode(self) -> CollectiveBufferMode:
def buffer_mode(self) -> CppCollectiveBufferMode:
"""The buffer mode supported by this algorithm (IN_PLACE, OUT_OF_PLACE, or ANY)."""
return self._algorithm.buffer_mode
@@ -131,7 +133,7 @@ class Algorithm:
Returns:
True if this algorithm is defined using DSL/execution plan, False otherwise.
"""
if self._algorithm.type == _AlgorithmType.DSL:
if self._algorithm.type == CppAlgorithmType.DSL:
return True
return False
@@ -141,21 +143,21 @@ class Algorithm:
Returns:
True if this algorithm is implemented natively, False otherwise.
"""
if self._algorithm.type == _AlgorithmType.NATIVE:
if self._algorithm.type == CppAlgorithmType.NATIVE:
return True
return False
def execute(
self,
comm: Communicator,
comm: CppCommunicator,
input_buffer: int,
output_buffer: int,
input_size: int,
output_size: int,
dtype: DataType,
op: ReduceOp = ReduceOp.NOP,
dtype: CppDataType,
op: CppReduceOp = CppReduceOp.NOP,
stream: int = 0,
executor: Optional[Executor] = None,
executor: Optional[CppExecutor] = None,
nblocks=0,
nthreads_per_block=0,
extras: Optional[Dict[str, int]] = None,
@@ -196,7 +198,7 @@ class Algorithm:
class AlgorithmBuilder:
def __init__(self, algorithm_builder: _AlgorithmBuilder):
def __init__(self, algorithm_builder: CppAlgorithmBuilder):
self._algorithm_builder = algorithm_builder
def build(self) -> Algorithm:
@@ -204,7 +206,7 @@ class AlgorithmBuilder:
class AlgorithmCollection:
def __init__(self, native_collection: _AlgorithmCollection):
def __init__(self, native_collection: CppAlgorithmCollection):
self._native_collection = native_collection
self._algorithms = [Algorithm.create_from_native_handle(algo) for algo in self._native_collection.to_list()]

View File

@@ -6,7 +6,7 @@ from typing import Union, Tuple
import cupy as cp
import numpy as np
from mscclpp._mscclpp import RawGpuBuffer
from mscclpp._mscclpp import CppRawGpuBuffer
__all__ = ["GpuBuffer"]
@@ -25,6 +25,6 @@ class GpuBuffer(cp.ndarray):
if any(s <= 0 for s in shape):
raise ValueError("Shape must be positive.")
# Create the buffer
buffer = RawGpuBuffer(np.prod(shape) * np.dtype(dtype).itemsize)
buffer = CppRawGpuBuffer(np.prod(shape) * np.dtype(dtype).itemsize)
memptr = cp.cuda.MemoryPointer(cp.cuda.UnownedMemory(buffer.data(), buffer.bytes(), buffer), 0)
return cp.ndarray(shape, dtype=dtype, strides=strides, order=order, memptr=memptr)

View File

@@ -6,18 +6,18 @@ from typing import Type
import cupy as cp
from mscclpp._mscclpp import (
Communicator,
Connection,
CppCommunicator,
CppConnection,
connect_nvls_collective,
EndpointConfig,
Semaphore,
ProxyService,
RegisteredMemory,
PortChannel,
MemoryChannel,
TcpBootstrap,
Transport,
TransportFlags,
CppEndpointConfig,
CppSemaphore,
CppProxyService,
CppRegisteredMemory,
CppPortChannel,
CppMemoryChannel,
CppTcpBootstrap,
CppTransport,
CppTransportFlags,
)
import mpi4py
import numpy as np
@@ -32,7 +32,7 @@ class CommGroup:
self, mpi_comm: mpi4py.MPI.Comm = None, interfaceIpPortTrio: str = "", rank: int = None, size: int = None
):
if interfaceIpPortTrio == "":
self.bootstrap = TcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
uniq_id = None
if mpi_comm.rank == 0:
# similar to NCCL's unique id
@@ -41,15 +41,15 @@ class CommGroup:
self.bootstrap.initialize(uniq_id_global)
elif mpi_comm:
# use this instead
self.bootstrap = TcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
self.bootstrap = CppTcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
self.bootstrap.initialize(interfaceIpPortTrio)
elif not interfaceIpPortTrio == "":
assert rank >= 0 and size >= 1
self.bootstrap = TcpBootstrap.create(rank, size)
self.bootstrap = CppTcpBootstrap.create(rank, size)
self.bootstrap.initialize(interfaceIpPortTrio)
else:
raise RuntimeError("Either the interface or mpi_group need to be specified")
self.communicator = Communicator(self.bootstrap)
self.communicator = CppCommunicator(self.bootstrap)
self.my_rank = self.bootstrap.get_rank()
self.nranks = self.bootstrap.get_n_ranks()
self.nranks_per_node = self.bootstrap.get_n_ranks_per_node()
@@ -63,43 +63,43 @@ class CommGroup:
def recv(self, tensor: np.ndarray, peer: int, tag: int):
self.bootstrap.recv(tensor.ctypes.data, tensor.size * tensor.itemsize, peer, tag)
def my_ib_device(self, local_rank: int) -> Transport:
def my_ib_device(self, local_rank: int) -> CppTransport:
if local_rank == 0:
return Transport.IB0
return CppTransport.IB0
if local_rank == 1:
return Transport.IB1
return CppTransport.IB1
if local_rank == 2:
return Transport.IB2
return CppTransport.IB2
if local_rank == 3:
return Transport.IB3
return CppTransport.IB3
if local_rank == 4:
return Transport.IB4
return CppTransport.IB4
if local_rank == 5:
return Transport.IB5
return CppTransport.IB5
if local_rank == 6:
return Transport.IB6
return CppTransport.IB6
if local_rank == 7:
return Transport.IB7
return CppTransport.IB7
else:
assert False # only 8 IBs are supported
def make_connection(
self,
all_ranks: list[int],
endpoints: EndpointConfig | Transport | dict[int, EndpointConfig] | dict[int, Transport],
endpoints: CppEndpointConfig | CppTransport | dict[int, CppEndpointConfig] | dict[int, CppTransport],
use_switch: bool = False,
) -> dict[int, Connection]:
if type(endpoints) is Transport:
endpoints = EndpointConfig(endpoints)
) -> dict[int, CppConnection]:
if type(endpoints) is CppTransport:
endpoints = CppEndpointConfig(endpoints)
elif type(endpoints) is dict:
endpoints = {k: EndpointConfig(v) if type(v) is Transport else v for k, v in endpoints.items()}
endpoints = {k: CppEndpointConfig(v) if type(v) is CppTransport else v for k, v in endpoints.items()}
connections = {}
for rank in all_ranks:
if type(endpoints) is dict:
endpoint = endpoints[rank]
else:
endpoint = endpoints
if endpoint.transport == Transport.CudaIpc and use_switch:
if endpoint.transport == CppTransport.CudaIpc and use_switch:
return connect_nvls_collective(self.communicator, all_ranks, 2**30)
else:
connections[rank] = self.communicator.connect(endpoint, rank)
@@ -107,8 +107,8 @@ class CommGroup:
return connections
def register_tensor_with_connections(
self, tensor: Type[cp.ndarray] | Type[np.ndarray], connections: dict[int, Connection]
) -> dict[int, RegisteredMemory]:
self, tensor: Type[cp.ndarray] | Type[np.ndarray], connections: dict[int, CppConnection]
) -> dict[int, CppRegisteredMemory]:
local_reg_memory = self.register_local_memory(tensor, connections)
all_registered_memories = {}
all_registered_memories[self.my_rank] = local_reg_memory
@@ -121,8 +121,8 @@ class CommGroup:
return all_registered_memories
def _register_memory_with_connections(
self, memory: RegisteredMemory, connections: dict[int, Connection]
) -> dict[int, RegisteredMemory]:
self, memory: CppRegisteredMemory, connections: dict[int, CppConnection]
) -> dict[int, CppRegisteredMemory]:
all_registered_memories = {}
all_registered_memories[self.my_rank] = memory
future_memories = {}
@@ -133,18 +133,20 @@ class CommGroup:
all_registered_memories[rank] = future_memories[rank].get()
return all_registered_memories
def make_semaphores(self, connections: dict[int, Connection]) -> dict[int, Semaphore]:
def make_semaphores(self, connections: dict[int, CppConnection]) -> dict[int, CppSemaphore]:
future_semaphores = {}
for rank in connections:
future_semaphores[rank] = self.communicator.build_semaphore(connections[rank], rank)
return {rank: future.get() for rank, future in future_semaphores.items()}
def make_memory_channels(self, tensor: cp.ndarray, connections: dict[int, Connection]) -> dict[int, MemoryChannel]:
def make_memory_channels(
self, tensor: cp.ndarray, connections: dict[int, CppConnection]
) -> dict[int, CppMemoryChannel]:
semaphores = self.make_semaphores(connections)
registered_memories = self.register_tensor_with_connections(tensor, connections)
channels = {}
for rank in connections:
channels[rank] = MemoryChannel(
channels[rank] = CppMemoryChannel(
semaphores[rank], registered_memories[rank], registered_memories[self.my_rank]
)
return channels
@@ -152,9 +154,9 @@ class CommGroup:
def make_memory_channels_with_scratch(
self,
tensor: cp.ndarray,
registeredScratchBuffer: RegisteredMemory,
connections: dict[int, Connection],
) -> dict[int, MemoryChannel]:
registeredScratchBuffer: CppRegisteredMemory,
connections: dict[int, CppConnection],
) -> dict[int, CppMemoryChannel]:
semaphores = self.make_semaphores(connections)
registered_memories = self._register_memory_with_connections(registeredScratchBuffer, connections)
channels = {}
@@ -162,17 +164,17 @@ class CommGroup:
tensor_size = (
tensor.numel() * tensor.element_size() if is_torch_tensor(tensor) else tensor.size * tensor.itemsize
)
local_registered_memory = self.communicator.register_memory(tensor_data_ptr, tensor_size, TransportFlags())
local_registered_memory = self.communicator.register_memory(tensor_data_ptr, tensor_size, CppTransportFlags())
scratch_data_ptr = registeredScratchBuffer.data()
for rank in connections:
channels[rank] = MemoryChannel(
channels[rank] = CppMemoryChannel(
semaphores[rank], registered_memories[rank], local_registered_memory, scratch_data_ptr
)
return channels
def make_port_channels(
self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection]
) -> dict[int, PortChannel]:
self, proxy_service: CppProxyService, tensor: cp.ndarray, connections: dict[int, CppConnection]
) -> dict[int, CppPortChannel]:
semaphores = self.make_semaphores(connections)
registered_memories = self.register_tensor_with_connections(tensor, connections)
memory_ids = {}
@@ -188,12 +190,12 @@ class CommGroup:
def make_port_channels_with_scratch(
self,
proxy_service: ProxyService,
proxy_service: CppProxyService,
tensor: cp.ndarray,
registeredScratchBuffer: RegisteredMemory,
connections: dict[int, Connection],
) -> dict[int, PortChannel]:
transport_flags = TransportFlags()
registeredScratchBuffer: CppRegisteredMemory,
connections: dict[int, CppConnection],
) -> dict[int, CppPortChannel]:
transport_flags = CppTransportFlags()
for rank in connections:
transport_flags |= connections[rank].transport()
data_ptr = (
@@ -223,8 +225,8 @@ class CommGroup:
return channels
def register_semaphore_with_proxy(
self, proxy_service: ProxyService, connections: dict[int, Connection]
) -> dict[int, PortChannel]:
self, proxy_service: CppProxyService, connections: dict[int, CppConnection]
) -> dict[int, CppPortChannel]:
semaphores = self.make_semaphores(connections)
semaphore_ids = {}
for rank in semaphores:
@@ -235,7 +237,7 @@ class CommGroup:
return channels
def register_memory_with_proxy(
self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection]
self, proxy_service: CppProxyService, tensor: cp.ndarray, connections: dict[int, CppConnection]
) -> dict[int, int]:
registered_memories = self.register_tensor_with_connections(tensor, connections)
memory_ids = {}
@@ -243,8 +245,8 @@ class CommGroup:
memory_ids[rank] = proxy_service.add_memory(registered_memories[rank])
return memory_ids
def register_local_memory(self, tensor: cp.ndarray, connections: dict[int, Connection]) -> RegisteredMemory:
transport_flags = TransportFlags()
def register_local_memory(self, tensor: cp.ndarray, connections: dict[int, CppConnection]) -> CppRegisteredMemory:
transport_flags = CppTransportFlags()
for rank in connections:
transport_flags |= connections[rank].transport()
data_ptr = (

View File

@@ -26,9 +26,7 @@ from mscclpp.language.program import CollectiveProgram
from mscclpp.language.utils import AlgoSpec
from mscclpp.utils import get_device_arch
from mscclpp._mscclpp import (
ExecutionPlan,
)
from mscclpp._mscclpp import CppExecutionPlan, env
logging.basicConfig(level=logging.INFO)
@@ -51,7 +49,7 @@ class DslCompiler:
into execution plans that can be run on GPUs. The compiled plans are cached
to disk for reuse.
The cache location can be configured via the `MSCCLPP_EXECUTION_PLAN_DIR`
The cache location can be configured via the `MSCCLPP_CACHE_DIR`
environment variable (defaults to `~/.cache/mscclpp`).
Example:
@@ -138,7 +136,7 @@ class DslCompiler:
)
).hexdigest()
plan_dir = os.environ.get("MSCCLPP_EXECUTION_PLAN_DIR", Path.home() / ".cache/mscclpp")
plan_dir = Path(env().cache_dir)
os.makedirs(plan_dir, exist_ok=True)
filename = f"{plan_id}.json"
plan_path = os.path.join(plan_dir, filename)
@@ -157,7 +155,7 @@ class DslCompiler:
os.remove(tmp_path)
except Exception:
Path(plan_path).unlink(missing_ok=True)
execution_plan = ExecutionPlan(plan_path, rank)
execution_plan = CppExecutionPlan(plan_path, rank)
return Algorithm(
id=plan_id,
execution_plan=execution_plan,
@@ -179,8 +177,8 @@ class NativeCodeCompiler:
based on the runtime environment. Compiled modules are cached to avoid
recompilation.
The cache location can be configured via the `MSCCLPP_NATIVE_CACHE_DIR`
environment variable (defaults to `~/.cache/mscclpp/native`).
The cache location can be configured via the `MSCCLPP_CACHE_DIR`
environment variable (defaults to `~/.cache/mscclpp`).
Attributes:
_is_hip: True if running on AMD/ROCm, False for NVIDIA/CUDA.
@@ -226,8 +224,7 @@ class NativeCodeCompiler:
"-L" + os.path.join(self._lib_home, "lib"),
"-lmscclpp",
]
cache_root = os.environ.get("MSCCLPP_NATIVE_CACHE_DIR", Path.home() / ".cache/mscclpp/native")
self._cache_dir = Path(cache_root)
self._cache_dir = Path(env().cache_dir) / "native"
self._cache_dir.mkdir(parents=True, exist_ok=True)
def _get_compiler(self) -> str:
@@ -283,7 +280,7 @@ class NativeCodeCompiler:
Note:
- The source file should include pybind11 bindings to expose functions.
- MSCCLPP headers are automatically included in the compilation.
- The module is cached in `MSCCLPP_NATIVE_CACHE_DIR` (default: ~/.cache/mscclpp/native).
- The module is cached in `MSCCLPP_CACHE_DIR` (default: ~/.cache/mscclpp).
- File locking is used to prevent race conditions during parallel compilation.
Example:

View File

@@ -2,5 +2,3 @@
# Licensed under the MIT license.
from .algorithm_collection_builder import *
__all__ = algorithm_collection_builder.__all__

View File

@@ -6,9 +6,7 @@ from typing import Union
from mscclpp._core.algorithm import Algorithm, AlgorithmBuilder, AlgorithmCollection
import atexit
from mscclpp._mscclpp import (
AlgorithmCollectionBuilder as _AlgorithmCollectionBuilder,
)
from mscclpp._mscclpp import CppAlgorithmCollectionBuilder
__all__ = ["AlgorithmCollectionBuilder"]
@@ -24,12 +22,12 @@ class AlgorithmCollectionBuilder:
@classmethod
def reset(cls):
if cls._instance is not None:
_AlgorithmCollectionBuilder.reset()
CppAlgorithmCollectionBuilder.reset()
cls._instance = None
def __init__(self):
if not hasattr(self, "_initialized"):
self._builder = _AlgorithmCollectionBuilder.get_instance()
self._builder = CppAlgorithmCollectionBuilder.get_instance()
self._initialized = True
def add_algorithm_builder(self, algorithm_builder: Union[AlgorithmBuilder, Algorithm]):

View File

@@ -11,7 +11,7 @@ from typing import Any, Type, Union
import cupy as cp
import numpy as np
from mscclpp._mscclpp import DataType
from mscclpp._mscclpp import CppDataType as DataType
try:
import torch

View File

@@ -58,8 +58,7 @@ Env::Env()
socketFamily(readEnv<std::string>("MSCCLPP_SOCKET_FAMILY", "")),
socketIfname(readEnv<std::string>("MSCCLPP_SOCKET_IFNAME", "")),
commId(readEnv<std::string>("MSCCLPP_COMM_ID", "")),
executionPlanDir(readEnv<std::string>("MSCCLPP_EXECUTION_PLAN_DIR",
readEnv<std::string>("HOME", "~") + "/.cache/mscclpp_default")),
cacheDir(readEnv<std::string>("MSCCLPP_CACHE_DIR", readEnv<std::string>("HOME", "~") + "/.cache/mscclpp")),
npkitDumpDir(readEnv<std::string>("MSCCLPP_NPKIT_DUMP_DIR", "")),
cudaIpcUseDefaultStream(readEnv<bool>("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", false)),
ncclSharedLibPath(readEnv<std::string>("MSCCLPP_NCCL_LIB_PATH", "")),
@@ -85,7 +84,7 @@ std::shared_ptr<Env> env() {
logEnv("MSCCLPP_SOCKET_FAMILY", globalEnv->socketFamily);
logEnv("MSCCLPP_SOCKET_IFNAME", globalEnv->socketIfname);
logEnv("MSCCLPP_COMM_ID", globalEnv->commId);
logEnv("MSCCLPP_EXECUTION_PLAN_DIR", globalEnv->executionPlanDir);
logEnv("MSCCLPP_CACHE_DIR", globalEnv->cacheDir);
logEnv("MSCCLPP_NPKIT_DUMP_DIR", globalEnv->npkitDumpDir);
logEnv("MSCCLPP_CUDAIPC_USE_DEFAULT_STREAM", globalEnv->cudaIpcUseDefaultStream);
logEnv("MSCCLPP_NCCL_LIB_PATH", globalEnv->ncclSharedLibPath);

View File

@@ -105,13 +105,13 @@ AlgorithmCollection AlgorithmCollectionBuilder::buildDefaultDslAlgorithms(int ra
return oss.str();
};
std::string planDir = env()->executionPlanDir;
auto planDir = std::filesystem::path(env()->cacheDir) / "default";
if (!std::filesystem::exists(planDir)) {
INFO(ALGO, "Plan directory does not exist: ", planDir);
INFO(ALGO, "Default plan directory does not exist: ", planDir);
return collection;
}
for (const auto& config : defaultAlgoConfigs) {
std::string planPath = planDir + "/" + config.filename;
auto planPath = planDir / config.filename;
INFO(ALGO, "Loading plan: ", planPath);
if (!std::filesystem::exists(planPath)) {
INFO(ALGO, "Plan file does not exist: ", planPath);