mscclpp/src/ext/nccl/algorithm_selector.cc
Binyang Li 96a72bbd3e Support E4M3B15 datatype (#765)
## Summary

- **Add `fp8_e4m3b15` datatype**: A software-defined FP8 type with 4
exponent bits, 3 mantissa bits, and bias 15 (max finite value: 0.9375).
It has no hardware dependency and converts efficiently via Triton-style
bit manipulation, using fp16 as the intermediate type (a minimal
conversion sketch follows this list).
- **Add mixed-precision accumulation for allreduce**: All allreduce
algorithm variants (packet, NVLS packet, fullmesh, RSAG zero-copy, and
others) now support a configurable `accumDtype` parameter, enabling FP8
inputs to be reduced in float16 or float32 for higher accuracy.
- **Propagate `accumDtype` through the full API**: The new parameter is
threaded from `Algorithm::execute()` → `NativeAlgorithm` → `KernelFunc`
→ dispatch → CUDA kernels, with `DataType::AUTO` as the default
(resolved to the input dtype at runtime; see the sketch after the
key-files table).
- **Add FP8 accumulation correctness tests**: New `test_fp8_accum.py`
validates that higher-precision accumulation produces results at least
as accurate as native FP8 accumulation across multiple algorithms and
message sizes. Skipped on CUDA SM < 89 (pre-Ada Lovelace); runs on
HIP/ROCm.
- **Add `test_fp8_accum.py` to CI**: Azure Pipeline `ut.yml` now runs
FP8 accumulation tests alongside existing pytests.
- **NCCL shim logging cleanup**: Migrated `printf`-style `WARN`/`INFO`
calls to streaming-style logging.
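
For illustration, here is a minimal sketch of the Triton-style fp8→fp16
bit trick mentioned above. The helper names are hypothetical (not the
library's API); it assumes the e4m3b15 layout `s eeee mmm`, whose bias
(15) equals fp16's, so widening is a pure bit shift with no re-biasing:

```cpp
#include <cstdint>

// Hypothetical helper: widen fp8 e4m3b15 bits to fp16 bits.
// The sign moves from bit 7 to bit 15; the 4-bit exponent and 3-bit
// mantissa shift into fp16's bits [13:7]. Because both formats use
// bias 15, the fp16 exponent's top bit stays 0 and subnormals map
// correctly too. Special encodings are ignored in this sketch.
inline uint16_t fp8_e4m3b15_to_fp16_bits(uint8_t x) {
  uint16_t sign = static_cast<uint16_t>(x & 0x80) << 8;
  uint16_t expMant = static_cast<uint16_t>(x & 0x7F) << 7;
  return sign | expMant;
}

// Hypothetical helper: narrow fp16 bits back to fp8 e4m3b15 bits.
// This sketch truncates toward zero; a production version would
// round-to-nearest-even and saturate values outside the fp8 range.
inline uint8_t fp16_bits_to_fp8_e4m3b15(uint16_t h) {
  uint8_t sign = static_cast<uint8_t>((h >> 8) & 0x80);
  uint8_t expMant = static_cast<uint8_t>((h >> 7) & 0x7F);
  return sign | expMant;
}
```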

## Key files

| Area | Files |
|------|-------|
| New datatype + vector ops | `include/mscclpp/gpu_data_types.hpp` |
| Accumulation reduce helpers | `src/core/include/reduce_kernel.hpp` |
| Algorithm API (`accumDtype`) | `include/mscclpp/algorithm.hpp`, `src/core/algorithm.cc` |
| Allreduce kernels | `src/ext/collectives/allreduce/*.cu` |
| Dispatch + common | `src/ext/collectives/include/allreduce/common.hpp` |
| Python bindings | `python/csrc/algorithm.cpp`, `python/mscclpp/_core/algorithm.py` |
| Tests | `python/test/test_fp8_accum.py` |
| CI | `.azure-pipelines/templates/ut.yml` |
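
The following hedged sketch (type and function names are assumptions,
not the actual mscclpp signatures) illustrates the accumulation pattern
that `accumDtype` enables: resolve `AUTO` to the input dtype, upcast each
element into the accumulation type, and narrow back once per output:

```cpp
enum class DataType { AUTO, FLOAT16, FLOAT32, FLOAT8_E4M3, FLOAT8_E4M3B15 };

// AUTO resolves to the input dtype at runtime, mirroring the default.
inline DataType resolveAccumDtype(DataType accumDtype, DataType inputDtype) {
  return accumDtype == DataType::AUTO ? inputDtype : accumDtype;
}

// Reduce n values of type In while accumulating in the wider type Accum
// (e.g. In = an fp8 wrapper type, Accum = float). Only the final result
// is narrowed back to In, so intermediate sums keep full Accum precision.
template <typename In, typename Accum>
In reduceSum(const In* vals, int n) {
  Accum acc = Accum(0);
  for (int i = 0; i < n; ++i) acc += static_cast<Accum>(vals[i]);
  return static_cast<In>(acc);
}
```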

## Test plan

- [x] CI passes on H100 (CUDA SM 90) — full FP8 E4M3 + E4M3B15
accumulation tests
- [x] CI passes on A100 (CUDA SM 80) — FP8 tests correctly skipped
- [x] CI passes on MI300X (ROCm) — FP8 tests run via HIP
- [x] Existing `test_mscclpp.py` tests continue to pass
- [x] NCCL shim builds and runs correctly with new `accumDtype` defaults

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-07 13:37:02 -07:00

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#include "algorithm_selector.hpp"

#include <mscclpp/env.hpp>
#include <mscclpp/utils.hpp>

#include "debug.h"

namespace mscclpp {
namespace nccl {
static bool isNvlsSupportedForDataType(const AlgorithmSelectorConfig& config, DataType dtype) {
  bool nvlsSupported = config.nvlsSupported;
  // NVLS does not support uint8_t (no hardware support for byte-level reduction)
  // NVLS also does not support float8_e4m3b15 (software-defined type with no hardware NVLS reduction support)
  if (dtype == DataType::UINT8 || dtype == DataType::FLOAT8_E4M3B15) {
    return false;
  }
  const bool isFp8 = dtype == DataType::FLOAT8_E4M3 || dtype == DataType::FLOAT8_E5M2;
  if (!isFp8) {
    return nvlsSupported;
  }
  // FP8 handling
#if !defined(__HIP_PLATFORM_AMD__)
  // NVLS does not support FP8 on devices with compute capability < 10
  if (config.computeCapability.first < 10) {
    return false;
  }
#if (defined(__CUDA_ARCH_SPECIFIC__) || defined(__CUDA_ARCH_FAMILY_SPECIFIC__))
  return true;
#else
  return false;
#endif
#else
  return nvlsSupported;
#endif
}

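// A request matches a DSL execution plan only when world size, ranks per
// node, collective name, buffer mode, and message size all satisfy the
// plan's constraints. For allgather the size check uses the total output
// size (messageSize * worldSize) rather than the per-rank message size.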
bool matchExecutionPlan(std::shared_ptr<DslAlgorithm> algo, const CollectiveRequest& request) {
  bool worldSizeMatch = algo->constraint().worldSize == request.worldSize;
  bool ranksPerNodeMatch = algo->constraint().nRanksPerNode == request.nRanksPerNode;
  bool collectiveMatch = algo->collective() == request.collective;
  bool bufferModeMatch = algo->bufferMode() == CollectiveBufferMode::Any || request.bufferMode() == algo->bufferMode();
  size_t effectiveSize =
      (request.collective == "allgather") ? (request.messageSize * request.worldSize) : request.messageSize;
  bool minSizeMatch = effectiveSize >= algo->messageRange().first;
  bool maxSizeMatch = effectiveSize <= algo->messageRange().second;
  bool result =
      worldSizeMatch && ranksPerNodeMatch && collectiveMatch && bufferModeMatch && minSizeMatch && maxSizeMatch;
  return result;
}

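// Size-threshold decision tree for single-node allreduce on Blackwell:
//   <= 32KB                                  -> NVLS packet
//   without symmetric memory:
//     <= 2MB                                 -> packet
//     capturing a CUDA graph                 -> RSAG zero-copy
//     <= 8MB                                 -> RSAG
//     larger                                 -> RSAG pipeline
//   with symmetric memory:
//     <= 64KB, or <= 1MB w/o NVLS zero-copy  -> packet
//     NVLS + cuMemMap allocation             -> NVLS zero-copy
//     otherwise                              -> RSAG zero-copy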
static std::shared_ptr<Algorithm> selectSingleNodeAllreduceBlackwell(
    const std::unordered_map<std::string, std::shared_ptr<Algorithm>>& algoMap, const CollectiveRequest& request,
    const AlgorithmSelectorConfig& config) {
  const size_t messageSize = request.messageSize;
  const bool nvlsSupported = isNvlsSupportedForDataType(config, request.dtype);
  // Small messages always use NVLS packet algorithm
  if (messageSize <= (1 << 15)) {  // <= 32KB
    return algoMap.at("default_allreduce_nvls_packet");
  }
  if (!config.symmetricMemory) {
    if (messageSize <= (1 << 21)) {  // <= 2MB
      return algoMap.at("default_allreduce_packet");
    }
    if (config.inCaptureMode) {
      // CUDA graph mode: setup new connections each time (zero-copy for graph)
      return algoMap.at("default_allreduce_rsag_zero_copy");
    }
    // Non-graph mode: use non-zero-copy algorithms
    if (messageSize <= (1 << 23)) {  // <= 8MB
      return algoMap.at("default_allreduce_rsag");
    }
    return algoMap.at("default_allreduce_rsag_pipeline");
  }
  // Symmetric memory path: can use cached memory handles
  const bool useNvlsWithZeroCopy = nvlsSupported && config.isCuMemMapAllocated;
  if (messageSize <= (1 << 16) || (messageSize <= (1 << 20) && !useNvlsWithZeroCopy)) {  // <= 64KB or <= 1MB
    return algoMap.at("default_allreduce_packet");
  }
  if (useNvlsWithZeroCopy) {
    return algoMap.at("default_allreduce_nvls_zero_copy");
  }
  return algoMap.at("default_allreduce_rsag_zero_copy");
}

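// Pre-Blackwell selection below walks thresholds at 16KB, 32KB, 64KB/1MB,
// and 16MB to pick between allpair packet, NVLS packet, packet, NVLS
// zero-copy, and the NVLS warp/block pipelines; without NVLS it falls
// back to fullmesh, or to NCCL/RCCL when a fallback library is loaded.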
std::shared_ptr<Algorithm> selectSingleNodeAllreduce(
    const std::unordered_map<std::string, std::shared_ptr<Algorithm>>& algoMap, const CollectiveRequest& request,
    const AlgorithmSelectorConfig& config) {
  // Use Blackwell-specific selection for compute capability 10.x
  if (config.computeCapability.first == 10) {
    return selectSingleNodeAllreduceBlackwell(algoMap, request, config);
  }
  const size_t messageSize = request.messageSize;
  // Determine NVLS availability based on data type and device capability
  const bool nvlsSupported = isNvlsSupportedForDataType(config, request.dtype);
  const bool useNvlsWithZeroCopy = nvlsSupported && config.symmetricMemory && config.isCuMemMapAllocated;
  // Very small messages: use allpair packet algorithm
  if (messageSize <= (1 << 14)) {  // <= 16KB
    return algoMap.at("default_allreduce_allpair_packet");
  }
  // Small messages with NVLS support
  if (messageSize <= (1 << 15) && nvlsSupported) {  // <= 32KB
    return algoMap.at("default_allreduce_nvls_packet");
  }
  // Medium messages: use packet algorithm
  if (messageSize <= (1 << 16) || (messageSize <= (1 << 20) && !useNvlsWithZeroCopy)) {  // <= 64KB or <= 1MB
    return algoMap.at("default_allreduce_packet");
  }
  // Large messages with NVLS zero-copy support
  if (nvlsSupported && useNvlsWithZeroCopy) {
    return algoMap.at("default_allreduce_nvls_zero_copy");
  }
  // Large messages with NVLS but without zero-copy
  if (nvlsSupported) {
    if (messageSize < (1 << 24)) {  // < 16MB
      return algoMap.at("default_allreduce_nvls_warp_pipeline");
    }
    return algoMap.at("default_allreduce_nvls_block_pipeline");
  }
#if defined(__HIP_PLATFORM_AMD__)
  // AMD platform: use fullmesh algorithm
  return algoMap.at("default_allreduce_fullmesh");
#else
  // NVIDIA without NVLS: use fullmesh if no NCCL fallback is available
  if (!config.ncclDlopenSharedLib) {
    return algoMap.at("default_allreduce_fullmesh");
  }
  return nullptr;
#endif
}

std::shared_ptr<Algorithm> selectSingleNodeAllgather(
    const std::unordered_map<std::string, std::shared_ptr<Algorithm>>& algoMap, const CollectiveRequest& request,
    [[maybe_unused]] const AlgorithmSelectorConfig& config) {
  const size_t messageSize = request.messageSize;
  // For messages up to 32MB, use fullmesh2 algorithm
  if (messageSize <= 32 * (1 << 20)) {
    return algoMap.at("default_allgather_fullmesh2");
  }
#if defined(__HIP_PLATFORM_AMD__)
  // AMD platform always uses fullmesh2
  return algoMap.at("default_allgather_fullmesh2");
#else
  // NVIDIA: use fullmesh for large messages if no NCCL fallback is available
  if (!config.ncclDlopenSharedLib) {
    return algoMap.at("default_allgather_fullmesh");
  }
  return nullptr;
#endif
}

std::shared_ptr<Algorithm> selectMultiNodeAlgorithm(
    const std::unordered_map<std::string, std::shared_ptr<Algorithm>>& algoMap [[maybe_unused]],
    const CollectiveRequest& request [[maybe_unused]], const AlgorithmSelectorConfig& config [[maybe_unused]]) {
  // TODO: Implement multi-node algorithm selection
  // Multi-node scenarios will need to consider:
  //   1. Multi-node NVLS (if supported by hardware)
  //   2. Multi-node IB (InfiniBand)
  //   3. Hierarchical algorithms (intra-node + inter-node)
  //   4. Network topology awareness
  // For now, return nullptr to fall back to NCCL/RCCL
  INFO(MSCCLPP_NCCL, "Multi-node collective not yet supported, fallback to nccl/rccl");
  return nullptr;
}

} // namespace nccl
} // namespace mscclpp