mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-12 09:17:06 +00:00
## Summary - **Add `fp8_e4m3b15` datatype**: A software-defined FP8 type with 4 exponent bits, 3 mantissa bits, and bias=15 (max finite value: 0.9375). Implemented entirely in software with no HW dependency, using Triton-style bit manipulation through fp16 as intermediate for efficient conversion. - **Add mixed-precision accumulation for allreduce**: All allreduce algorithm variants (packet, NVLS packet, fullmesh, RSAG zero-copy, and others) now support a configurable `accumDtype` parameter, enabling FP8 inputs to be reduced in float16 or float32 for higher accuracy. - **Propagate `accumDtype` through the full API**: The new parameter is threaded from `Algorithm::execute()` → `NativeAlgorithm` → `KernelFunc` → dispatch → CUDA kernels, with `DataType::AUTO` as the default (resolves to input dtype at runtime). - **Add FP8 accumulation correctness tests**: New `test_fp8_accum.py` validates that higher-precision accumulation produces results at least as accurate as native FP8 accumulation across multiple algorithms and sizes. Skipped on CUDA SM < 89 (pre-Ada, i.e. architectures without FP8 hardware support); runs on HIP/ROCm. - **Add `test_fp8_accum.py` to CI**: Azure Pipeline `ut.yml` now runs FP8 accumulation tests alongside existing pytests. - **NCCL shim logging cleanup**: Migrated `printf`-style `WARN`/`INFO` calls to streaming-style logging. 
## Key files | Area | Files | |------|-------| | New datatype + vector ops | `include/mscclpp/gpu_data_types.hpp` | | Accumulation reduce helpers | `src/core/include/reduce_kernel.hpp` | | Algorithm API (`accumDtype`) | `include/mscclpp/algorithm.hpp`, `src/core/algorithm.cc` | | Allreduce kernels | `src/ext/collectives/allreduce/*.cu` | | Dispatch + common | `src/ext/collectives/include/allreduce/common.hpp` | | Python bindings | `python/csrc/algorithm.cpp`, `python/mscclpp/_core/algorithm.py` | | Tests | `python/test/test_fp8_accum.py` | | CI | `.azure-pipelines/templates/ut.yml` | ## Test plan - [x] CI passes on H100 (CUDA SM 90) — full FP8 E4M3 + E4M3B15 accumulation tests - [x] CI passes on A100 (CUDA SM 80) — FP8 tests correctly skipped - [x] CI passes on MI300X (ROCm) — FP8 tests run via HIP - [x] Existing `test_mscclpp.py` tests continue to pass - [x] NCCL shim builds and runs correctly with new `accumDtype` defaults 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
138 lines
7.1 KiB
C++
138 lines
7.1 KiB
C++
// Copyright (c) Microsoft Corporation.
|
|
// Licensed under the MIT License.
|
|
|
|
#include <nanobind/nanobind.h>
#include <nanobind/stl/function.h>
#include <nanobind/stl/pair.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>
#include <nanobind/stl/unordered_map.h>
#include <nanobind/stl/vector.h>

#include <cstring>
#include <string>

#include <mscclpp/algorithm.hpp>
|
|
|
|
namespace nb = nanobind;
|
|
using namespace mscclpp;
|
|
|
|
// Registers the C++ algorithm API with the Python extension module.
//
// All names carry a "Cpp" prefix; the user-facing API is presented by the
// pure-Python wrappers (see python/mscclpp/_core/algorithm.py per the PR
// description). Pointers and CUDA streams cross the language boundary as
// plain integers (uintptr_t) and are reinterpreted on the C++ side.
void register_algorithm(nb::module_& m) {
  // ---- Enums --------------------------------------------------------------
  nb::enum_<CollectiveBufferMode>(m, "CppCollectiveBufferMode")
      .value("ANY", CollectiveBufferMode::Any)
      .value("IN_PLACE", CollectiveBufferMode::InPlace)
      .value("OUT_OF_PLACE", CollectiveBufferMode::OutOfPlace);

  nb::enum_<AlgorithmType>(m, "CppAlgorithmType")
      .value("NATIVE", AlgorithmType::Native)
      .value("DSL", AlgorithmType::DSL);

  nb::enum_<CommResult>(m, "CppCommResult")
      .value("COMM_SUCCESS", CommResult::CommSuccess)
      .value("COMM_UNHANDLED_CUDA_ERROR", CommResult::CommUnhandledCudaError)
      .value("COMM_SYSTEM_ERROR", CommResult::CommSystemError)
      .value("COMM_INTERNAL_ERROR", CommResult::CommInternalError)
      .value("COMM_INVALID_ARGUMENT", CommResult::CommInvalidArgument)
      .value("COMM_INVALID_USAGE", CommResult::CommInvalidUsage)
      .value("COMM_REMOTE_ERROR", CommResult::CommRemoteError)
      .value("COMM_IN_PROGRESS", CommResult::CommInProgress)
      .value("COMM_NUM_RESULTS", CommResult::CommNumResults);

  nb::enum_<ReduceOp>(m, "CppReduceOp")
      .value("SUM", ReduceOp::SUM)
      .value("MIN", ReduceOp::MIN)
      .value("NOP", ReduceOp::NOP);

  // ---- Algorithm ----------------------------------------------------------
  auto algorithmClass =
      nb::class_<Algorithm>(m, "CppAlgorithm")
          .def_static(
              "from_native_capsule",
              // Reconstructs a shared_ptr<Algorithm> from a PyCapsule produced
              // by native code; the capsule name acts as a runtime type tag.
              [](nb::capsule cap) {
                const char* name = cap.name();
                if (name == nullptr || std::strcmp(name, ALGORITHM_NATIVE_CAPSULE_NAME) != 0) {
                  // Build the message from the constant so the reported
                  // expectation can never drift from the name actually checked.
                  throw nb::type_error(
                      ("Invalid capsule: expected '" + std::string(ALGORITHM_NATIVE_CAPSULE_NAME) + "'").c_str());
                }
                void* data = cap.data();
                if (data == nullptr) {
                  throw nb::value_error("Failed to get pointer from capsule");
                }
                // Copy (not move) the shared_ptr: the capsule keeps its own
                // reference alive for as long as the Python object exists.
                return *static_cast<std::shared_ptr<Algorithm>*>(data);
              },
              nb::arg("capsule"))
          .def_prop_ro("name", &Algorithm::name)
          .def_prop_ro("collective", &Algorithm::collective)
          .def_prop_ro("message_range", &Algorithm::messageRange)
          .def(
              "set_message_size_range",
              [](Algorithm& self, size_t minMessageSize, size_t maxMessageSize) {
                self.setMessageSizeRange(minMessageSize, maxMessageSize);
              },
              nb::arg("min_message_size"), nb::arg("max_message_size"))
          .def_prop_ro("tags", &Algorithm::tags)
          .def_prop_ro("buffer_mode", &Algorithm::bufferMode)
          .def_prop_ro("constraint", &Algorithm::constraint)
          .def_prop_ro("type", &Algorithm::type)
          .def(
              "execute",
              // Buffers/stream arrive as uintptr_t; accum_dtype arrives as a
              // raw int32 so Python can pass the enum's integer value.
              [](Algorithm& self, std::shared_ptr<Communicator> comm, uintptr_t input, uintptr_t output,
                 size_t inputSize, size_t outputSize, DataType dtype, ReduceOp op, uintptr_t stream,
                 std::shared_ptr<Executor> executor, int nBlocks, int nThreadsPerBlock, bool symmetricMemory,
                 std::unordered_map<std::string, uintptr_t> extras, int32_t accumDtype) {
                return self.execute(comm, reinterpret_cast<const void*>(input), reinterpret_cast<void*>(output),
                                    inputSize, outputSize, dtype, op, reinterpret_cast<cudaStream_t>(stream), executor,
                                    nBlocks, nThreadsPerBlock, symmetricMemory, extras,
                                    static_cast<DataType>(accumDtype));
              },
              nb::arg("comm"), nb::arg("input"), nb::arg("output"), nb::arg("input_size"), nb::arg("output_size"),
              nb::arg("dtype"), nb::arg("op") = ReduceOp::NOP, nb::arg("stream") = 0, nb::arg("executor") = nullptr,
              nb::arg("n_blocks") = 0, nb::arg("n_threads_per_block") = 0, nb::arg("symmetric_memory") = false,
              nb::arg("extras") = std::unordered_map<std::string, uintptr_t>(),
              // AUTO resolves to the input dtype at runtime.
              nb::arg("accum_dtype") = static_cast<int32_t>(DataType::AUTO))
          .def("reset", &Algorithm::reset);

  nb::class_<Algorithm::Constraint>(algorithmClass, "Constraint")
      .def(nb::init<>())
      .def(nb::init<int, int>(), nb::arg("world_size"), nb::arg("n_ranks_per_node"))
      .def_rw("world_size", &Algorithm::Constraint::worldSize)
      .def_rw("n_ranks_per_node", &Algorithm::Constraint::nRanksPerNode);

  nb::class_<AlgorithmBuilder>(m, "CppAlgorithmBuilder").def("build", &AlgorithmBuilder::build);

  nb::class_<DslAlgorithm, Algorithm>(m, "CppDslAlgorithm")
      .def(nb::init<std::string, ExecutionPlan, std::unordered_map<std::string, uint64_t>, Algorithm::Constraint>(),
           nb::arg("id"), nb::arg("plan"), nb::arg("tags") = std::unordered_map<std::string, uint64_t>(),
           nb::arg("constraint") = Algorithm::Constraint())
      .def("build", &DslAlgorithm::build);

  nb::class_<AlgorithmCollection>(m, "CppAlgorithmCollection")
      .def("register_algorithm", &AlgorithmCollection::registerAlgorithm, nb::arg("collective"), nb::arg("algo_name"),
           nb::arg("algorithm"))
      .def("get_algorithms_by_collective", &AlgorithmCollection::getAlgorithmsByCollective, nb::arg("collective"))
      .def("to_list", &AlgorithmCollection::getAllAlgorithms);

  // ---- CollectiveRequest (read-only view for algorithm selectors) --------
  nb::class_<CollectiveRequest>(m, "CppCollectiveRequest")
      .def_ro("world_size", &CollectiveRequest::worldSize)
      .def_ro("n_ranks_per_node", &CollectiveRequest::nRanksPerNode)
      .def_ro("rank", &CollectiveRequest::rank)
      .def_prop_ro("input_buffer",
                   [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.inputBuffer); })
      .def_prop_ro("output_buffer",
                   [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.outputBuffer); })
      .def_ro("message_size", &CollectiveRequest::messageSize)
      .def_prop_ro("stream", [](const CollectiveRequest& self) { return reinterpret_cast<uintptr_t>(self.stream); })
      .def_prop_ro("collective", [](const CollectiveRequest& self) { return self.collective; })
      .def_ro("dtype", &CollectiveRequest::dtype)
      .def_prop_ro("hints", [](const CollectiveRequest& self) { return self.hints; })
      .def("buffer_mode", &CollectiveRequest::bufferMode);

  m.def(
      "cpp_get_flag_buffer",
      []() {
        auto [buffer, size] = getFlagBuffer();
        uintptr_t ptr = reinterpret_cast<uintptr_t>(buffer.get());
        // Transfer shared_ptr ownership into a capsule so Python's GC manages the lifetime.
        auto prevent = std::make_unique<std::shared_ptr<void>>(std::move(buffer));
        nb::capsule owner(prevent.get(), [](void* p) noexcept { delete static_cast<std::shared_ptr<void>*>(p); });
        prevent.release();  // capsule now owns the pointer
        return nb::make_tuple(ptr, size, owner);
      },
      "Get the default flag buffer. Returns a tuple of (buffer_ptr, buffer_size, owner).");
}