Files
mscclpp/python/csrc/algorithm.cpp

136 lines
7.0 KiB
C++

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <nanobind/nanobind.h>
#include <nanobind/stl/function.h>
#include <nanobind/stl/pair.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>
#include <nanobind/stl/unordered_map.h>
#include <nanobind/stl/vector.h>
#include <cstring>
#include <mscclpp/algorithm.hpp>
namespace nb = nanobind;
using namespace mscclpp;
void register_algorithm(nb::module_& m) {
nb::enum_<CollectiveBufferMode>(m, "CppCollectiveBufferMode")
.value("ANY", CollectiveBufferMode::Any)
.value("IN_PLACE", CollectiveBufferMode::InPlace)
.value("OUT_OF_PLACE", CollectiveBufferMode::OutOfPlace);
nb::enum_<AlgorithmType>(m, "CppAlgorithmType")
.value("NATIVE", AlgorithmType::Native)
.value("DSL", AlgorithmType::DSL);
nb::enum_<CommResult>(m, "CppCommResult")
.value("COMM_SUCCESS", CommResult::CommSuccess)
.value("COMM_UNHANDLED_CUDA_ERROR", CommResult::CommUnhandledCudaError)
.value("COMM_SYSTEM_ERROR", CommResult::CommSystemError)
.value("COMM_INTERNAL_ERROR", CommResult::CommInternalError)
.value("COMM_INVALID_ARGUMENT", CommResult::CommInvalidArgument)
.value("COMM_INVALID_USAGE", CommResult::CommInvalidUsage)
.value("COMM_REMOTE_ERROR", CommResult::CommRemoteError)
.value("COMM_IN_PROGRESS", CommResult::CommInProgress)
.value("COMM_NUM_RESULTS", CommResult::CommNumResults);
nb::enum_<ReduceOp>(m, "CppReduceOp")
.value("SUM", ReduceOp::SUM)
.value("MIN", ReduceOp::MIN)
.value("NOP", ReduceOp::NOP);
auto algorithmClass =
nb::class_<Algorithm>(m, "CppAlgorithm")
.def_static(
"from_native_capsule",
[](nb::capsule cap) {
const char* name = cap.name();
if (name == nullptr || std::strcmp(name, ALGORITHM_NATIVE_CAPSULE_NAME) != 0) {
throw nb::type_error("Invalid capsule: expected 'mscclpp::AlgorithmPtr'");
}
void* data = cap.data();
if (data == nullptr) {
throw nb::value_error("Failed to get pointer from capsule");
}
return *static_cast<std::shared_ptr<Algorithm>*>(data);
},
nb::arg("capsule"))
.def_prop_ro("name", &Algorithm::name)
.def_prop_ro("collective", &Algorithm::collective)
.def_prop_ro("message_range", &Algorithm::messageRange)
.def(
"set_message_size_range",
[](Algorithm& self, size_t minMessageSize, size_t maxMessageSize) {
self.setMessageSizeRange(minMessageSize, maxMessageSize);
},
nb::arg("min_message_size"), nb::arg("max_message_size"))
.def_prop_ro("tags", &Algorithm::tags)
.def_prop_ro("buffer_mode", &Algorithm::bufferMode)
.def_prop_ro("constraint", &Algorithm::constraint)
.def_prop_ro("type", &Algorithm::type)
.def(
"execute",
[](Algorithm& self, std::shared_ptr<Communicator> comm, uintptr_t input, uintptr_t output,
size_t inputSize, size_t outputSize, DataType dtype, ReduceOp op, uintptr_t stream,
std::shared_ptr<Executor> executor, int nBlocks, int nThreadsPerBlock, bool symmetricMemory,
std::unordered_map<std::string, uintptr_t> extras) {
return self.execute(comm, reinterpret_cast<const void*>(input), reinterpret_cast<void*>(output),
inputSize, outputSize, dtype, op, reinterpret_cast<cudaStream_t>(stream), executor,
nBlocks, nThreadsPerBlock, symmetricMemory, extras);
},
nb::arg("comm"), nb::arg("input"), nb::arg("output"), nb::arg("input_size"), nb::arg("output_size"),
nb::arg("dtype"), nb::arg("op") = ReduceOp::NOP, nb::arg("stream") = 0, nb::arg("executor") = nullptr,
nb::arg("n_blocks") = 0, nb::arg("n_threads_per_block") = 0, nb::arg("symmetric_memory") = false,
nb::arg("extras") = std::unordered_map<std::string, uintptr_t>())
.def("reset", &Algorithm::reset);
nb::class_<Algorithm::Constraint>(algorithmClass, "Constraint")
.def(nb::init<>())
.def(nb::init<int, int>(), nb::arg("world_size"), nb::arg("n_ranks_per_node"))
.def_rw("world_size", &Algorithm::Constraint::worldSize)
.def_rw("n_ranks_per_node", &Algorithm::Constraint::nRanksPerNode);
nb::class_<AlgorithmBuilder>(m, "CppAlgorithmBuilder").def("build", &AlgorithmBuilder::build);
nb::class_<DslAlgorithm, Algorithm>(m, "CppDslAlgorithm")
.def(nb::init<std::string, ExecutionPlan, std::unordered_map<std::string, uint64_t>, Algorithm::Constraint>(),
nb::arg("id"), nb::arg("plan"), nb::arg("tags") = std::unordered_map<std::string, uint64_t>(),
nb::arg("constraint") = Algorithm::Constraint())
.def("build", &DslAlgorithm::build);
nb::class_<AlgorithmCollection>(m, "CppAlgorithmCollection")
.def("register_algorithm", &AlgorithmCollection::registerAlgorithm, nb::arg("collective"), nb::arg("algo_name"),
nb::arg("algorithm"))
.def("get_algorithms_by_collective", &AlgorithmCollection::getAlgorithmsByCollective, nb::arg("collective"))
.def("to_list", &AlgorithmCollection::getAllAlgorithms);
nb::class_<CollectiveRequest>(m, "CppCollectiveRequest")
.def_ro("world_size", &CollectiveRequest::worldSize)
.def_ro("n_ranks_per_node", &CollectiveRequest::nRanksPerNode)
.def_ro("rank", &CollectiveRequest::rank)
.def_prop_ro("input_buffer",
[](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.inputBuffer); })
.def_prop_ro("output_buffer",
[](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.outputBuffer); })
.def_ro("message_size", &CollectiveRequest::messageSize)
.def_prop_ro("stream", [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.stream); })
.def_prop_ro("collective", [](const CollectiveRequest& self) { return self.collective; })
.def_ro("dtype", &CollectiveRequest::dtype)
.def_prop_ro("hints", [](const CollectiveRequest& self) { return self.hints; })
.def("buffer_mode", &CollectiveRequest::bufferMode);
m.def(
"cpp_get_flag_buffer",
[]() {
auto [buffer, size] = getFlagBuffer();
uintptr_t ptr = reinterpret_cast<uintptr_t>(buffer.get());
// Transfer shared_ptr ownership into a capsule so Python's GC manages the lifetime.
auto prevent = std::make_unique<std::shared_ptr<void>>(std::move(buffer));
nb::capsule owner(prevent.get(), [](void* p) noexcept { delete static_cast<std::shared_ptr<void>*>(p); });
prevent.release(); // capsule now owns the pointer
return nb::make_tuple(ptr, size, owner);
},
"Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size, owner).");
}