From 40295df4c4d0b57a028f29320996f50587330656 Mon Sep 17 00:00:00 2001 From: Caio Rocha <164253795+caiomcbr@users.noreply.github.com> Date: Thu, 14 May 2026 09:56:11 -0700 Subject: [PATCH 1/2] Adding Support to bf16 Executor Tests (#801) This pull request adds support for the `bfloat16` (bf16) data type to the test executor, including both Python and CUDA components. The changes ensure that `bfloat16` is handled consistently across argument parsing, data type conversion, and test kernel implementations. Additionally, the CUDA verification kernels are refactored to use parameterized tolerances for improved numerical accuracy checks. **Support for bfloat16 data type:** * Added handling for `bfloat16`/`bf16` in the Python test executor's argument parsing, data type conversion (`parse_dtype`, `dtype_to_mscclpp_dtype`), and help text. [[1]](diffhunk://#diff-e643968a8622d1603868a8ecf4b2fcd8108be1e404a3420bb7e2a6d51dc23fdcR27-R28) [[2]](diffhunk://#diff-e643968a8622d1603868a8ecf4b2fcd8108be1e404a3420bb7e2a6d51dc23fdcL122-R135) [[3]](diffhunk://#diff-e643968a8622d1603868a8ecf4b2fcd8108be1e404a3420bb7e2a6d51dc23fdcL246-R251) * Updated output to display the correct data type string for `bfloat16`. **CUDA kernel and test improvements:** * Included `bfloat16` headers and defined test data fill and gather kernels for `bfloat16` on both CUDA and HIP platforms. [[1]](diffhunk://#diff-e18b8becff1c3b234733f5ca3250a76ffdc5edddb302c2da098b64b00ba7cf88R8-R11) [[2]](diffhunk://#diff-e18b8becff1c3b234733f5ca3250a76ffdc5edddb302c2da098b64b00ba7cf88R35) [[3]](diffhunk://#diff-e18b8becff1c3b234733f5ca3250a76ffdc5edddb302c2da098b64b00ba7cf88R54-R59) [[4]](diffhunk://#diff-e18b8becff1c3b234733f5ca3250a76ffdc5edddb302c2da098b64b00ba7cf88R133) * Refactored verification kernels (`ALL_REDUCE`, `REDUCE_SCATTER`) to use an explicit tolerance parameter (`Eps`) and added correct tolerances for each data type, including `bfloat16`. 
[[1]](diffhunk://#diff-e18b8becff1c3b234733f5ca3250a76ffdc5edddb302c2da098b64b00ba7cf88L69-R85) [[2]](diffhunk://#diff-e18b8becff1c3b234733f5ca3250a76ffdc5edddb302c2da098b64b00ba7cf88L94-R113) These changes ensure full support for `bfloat16` in the test executor and improve the accuracy and maintainability of the CUDA test kernels. --------- Co-authored-by: Caio Rocha --- python/test/executor_test.py | 21 ++++++++++------- python/test/executor_test_verifier.cu | 33 +++++++++++++++++++-------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 59bc1661..8a309de5 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -24,6 +24,8 @@ def parse_dtype(dtype_str): dtype_str = dtype_str.strip().lower() if dtype_str == "float16": return cp.float16 + elif dtype_str in ("bfloat16", "bf16"): + return cp.float16 # same 2-byte size; mscclpp DataType is resolved from dtype_str elif dtype_str == "float32": return cp.float32 elif dtype_str == "int32": @@ -119,15 +121,18 @@ def parse_size(size_str): return int(size_str) -def dtype_to_mscclpp_dtype(dtype): - if dtype == cp.float16: +def dtype_to_mscclpp_dtype(dtype_str): + dtype_str = dtype_str.strip().lower() + if dtype_str == "float16": return DataType.float16 - elif dtype == cp.float32: + elif dtype_str in ("bfloat16", "bf16"): + return DataType.bfloat16 + elif dtype_str == "float32": return DataType.float32 - elif dtype == cp.int32: + elif dtype_str == "int32": return DataType.int32 else: - raise ValueError(f"Unknown data type: {dtype}") + raise ValueError(f"Unknown data type: {dtype_str}") def build_bufs( @@ -205,7 +210,7 @@ def main( result_buf.data.ptr, input_buf.nbytes, result_buf.nbytes, - dtype_to_mscclpp_dtype(dtype), + dtype_to_mscclpp_dtype(dtype_str), execution_plan, stream.ptr, packet_type, @@ -231,7 +236,7 @@ def main( npkit.shutdown() print( f"Rank: {mscclpp_group.my_rank} Execution time: {execution_time} us, " - 
f"data size: {result_buf.nbytes} bytes data type: {dtype().dtype.name} " + f"data size: {result_buf.nbytes} bytes data type: {dtype_str} " f"packet type: {packet_type}" ) executor = None @@ -243,7 +248,7 @@ if __name__ == "__main__": parser.add_argument("-path", "--execution_plan_path", type=str, required=True) parser.add_argument("--size", type=str, required=True) parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") - parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, float32, int32") + parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, bfloat16, float32, int32") parser.add_argument("--packet_type", type=str, default="LL16", help="Choose from LL8, LL16") parser.add_argument("--n_iters", type=int, default=10) parser.add_argument("--n_graph_iters", type=int, default=10) diff --git a/python/test/executor_test_verifier.cu b/python/test/executor_test_verifier.cu index cf3cd4a6..e7749197 100644 --- a/python/test/executor_test_verifier.cu +++ b/python/test/executor_test_verifier.cu @@ -4,8 +4,10 @@ #include #if defined(__HIP_PLATFORM_AMD__) +#include #include #else +#include #include #endif @@ -30,6 +32,7 @@ static __device__ unsigned int ranqd1(unsigned int seed) { } \ } +FILL_DATA(bfloat16, __nv_bfloat16) FILL_DATA(float16, __half) FILL_DATA(float32, float) FILL_DATA(int32, int) @@ -48,11 +51,12 @@ FILL_DATA(int32, int) } \ } +TEST_DATA_ALL_GATHER(bfloat16, __nv_bfloat16) TEST_DATA_ALL_GATHER(float16, __half) TEST_DATA_ALL_GATHER(float32, float) TEST_DATA_ALL_GATHER(int32, int) -#define TEST_DATA_ALL_REDUCE(FuncNameType, DataType) \ +#define TEST_DATA_ALL_REDUCE(FuncNameType, DataType, Eps) \ extern "C" __global__ void __launch_bounds__(1024, 1) test_data_all_reduce_##FuncNameType( \ DataType* result_buf, DataType* test_buf, size_t num_elems, int num_ranks, int my_rank, int seq) { \ for (int rank = 0; rank < num_ranks; rank++) { \ @@ -66,15 +70,19 @@ 
TEST_DATA_ALL_GATHER(int32, int) } \ } \ for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < num_elems; i += blockDim.x * gridDim.x) { \ - assert(abs(float(result_buf[i]) - float(test_buf[i])) < 1e-3 * num_ranks); \ + float expected = float(test_buf[i]); \ + float result = float(result_buf[i]); \ + float tol = Eps * num_ranks * (1.0f + abs(expected)); \ + assert(abs(result - expected) <= tol); \ } \ } -TEST_DATA_ALL_REDUCE(float16, __half) -TEST_DATA_ALL_REDUCE(float32, float) -TEST_DATA_ALL_REDUCE(int32, int) +TEST_DATA_ALL_REDUCE(bfloat16, __nv_bfloat16, 7.8125e-3f) +TEST_DATA_ALL_REDUCE(float16, __half, 9.765625e-4f) +TEST_DATA_ALL_REDUCE(float32, float, 1.1920929e-7f) +TEST_DATA_ALL_REDUCE(int32, int, 0.0f) -#define TEST_DATA_REDUCE_SCATTER(FuncNameType, DataType) \ +#define TEST_DATA_REDUCE_SCATTER(FuncNameType, DataType, Eps) \ extern "C" __global__ void __launch_bounds__(1024, 1) test_data_reduce_scatter_##FuncNameType( \ DataType* result_buf, DataType* test_buf, size_t num_elems, int num_ranks, int my_rank, int seq) { \ int nem_elems_per_rank = num_elems / num_ranks; \ @@ -91,14 +99,18 @@ TEST_DATA_ALL_REDUCE(int32, int) } \ for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < num_elems; i += blockDim.x * gridDim.x) { \ if (i >= offset && i < offset + nem_elems_per_rank) { \ - assert(abs(float(result_buf[i - offset]) - float(test_buf[i])) < 1e-3 * num_ranks); \ + float expected = float(test_buf[i]); \ + float result = float(result_buf[i - offset]); \ + float tol = Eps * num_ranks * (1.0f + abs(expected)); \ + assert(abs(result - expected) <= tol); \ } \ } \ } -TEST_DATA_REDUCE_SCATTER(float16, __half) -TEST_DATA_REDUCE_SCATTER(float32, float) -TEST_DATA_REDUCE_SCATTER(int32, int) +TEST_DATA_REDUCE_SCATTER(bfloat16, __nv_bfloat16, 7.8125e-3f) +TEST_DATA_REDUCE_SCATTER(float16, __half, 9.765625e-4f) +TEST_DATA_REDUCE_SCATTER(float32, float, 1.1920929e-7f) +TEST_DATA_REDUCE_SCATTER(int32, int, 0.0f) #define TEST_DATA_ALL_TO_ALL(FuncNameType, 
DataType) \ extern "C" __global__ void __launch_bounds__(1024, 1) test_data_all_to_all_##FuncNameType( \ @@ -118,6 +130,7 @@ TEST_DATA_REDUCE_SCATTER(int32, int) } \ } +TEST_DATA_ALL_TO_ALL(bfloat16, __nv_bfloat16) TEST_DATA_ALL_TO_ALL(float16, __half) TEST_DATA_ALL_TO_ALL(float32, float) TEST_DATA_ALL_TO_ALL(int32, int) \ No newline at end of file From 5d608feaa52f83e7b556fde647a45b63eb119047 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Thu, 14 May 2026 14:06:12 -0700 Subject: [PATCH 2/2] Enhance cross-node CudaIpc availability check (#803) --- include/mscclpp/utils.hpp | 9 +++++++++ src/core/connection.cc | 11 +++++++++++ src/core/context.cc | 17 ++++++++-------- src/core/gpu_ipc_mem.cc | 27 ++++++++++++++++++++++---- test/mp_unit/port_channel_tests.cu | 31 +++++++++++++++++++++++++++--- 5 files changed, 79 insertions(+), 16 deletions(-) diff --git a/include/mscclpp/utils.hpp b/include/mscclpp/utils.hpp index ffe269da..54cfa4a0 100644 --- a/include/mscclpp/utils.hpp +++ b/include/mscclpp/utils.hpp @@ -46,6 +46,15 @@ std::string getIBDeviceName(Transport ibTransport); /// @return The InfiniBand transport associated with the specified device name. Transport getIBTransportByDeviceName(const std::string& ibDeviceName); +/// Check whether this process can allocate/import CUDA memory with NVIDIA fabric handles +/// (`CU_MEM_HANDLE_TYPE_FABRIC`). Fabric handles enable cross-node `Transport::CudaIpc` on +/// MNNVL systems (e.g., GB200 NVL72) when the IMEX service is running. Returns `false` on +/// hardware/software stacks without MNNVL+IMEX, in which case `Transport::CudaIpc` is +/// restricted to ranks within the same node. +/// +/// @return `true` if fabric handles are usable from this process, `false` otherwise. 
+bool isFabricMemHandleAvailable(); + } // namespace mscclpp #endif // MSCCLPP_UTILS_HPP_ diff --git a/src/core/connection.cc b/src/core/connection.cc index 8b6c0afb..11ecd968 100644 --- a/src/core/connection.cc +++ b/src/core/connection.cc @@ -82,6 +82,17 @@ MSCCLPP_API_CPP int Connection::getMaxWriteQueueSize() const { return impl_->get CudaIpcConnection::CudaIpcConnection(std::shared_ptr context, const Endpoint& localEndpoint, const Endpoint& remoteEndpoint) : BaseConnection(context, localEndpoint) { + // Log fabric/MNNVL availability exactly once per process so any later cross-node CudaIpc failure + // is easy to triage. C++11 magic statics make this thread-safe without an explicit mutex. + // NOTE: assigning the message to a std::string first keeps the logger's pointer-formatting + // overload from kicking in on the const char* result of the ternary. + [[maybe_unused]] static const bool fabricAvailable_ = []() { + const bool avail = isFabricMemHandleAvailable(); + const std::string status = avail ? 
"available (cross-node CudaIpc via MNNVL/IMEX is supported)" + : "NOT available (CudaIpc is restricted to intra-node ranks on this system)"; + INFO(CONN, "CudaIpc transport selected: fabric handles ", status); + return avail; + }(); if (localEndpoint.transport() != Transport::CudaIpc || remoteEndpoint.transport() != Transport::CudaIpc) { THROW(CONN, Error, ErrorCode::InternalError, "CudaIpc transport is required for CudaIpcConnection"); } diff --git a/src/core/context.cc b/src/core/context.cc index aabe71df..b55939e3 100644 --- a/src/core/context.cc +++ b/src/core/context.cc @@ -4,12 +4,11 @@ #include "context.hpp" #include -#include #include "api.h" #include "connection.hpp" -#include "debug.h" #include "endpoint.hpp" +#include "logger.hpp" #include "registered_memory.hpp" namespace mscclpp { @@ -78,19 +77,17 @@ MSCCLPP_API_CPP Endpoint Context::createEndpoint(EndpointConfig config) { MSCCLPP_API_CPP Connection Context::connect(const Endpoint& localEndpoint, const Endpoint& remoteEndpoint) { if (localEndpoint.device().type == DeviceType::GPU && localEndpoint.device().id < 0) { - throw Error("No GPU device ID provided for local endpoint", ErrorCode::InvalidUsage); + THROW(CONN, Error, ErrorCode::InvalidUsage, "No GPU device ID provided for local endpoint"); } if (remoteEndpoint.device().type == DeviceType::GPU && remoteEndpoint.device().id < 0) { - throw Error("No GPU device ID provided for remote endpoint", ErrorCode::InvalidUsage); + THROW(CONN, Error, ErrorCode::InvalidUsage, "No GPU device ID provided for remote endpoint"); } auto localTransport = localEndpoint.transport(); auto remoteTransport = remoteEndpoint.transport(); if (localTransport != remoteTransport && !(AllIBTransports.has(localTransport) && AllIBTransports.has(remoteTransport))) { - std::stringstream ss; - ss << "Transport mismatch between local (" << localTransport << ") and remote (" << remoteEndpoint.transport() - << ") endpoints"; - throw Error(ss.str(), ErrorCode::InvalidUsage); + THROW(CONN, 
Error, ErrorCode::InvalidUsage, "Transport mismatch between local (", localTransport, ") and remote (", + remoteTransport, ") endpoints"); } std::shared_ptr conn; if (localTransport == Transport::CudaIpc) { @@ -100,7 +97,9 @@ MSCCLPP_API_CPP Connection Context::connect(const Endpoint& localEndpoint, const } else if (localTransport == Transport::Ethernet) { conn = std::make_shared(shared_from_this(), localEndpoint, remoteEndpoint); } else { - throw Error("Unsupported transport", ErrorCode::InternalError); + THROW(CONN, Error, ErrorCode::InternalError, "Unsupported transport: ", localTransport, + " (this usually means EndpointConfig.transport was left at Transport::Unknown — " + "set it explicitly to CudaIpc, an IB transport, or Ethernet)"); } return Connection(conn); } diff --git a/src/core/gpu_ipc_mem.cc b/src/core/gpu_ipc_mem.cc index c863ecdd..0f58ed20 100644 --- a/src/core/gpu_ipc_mem.cc +++ b/src/core/gpu_ipc_mem.cc @@ -7,6 +7,7 @@ #include #include +#include #include "logger.hpp" #include "unix_socket.hpp" @@ -35,7 +36,7 @@ std::ostream& operator<<(std::ostream& os, const GpuIpcMemHandle::TypeFlags& typ return os; } -[[maybe_unused]] static bool isFabricMemHandleAvailable() { +bool isFabricMemHandleAvailable() { #if (CUDA_NVLS_API_AVAILABLE) static int resultCache = -1; // -1: uninitialized, 0: not available, 1: available if (resultCache != -1) { @@ -283,11 +284,19 @@ GpuIpcMem::GpuIpcMem(const GpuIpcMemHandle& handle) THROW(GPU, Error, ErrorCode::InvalidUsage, "GpuIpcMemHandle type is None, cannot create GpuIpcMem"); } if ((type_ == GpuIpcMemHandle::Type::None) && (handle_.typeFlags & GpuIpcMemHandle::Type::Fabric)) { - if (cuMemImportFromShareableHandle(&allocHandle_, (void*)handle_.fabric.handle, CU_MEM_HANDLE_TYPE_FABRIC) == - CUDA_SUCCESS) { + CUresult res = + cuMemImportFromShareableHandle(&allocHandle_, (void*)handle_.fabric.handle, CU_MEM_HANDLE_TYPE_FABRIC); + if (res == CUDA_SUCCESS) { // Ignore allocHandle in the handle struct since it is 
process-local and not transferable across processes. handle_.fabric.allocHandle = {}; type_ = GpuIpcMemHandle::Type::Fabric; + } else { + const char* errStr = nullptr; + (void)cuGetErrorString(res, &errStr); + const std::string errMsg = errStr ? std::string(errStr) : std::string("unknown CUDA error"); + WARN(GPU, "Fabric IPC handle import failed (", errMsg, + "); cross-node CudaIpc requires NVIDIA MNNVL hardware and a running IMEX service. ", + "Falling back to other handle types if available."); } } if ((type_ == GpuIpcMemHandle::Type::None) && (handle_.typeFlags & GpuIpcMemHandle::Type::PosixFd)) { @@ -303,7 +312,17 @@ GpuIpcMem::GpuIpcMem(const GpuIpcMemHandle& handle) type_ = GpuIpcMemHandle::Type::RuntimeIpc; } if (type_ == GpuIpcMemHandle::Type::None) { - THROW(GPU, Error, ErrorCode::Aborted, "Failed to open GpuIpcMemHandle (type: ", handle_.typeFlags, ")"); + const bool fabricOnly = (handle_.typeFlags == GpuIpcMemHandle::Type::Fabric); + const std::string hint = fabricOnly + ? std::string( + "The remote rank sent only a Fabric (MNNVL) handle, but this rank could not " + "import it. Check that the IMEX daemon is running on both nodes and that the " + "GPUs share an NVLink fabric.") + : std::string( + "All handle types failed to import; check IMEX service and POSIX FD socket " + "availability."); + THROW(GPU, Error, ErrorCode::Aborted, "Failed to open GpuIpcMemHandle (offered types: ", handle_.typeFlags, "). ", + hint); } } diff --git a/test/mp_unit/port_channel_tests.cu b/test/mp_unit/port_channel_tests.cu index 3b14ed31..4b1b0cfb 100644 --- a/test/mp_unit/port_channel_tests.cu +++ b/test/mp_unit/port_channel_tests.cu @@ -36,6 +36,18 @@ inline void requireGdrForIbMode(IbMode mode, mscclpp::Transport ibTransport) { #define REQUIRE_GDR_FOR_IB_MODE(mode) // No extra requirements on non-CUDA platforms. #endif +// Skip an IPC-only PortChannel test (useIPC=true, useIB=false, useEthernet=false) when CudaIpc +// cannot connect this rank pair. 
CudaIpc works intra-node always, and cross-node only on MNNVL +// systems (GB200 NVL72 + IMEX). The combined check is "at least 2 ranks per node" OR "fabric +// (MNNVL) handles are usable on this system". +#define REQUIRE_CUDA_IPC_AVAILABLE \ + do { \ + if (gEnv->nRanksPerNode < 2 && !mscclpp::isFabricMemHandleAvailable()) { \ + SKIP_TEST() << "CudaIpc requires intra-node ranks (nRanksPerNode>=2) or MNNVL fabric handles, \ +both unavailable here."; \ + } \ + } while (0) + void PortChannelOneToOneTest::SetUp() { // Use only two ranks setNumRanksToUse(2); @@ -71,7 +83,10 @@ void PortChannelOneToOneTest::setupMeshConnections(std::vectorrank)) && useIPC) { + if (useIPC) { + // CudaIpc works intra-node always, and cross-node on MNNVL systems (GB200 NVL72 + IMEX) + // via fabric handles. Tests that exercise CudaIpc across nodes on non-MNNVL hardware should + // gate themselves with REQUIRE_CUDA_IPC_AVAILABLE; we always request CudaIpc here when asked. cfg.transport = mscclpp::Transport::CudaIpc; } else if (useIb) { cfg.transport = ibTransport; @@ -262,6 +277,7 @@ void PortChannelOneToOneTest::testPingPongPerf(PingPongTestParams params) { } TEST(PortChannelOneToOneTest, PingPong) { + REQUIRE_CUDA_IPC_AVAILABLE; testPingPong(PingPongTestParams{ .useIPC = true, .useIB = false, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::Default}); } @@ -279,6 +295,7 @@ TEST(PortChannelOneToOneTest, PingPongEthernet) { } TEST(PortChannelOneToOneTest, PingPongWithPoll) { + REQUIRE_CUDA_IPC_AVAILABLE; testPingPong(PingPongTestParams{ .useIPC = true, .useIB = false, .useEthernet = false, .waitWithPoll = true, .ibMode = IbMode::Default}); } @@ -291,6 +308,7 @@ TEST(PortChannelOneToOneTest, PingPongIbHostModeWithPoll) { } PERF_TEST(PortChannelOneToOneTest, PingPongPerf) { + REQUIRE_CUDA_IPC_AVAILABLE; testPingPongPerf(PingPongTestParams{ .useIPC = true, .useIB = false, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::Default}); } @@ -482,7 +500,10 @@ void 
PortChannelOneToOneTest::testPacketPingPongPerf(bool useIb, IbMode ibMode) proxyService->stopProxy(); } -TEST(PortChannelOneToOneTest, PacketPingPong) { testPacketPingPong(false, IbMode::Default); } +TEST(PortChannelOneToOneTest, PacketPingPong) { + REQUIRE_CUDA_IPC_AVAILABLE; + testPacketPingPong(false, IbMode::Default); +} TEST(PortChannelOneToOneTest, PacketPingPongIbHostMode) { REQUIRE_IBVERBS; @@ -490,7 +511,10 @@ TEST(PortChannelOneToOneTest, PacketPingPongIbHostMode) { testPacketPingPong(true, IbMode::Host); } -PERF_TEST(PortChannelOneToOneTest, PacketPingPongPerf) { testPacketPingPongPerf(false, IbMode::Default); } +PERF_TEST(PortChannelOneToOneTest, PacketPingPongPerf) { + REQUIRE_CUDA_IPC_AVAILABLE; + testPacketPingPongPerf(false, IbMode::Default); +} PERF_TEST(PortChannelOneToOneTest, PacketPingPongPerfIbHostMode) { REQUIRE_IBVERBS; @@ -583,6 +607,7 @@ void PortChannelOneToOneTest::testBandwidth(PingPongTestParams params) { } PERF_TEST(PortChannelOneToOneTest, Bandwidth) { + REQUIRE_CUDA_IPC_AVAILABLE; testBandwidth(PingPongTestParams{ .useIPC = true, .useIB = false, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::Default}); }