From 67f9933ba13da5baea941bab92a114398bafa69e Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Wed, 1 Apr 2026 10:20:43 +0000 Subject: [PATCH] fix data direct --- .github/copilot-instructions.md | 2 +- cmake/FindGDRCopy.cmake | 2 + cmake/FindMLX5.cmake | 1 + include/mscclpp/atomic_device.hpp | 5 +- src/core/connection.cc | 196 +++++++++------------- src/core/endpoint.cc | 21 --- src/core/gdr.cc | 112 ++++++++++--- src/core/ib.cc | 115 +++++++------ src/core/ibverbs_wrapper.cc | 43 ++++- src/core/include/connection.hpp | 56 +++---- src/core/include/endpoint.hpp | 8 - src/core/include/gdr.hpp | 57 ++----- src/core/include/ib.hpp | 16 +- src/core/include/mlx5dv_wrapper.hpp | 12 +- src/core/mlx5dv_wrapper.cc | 39 +++-- src/core/semaphore.cc | 21 ++- test/framework.cc | 3 + test/mp_unit/ib_tests.cu | 120 ++++++++++--- test/mp_unit/port_channel_tests.cu | 19 +++ test/unit/CMakeLists.txt | 1 + test/unit/gdr_tests.cu | 251 ++++++++++++++++++++++++++++ 21 files changed, 735 insertions(+), 365 deletions(-) create mode 100644 test/unit/gdr_tests.cu diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 4f13c557..9d7e7798 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -43,7 +43,7 @@ For testing after successful build: # To run tests with two GPUs - two is enough for most tests mpirun -np 2 ./build/bin/mp_unit_tests # To run tests excluding IB-related ones (when IB is not available) -mpirun -np 2 ./build/bin/mp_unit_tests --gtest_filter=-*Ib* +mpirun -np 2 ./build/bin/mp_unit_tests --filter=-*Ib* ``` For building a Python package: diff --git a/cmake/FindGDRCopy.cmake b/cmake/FindGDRCopy.cmake index e62f32f2..54e0ba1c 100644 --- a/cmake/FindGDRCopy.cmake +++ b/cmake/FindGDRCopy.cmake @@ -35,7 +35,9 @@ find_library(GDRCOPY_LIBRARIES if(GDRCOPY_INCLUDE_DIRS) include(CheckSymbolExists) set(CMAKE_REQUIRED_INCLUDES ${GDRCOPY_INCLUDE_DIRS}) + set(CMAKE_REQUIRED_LIBRARIES ${GDRCOPY_LIBRARIES}) 
check_symbol_exists(gdr_pin_buffer_v2 "gdrapi.h" GDRCOPY_HAS_PIN_BUFFER_V2) + unset(CMAKE_REQUIRED_LIBRARIES) unset(CMAKE_REQUIRED_INCLUDES) if(NOT GDRCOPY_HAS_PIN_BUFFER_V2) message(STATUS "GDRCopy found but too old (gdr_pin_buffer_v2 not available). Requires >= 2.5.") diff --git a/cmake/FindMLX5.cmake b/cmake/FindMLX5.cmake index 59298450..9fd59127 100644 --- a/cmake/FindMLX5.cmake +++ b/cmake/FindMLX5.cmake @@ -33,5 +33,6 @@ find_library(MLX5_LIBRARIES /usr/lib/x86_64-linux-gnu) include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(MLX5 DEFAULT_MSG MLX5_INCLUDE_DIRS MLX5_LIBRARIES) mark_as_advanced(MLX5_INCLUDE_DIRS MLX5_LIBRARIES) diff --git a/include/mscclpp/atomic_device.hpp b/include/mscclpp/atomic_device.hpp index 74f6122f..d00bb50c 100644 --- a/include/mscclpp/atomic_device.hpp +++ b/include/mscclpp/atomic_device.hpp @@ -38,7 +38,7 @@ MSCCLPP_HOST_DEVICE_INLINE T atomicFetchAdd(T* ptr, const T& val, cuda::memory_o return cuda::atomic_ref{*ptr}.fetch_add(val, memoryOrder); } -#elif defined(MSCCLPP_DEVICE_HIP) +#else // !defined(MSCCLPP_DEVICE_CUDA) constexpr auto memoryOrderRelaxed = __ATOMIC_RELAXED; constexpr auto memoryOrderAcquire = __ATOMIC_ACQUIRE; @@ -46,7 +46,6 @@ constexpr auto memoryOrderRelease = __ATOMIC_RELEASE; constexpr auto memoryOrderAcqRel = __ATOMIC_ACQ_REL; constexpr auto memoryOrderSeqCst = __ATOMIC_SEQ_CST; -// HIP does not have thread scope enums like CUDA constexpr auto scopeSystem = 0; constexpr auto scopeDevice = 0; @@ -65,7 +64,7 @@ MSCCLPP_HOST_DEVICE_INLINE T atomicFetchAdd(T* ptr, const T& val, int memoryOrde return __atomic_fetch_add(ptr, val, memoryOrder); } -#endif // defined(MSCCLPP_DEVICE_HIP) +#endif // !defined(MSCCLPP_DEVICE_CUDA) } // namespace mscclpp diff --git a/src/core/connection.cc b/src/core/connection.cc index 7ce9b37d..172bca39 100644 --- a/src/core/connection.cc +++ b/src/core/connection.cc @@ -7,6 +7,7 @@ #include #endif +#include #include #include #include @@ -219,29 +220,18 @@ void 
IBConnection::recvThreadFunc() { continue; } - // Read the token value from the incoming write-with-imm completion. - if (dataDirectEnabled_) { - // Data Direct path: the signal GPU buffer MR was registered with - // MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT, and the semaphore token is also written - // through Data Direct (via GDRCopy). Both writes go through the same path, so - // all data is visible in GPU memory when the CQE is polled. Read from imm_data. - newValueHost = static_cast(qp->getRecvWcImmData(i)); - } else { - // Slow path: read the 64-bit token from the local signal GPU buffer via volatile load. - // localSignalGpuPtr_ points to either a GDRCopy BAR1 mapping (CUDA) or the - // GPU buffer directly (ROCm system-coherent/uncached memory). - newValueHost = *static_cast(localSignalGpuPtr_); - } + // Read the token from imm_data (always available and correct in the CQE). + newValueHost = static_cast(qp->getRecvWcImmData(i)); - // Read token address from the local stored address (set by setSignalForwardingDst) - if (remoteUpdateDstAddr_ != 0) { - uint64_t* dstPtr = reinterpret_cast(remoteUpdateDstAddr_); - - if (remoteUpdateDstAddrMap_ && remoteUpdateDstAddrMap_->valid()) { - // Direct host-side write to GPU memory via GDRCopy BAR1 mapping - remoteUpdateDstAddrMap_->copyTo(&newValueHost, sizeof(uint64_t)); + // Forward the token to the semaphore's inbound token address via atomicStore + // through the GDRCopy BAR1 mapping. The GPU reads with system-scope acquire. + if (signalAddr_ != 0) { + if (signalGdrMap_ && signalGdrMap_->valid()) { + atomicStore(signalGdrMap_->hostPtr(), newValueHost, memoryOrderRelaxed); } else { - *dstPtr = newValueHost; + // For HIP/ROCm. + // NOTE: may need a fix in the future to ensure BAR1 mapping. 
+ *reinterpret_cast(signalAddr_) = newValueHost; } } @@ -259,12 +249,10 @@ IBConnection::IBConnection(std::shared_ptr context, const Endpoint& loc remoteTransport_(remoteEndpoint.transport()), atomicSrc_(std::make_unique(0)), ibNoAtomic_(getImpl(localEndpoint).ibNoAtomic_), + gdrSignalForwarding_(false), stopRecvThread_(false), localGpuDeviceId_(localEndpoint.device().id), - remoteUpdateDstAddr_(0), - remoteSignalGpuMrInfo_{0, 0}, - localSignalGpuPtr_(nullptr), - dataDirectEnabled_(false) { + signalAddr_(0) { qp_ = getImpl(localEndpoint).ibQp_; qp_.lock()->rtr(getImpl(remoteEndpoint).ibQpInfo_); qp_.lock()->rts(); @@ -274,105 +262,89 @@ IBConnection::IBConnection(std::shared_ptr context, const Endpoint& loc if (ibNoAtomic_) { #if defined(MSCCLPP_USE_CUDA) + // On CUDA, HostNoAtomic requires GDRCopy for CPU→GPU signal forwarding through BAR1. if (!gdrEnabled()) { - std::string reason = "unknown"; - switch (gdrStatus()) { - case GdrStatus::NotBuilt: - reason = "mscclpp was not built with GDRCopy support (MSCCLPP_USE_GDRCOPY not set)"; - break; - case GdrStatus::Disabled: - reason = "GDRCopy is disabled via MSCCLPP_FORCE_DISABLE_GDR environment variable"; - break; - case GdrStatus::DriverMissing: - reason = "GDRCopy kernel driver is not loaded (/dev/gdrdrv not found)"; - break; - case GdrStatus::OpenFailed: - reason = "gdr_open() failed; GDRCopy driver may be misconfigured"; - break; - default: - break; + THROW(CONN, Error, ErrorCode::InvalidUsage, + "IB host-no-atomic mode on CUDA requires GDRCopy: ", gdrStatusMessage()); + } + gdrSignalForwarding_ = true; +#endif // defined(MSCCLPP_USE_CUDA) + + // On platforms with a CPU-GPU bridge that reorders posted writes (e.g., Grace/GB200 + // NVLink-C2C), HostNoAtomic requires Data Direct for correct memory ordering. Data Direct + // routes NIC DMA through the PCIe Data Direct engine, bypassing the bridge. It is available + // on Virtual Function (VF) devices. 
On platforms without such a bridge (x86, non-Grace + // aarch64), HostNoAtomic works without Data Direct. + // + // We cannot reliably detect the bridge at compile time or runtime, so we emit a warning + // when the device is not a VF. If data corruption occurs, switching to VF devices with + // Data Direct or using IbMode::Host with RDMA atomics will resolve it. + { + IbCtx* ibCtx = getImpl(*context).getIbContext(transport_); + if (!ibCtx->isVirtualFunction()) { + WARN(CONN, + "IB HostNoAtomic mode without a Virtual Function (VF) device may cause data corruption " + "on platforms with a CPU-GPU bridge that reorders posted writes (e.g., Grace/GB200). " + "Device ", + ibCtx->getDevName(), + " is not a VF. " + "If you experience data corruption, use VF devices with Data Direct or IbMode::Host."); } - THROW(CONN, Error, ErrorCode::InvalidUsage, "IB host-no-atomic mode on CUDA requires GDRCopy: ", reason); - } -#endif - - // Extract remote endpoint's signal GPU buffer MR info for write-with-imm destination - const auto& remoteImpl = getImpl(remoteEndpoint); - remoteSignalGpuMrInfo_ = remoteImpl.ibSignalGpuMrInfo_; - - // Create a GDR mapping of the local signal GPU buffer. recvThreadFunc reads the - // 64-bit token via localSignalGpuPtr_, which points to the BAR1-mapped host address - // (CUDA/GDRCopy) or the GPU buffer directly (ROCm system-coherent memory). - const auto& localImpl = getImpl(localEndpoint); - if (gdrEnabled() && localImpl.ibSignalGpuBuffer_) { - localSignalGpuMap_ = - std::make_unique(std::static_pointer_cast(localImpl.ibSignalGpuBuffer_), localGpuDeviceId_); - } - if (localSignalGpuMap_ && localSignalGpuMap_->valid()) { - // Use the BAR1-mapped host pointer; uncacheable MMIO ensures ordered volatile reads. - localSignalGpuPtr_ = localSignalGpuMap_->hostPtr(); - } else if (localImpl.ibSignalGpuBuffer_) { - // ROCm: GPU memory is system-coherent, so direct volatile read is safe. 
- localSignalGpuPtr_ = reinterpret_cast(localImpl.ibSignalGpuBuffer_.get()); } - // Data Direct requires all three conditions: - // 1. Signal GPU buffer MR registered with MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT - // 2. Local signal GPU GDRCopy mapping pinned with GDR_PIN_FLAG_FORCE_PCIE - // 3. (signal forwarding dst GDRCopy mapping checked at setSignalForwardingDst time) - // When all conditions are met, RDMA data writes and GDRCopy token writes both go - // through the Data Direct engine, guaranteeing GPU memory visibility at CQE poll time. + // Pre-post receive requests for incoming WRITE_WITH_IMM notifications. + // The recv CQE guarantees the preceding data WRITE has been committed to GPU memory. auto qp = qp_.lock(); - dataDirectEnabled_ = localImpl.ibSignalGpuMr_ && localImpl.ibSignalGpuMr_->isDataDirect() && - localSignalGpuMap_ && localSignalGpuMap_->valid(); - if (dataDirectEnabled_) { - INFO(CONN, "IBConnection: Data Direct enabled"); - } - - // Pre-post receive requests for incoming write-with-imm int maxRecvWr = localEndpoint.config().ib.maxRecvWr; for (int i = 0; i < maxRecvWr; ++i) { qp->stageRecv(/*wrId=*/0); } qp->postRecv(); - // Start the background thread to poll recv CQ - recvThread_ = std::thread([this]() { this->recvThreadFunc(); }); - INFO(CONN, "IBConnection via ", getIBDeviceName(transport_), " created with no-atomic mode"); + // The recv thread is started later in startSignalForwarding() when the semaphore + // provides the signal forwarding destination. This ensures the thread lifetime is + // bounded by the GdrMap lifetime (created before start, destroyed after stop). 
+ INFO(CONN, "IBConnection via ", getIBDeviceName(transport_), " created with signal forwarding (HostNoAtomic) mode"); } else { INFO(CONN, "IBConnection via ", getIBDeviceName(transport_), " created with atomic mode"); } } -IBConnection::~IBConnection() { +IBConnection::~IBConnection() { stopSignalForwarding(); } + +Transport IBConnection::transport() const { return transport_; } + +Transport IBConnection::remoteTransport() const { return remoteTransport_; } + +bool IBConnection::isSignalForwarding() const { return ibNoAtomic_; } + +void IBConnection::startSignalForwarding(std::shared_ptr mem) { + // Set up the forwarding destination and GdrMap, then start the recv thread. + // Order: set address → create GdrMap → start thread. + signalAddr_ = reinterpret_cast(mem.get()); + if (gdrSignalForwarding_) { + signalGdrMap_ = std::make_unique(std::move(mem), localGpuDeviceId_); + } + if (ibNoAtomic_) { + stopRecvThread_.store(false, std::memory_order_relaxed); + recvThread_ = std::thread([this]() { this->recvThreadFunc(); }); + } + INFO(CONN, "IBConnection startSignalForwarding: ", (void*)signalAddr_); +} + +void IBConnection::stopSignalForwarding() { + // Stop the recv thread, then tear down GdrMap and address. + // Order: stop thread → destroy GdrMap → clear address. 
if (ibNoAtomic_) { stopRecvThread_.store(true, std::memory_order_relaxed); if (recvThread_.joinable()) { recvThread_.join(); } } -} - -Transport IBConnection::transport() const { return transport_; } - -Transport IBConnection::remoteTransport() const { return remoteTransport_; } - -bool IBConnection::usesSignalForwarding() const { return ibNoAtomic_; } - -void IBConnection::setSignalForwardingDst(std::shared_ptr mem) { - remoteUpdateDstAddr_ = reinterpret_cast(mem.get()); - if (gdrEnabled()) { - if (mem) { - remoteUpdateDstAddrMap_ = std::make_unique(std::move(mem), localGpuDeviceId_); - // Data Direct requires the token write mapping to also use FORCE_PCIE - if (dataDirectEnabled_ && !(remoteUpdateDstAddrMap_ && remoteUpdateDstAddrMap_->valid())) { - dataDirectEnabled_ = false; - INFO(CONN, "IBConnection: Data Direct disabled (signal forwarding dst GDRCopy mapping not available)"); - } - } else { - remoteUpdateDstAddrMap_.reset(); - } + if (gdrSignalForwarding_) { + signalGdrMap_.reset(); } - INFO(CONN, "IBConnection setSignalForwardingDst: ", (void*)remoteUpdateDstAddr_); + signalAddr_ = 0; + INFO(CONN, "IBConnection stopSignalForwarding"); } void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, @@ -425,27 +397,23 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6 *src = newValue; if (ibNoAtomic_) { - // Use RDMA write-with-imm instead of atomic operation. - // Write the token value (8 bytes) from the local host buffer to the remote signal GPU buffer, - // with newValue also in imm_data (32-bit). The remote's recvThreadFunc reads the token from - // the signal GPU buffer and forwards it to the semaphore's inbound token address. - - // Put newValue in imm_data (truncated to 32-bit; semaphore counters should fit) + // Signal forwarding: send a 0-byte RDMA WRITE_WITH_IMM with the token in imm_data. 
+ // The receiver's recv thread polls the CQE, which guarantees the preceding data WRITE + // has been committed to GPU memory. The recv thread then forwards the token to the + // semaphore's inbound token via GDRCopy atomicStore. unsigned int immData = static_cast(newValue); - - // Write the real token value into the host buffer, then RDMA write host->remote GPU *atomicSrc_ = newValue; - qp_.lock()->stageSendWriteWithImm(atomicSrcTransportInfo_.ibMr, remoteSignalGpuMrInfo_, - /*size=*/sizeof(uint64_t), /*wrId=*/0, + qp_.lock()->stageSendWriteWithImm(nullptr, dstMrInfo, + /*size=*/0, /*wrId=*/0, /*srcOffset=*/0, /*dstOffset=*/0, /*signaled=*/true, /*immData=*/immData); qp_.lock()->postSend(); - INFO(CONN, "IBConnection write-with-imm: value ", oldValue, " -> ", newValue); + INFO(CONN, "IBConnection signal forwarding: value ", oldValue, " -> ", newValue); } else { qp_.lock()->stageSendAtomicAdd(atomicSrcTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, newValue - oldValue, /*signaled=*/true); qp_.lock()->postSend(); - INFO(CONN, "IBConnection atomic Write: from ", src, " to ", (uint8_t*)dstMrInfo.addr + dstOffset, ", ", oldValue, + INFO(CONN, "IBConnection atomic write: from ", src, " to ", (uint8_t*)dstMrInfo.addr + dstOffset, ", ", oldValue, " -> ", newValue); } diff --git a/src/core/endpoint.cc b/src/core/endpoint.cc index 6569a31e..5ab4bad0 100644 --- a/src/core/endpoint.cc +++ b/src/core/endpoint.cc @@ -53,21 +53,6 @@ Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl) ->createQp(config_.ib.port, config_.ib.gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum, config_.ib.maxSendWr, maxRecvWr, config_.ib.maxWrPerSend, ibNoAtomic_); ibQpInfo_ = ibQp_->getInfo(); - - // Allocate a 64-bit signal GPU buffer for write-with-imm data payload (ibNoAtomic_ only). 
- if (ibNoAtomic_ && config_.device.type == DeviceType::GPU && config_.device.id >= 0) { - CudaDeviceGuard deviceGuard(config_.device.id); -#if defined(MSCCLPP_DEVICE_HIP) - ibSignalGpuBuffer_ = detail::gpuCallocUncachedShared(); -#else - ibSignalGpuBuffer_ = detail::gpuCallocShared(); -#endif - ibSignalGpuMr_ = - contextImpl.getIbContext(config_.transport)->registerMr(ibSignalGpuBuffer_.get(), sizeof(uint64_t)); - ibSignalGpuMrInfo_ = ibSignalGpuMr_->getInfo(); - } else { - ibSignalGpuMrInfo_ = {0, 0}; - } } else if (config_.transport == Transport::Ethernet) { // Configuring Ethernet Interfaces abortFlag_ = 0; @@ -90,9 +75,6 @@ Endpoint::Impl::Impl(const std::vector& serialization) { ibLocal_ = false; it = detail::deserialize(it, ibQpInfo_); it = detail::deserialize(it, ibNoAtomic_); - if (ibNoAtomic_) { - it = detail::deserialize(it, ibSignalGpuMrInfo_); - } } else if (config_.transport == Transport::Ethernet) { it = detail::deserialize(it, socketAddress_); } @@ -123,9 +105,6 @@ MSCCLPP_API_CPP std::vector Endpoint::serialize() const { if (AllIBTransports.has(pimpl_->config_.transport)) { detail::serialize(data, pimpl_->ibQpInfo_); detail::serialize(data, pimpl_->ibNoAtomic_); - if (pimpl_->ibNoAtomic_) { - detail::serialize(data, pimpl_->ibSignalGpuMrInfo_); - } } else if (pimpl_->config_.transport == Transport::Ethernet) { detail::serialize(data, pimpl_->socketAddress_); } diff --git a/src/core/gdr.cc b/src/core/gdr.cc index 341002ed..22ac15c9 100644 --- a/src/core/gdr.cc +++ b/src/core/gdr.cc @@ -5,6 +5,7 @@ #if defined(MSCCLPP_USE_GDRCOPY) +#include #include #include @@ -12,9 +13,11 @@ #include "logger.hpp" +#ifndef GPU_PAGE_SHIFT #define GPU_PAGE_SHIFT 16 #define GPU_PAGE_SIZE (1UL << GPU_PAGE_SHIFT) #define GPU_PAGE_MASK (~(GPU_PAGE_SIZE - 1)) +#endif namespace mscclpp { @@ -45,6 +48,23 @@ GdrStatus gdrStatus() { return gdrContext()->status(); } bool gdrEnabled() { return gdrStatus() == GdrStatus::Ok; } +const char* gdrStatusMessage() { + switch 
(gdrStatus()) { + case GdrStatus::Ok: + return "GDRCopy initialized successfully"; + case GdrStatus::NotBuilt: + return "mscclpp was not built with GDRCopy support (MSCCLPP_USE_GDRCOPY not set)"; + case GdrStatus::Disabled: + return "GDRCopy is disabled via MSCCLPP_FORCE_DISABLE_GDR environment variable"; + case GdrStatus::DriverMissing: + return "GDRCopy kernel driver is not loaded (/dev/gdrdrv not found)"; + case GdrStatus::OpenFailed: + return "gdr_open() failed; GDRCopy driver may be misconfigured"; + default: + return "unknown GDRCopy status"; + } +} + GdrContext::GdrContext() : status_(GdrStatus::Disabled), handle_(nullptr) { if (env()->forceDisableGdr) { INFO(GPU, "GDRCopy disabled via MSCCLPP_FORCE_DISABLE_GDR"); @@ -77,53 +97,79 @@ GdrContext::~GdrContext() { } } -// GdrMap +// GdrMap::Impl — real implementation with GDRCopy + +struct GdrMap::Impl { + std::shared_ptr ctx; + std::shared_ptr gpuMem; + gdr_mh_t mh; + void* barPtr; + uint64_t* hostDstPtr; + size_t mappedSize; +}; + +GdrMap::GdrMap(std::shared_ptr gpuMem, int deviceId) : pimpl_(std::make_unique()) { + pimpl_->ctx = gdrContext(); + pimpl_->gpuMem = std::move(gpuMem); + pimpl_->mh = {}; + pimpl_->barPtr = nullptr; + pimpl_->hostDstPtr = nullptr; + pimpl_->mappedSize = 0; -GdrMap::GdrMap(std::shared_ptr gpuMem, int deviceId) - : ctx_(gdrContext()), - gpuMem_(std::move(gpuMem)), - mh_{}, - barPtr_(nullptr), - hostDstPtr_(nullptr), - mappedSize_(0) { // Ensure CUDA device context is active for gdr_pin_buffer CudaDeviceGuard deviceGuard(deviceId); - uint64_t gpuAddr = reinterpret_cast(gpuMem_.get()); + uint64_t gpuAddr = reinterpret_cast(pimpl_->gpuMem.get()); // Align to GPU page boundary and pin one page around the target address unsigned long alignedAddr = gpuAddr & GPU_PAGE_MASK; unsigned long pageOffset = gpuAddr - alignedAddr; - mappedSize_ = GPU_PAGE_SIZE; + pimpl_->mappedSize = GPU_PAGE_SIZE; - int ret = gdr_pin_buffer_v2(ctx_->handle(), alignedAddr, mappedSize_, GDR_PIN_FLAG_FORCE_PCIE, 
&mh_); + // Pin the GPU memory for GDRCopy BAR1 mapping. Try GDR_PIN_FLAG_FORCE_PCIE first for optimal + // ordering on platforms that support it (e.g., GB200). Fall back to flags=0 if FORCE_PCIE is + // not supported. Both paths work correctly: CPU writes via atomicStore, GPU reads via + // system-scope acquire. + int ret = + gdr_pin_buffer_v2(pimpl_->ctx->handle(), alignedAddr, pimpl_->mappedSize, GDR_PIN_FLAG_FORCE_PCIE, &pimpl_->mh); if (ret != 0) { - THROW(GPU, Error, ErrorCode::InternalError, "gdr_pin_buffer_v2 failed (ret=", ret, ") for addr ", (void*)gpuAddr, - ". Ensure the GPU memory is allocated with cudaMalloc (not cuMemCreate/cuMemMap)."); + ret = gdr_pin_buffer_v2(pimpl_->ctx->handle(), alignedAddr, pimpl_->mappedSize, 0, &pimpl_->mh); + if (ret != 0) { + THROW(GPU, Error, ErrorCode::InternalError, "gdr_pin_buffer_v2 failed (ret=", ret, ") for addr ", (void*)gpuAddr, + ". Ensure the GPU memory is allocated with cudaMalloc (not cuMemCreate/cuMemMap)."); + } } - ret = gdr_map(ctx_->handle(), mh_, &barPtr_, mappedSize_); + ret = gdr_map(pimpl_->ctx->handle(), pimpl_->mh, &pimpl_->barPtr, pimpl_->mappedSize); if (ret != 0) { - (void)gdr_unpin_buffer(ctx_->handle(), mh_); + (void)gdr_unpin_buffer(pimpl_->ctx->handle(), pimpl_->mh); THROW(GPU, Error, ErrorCode::InternalError, "gdr_map failed (ret=", ret, ") for addr ", (void*)gpuAddr); } - hostDstPtr_ = reinterpret_cast(reinterpret_cast(barPtr_) + pageOffset); + pimpl_->hostDstPtr = reinterpret_cast(reinterpret_cast(pimpl_->barPtr) + pageOffset); - INFO(GPU, "GDRCopy mapping established: GPU addr ", (void*)gpuAddr, " -> host ptr ", (const void*)hostDstPtr_); + INFO(GPU, "GDRCopy mapping established: GPU addr ", (void*)gpuAddr, " -> host ptr ", (const void*)pimpl_->hostDstPtr); } GdrMap::~GdrMap() { - if (barPtr_ != nullptr) { - (void)gdr_unmap(ctx_->handle(), mh_, barPtr_, mappedSize_); - } - if (hostDstPtr_ != nullptr) { - (void)gdr_unpin_buffer(ctx_->handle(), mh_); + if (pimpl_) { + if (pimpl_->barPtr != 
nullptr) { + (void)gdr_unmap(pimpl_->ctx->handle(), pimpl_->mh, pimpl_->barPtr, pimpl_->mappedSize); + } + if (pimpl_->hostDstPtr != nullptr) { + (void)gdr_unpin_buffer(pimpl_->ctx->handle(), pimpl_->mh); + } } } -void GdrMap::copyTo(const void* src, size_t size) { gdr_copy_to_mapping(mh_, hostDstPtr_, src, size); } +bool GdrMap::valid() const { return pimpl_ && pimpl_->hostDstPtr != nullptr; } -void GdrMap::copyFrom(void* dst, size_t size) const { gdr_copy_from_mapping(mh_, dst, hostDstPtr_, size); } +uint64_t* GdrMap::hostPtr() const { return pimpl_ ? pimpl_->hostDstPtr : nullptr; } + +void GdrMap::copyTo(const void* src, size_t size) { gdr_copy_to_mapping(pimpl_->mh, pimpl_->hostDstPtr, src, size); } + +void GdrMap::copyFrom(void* dst, size_t size) const { + gdr_copy_from_mapping(pimpl_->mh, dst, pimpl_->hostDstPtr, size); +} } // namespace mscclpp @@ -135,6 +181,24 @@ GdrStatus gdrStatus() { return GdrStatus::NotBuilt; } bool gdrEnabled() { return false; } +const char* gdrStatusMessage() { return "mscclpp was not built with GDRCopy support (MSCCLPP_USE_GDRCOPY not set)"; } + +// GdrMap::Impl — stub (no GDRCopy) + +struct GdrMap::Impl {}; + +GdrMap::GdrMap(std::shared_ptr /*gpuMem*/, int /*deviceId*/) {} + +GdrMap::~GdrMap() = default; + +bool GdrMap::valid() const { return false; } + +uint64_t* GdrMap::hostPtr() const { return nullptr; } + +void GdrMap::copyTo(const void* /*src*/, size_t /*size*/) {} + +void GdrMap::copyFrom(void* /*dst*/, size_t /*size*/) const {} + } // namespace mscclpp #endif // !defined(MSCCLPP_USE_GDRCOPY) diff --git a/src/core/ib.cc b/src/core/ib.cc index c82b147a..f783daa9 100644 --- a/src/core/ib.cc +++ b/src/core/ib.cc @@ -67,8 +67,7 @@ static inline bool isDmabufSupportedByGpu(int gpuId) { return ret; } -IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isMlx5) - : mr_(nullptr), buff_(buff), size_(0), isDmabuf_(false), isDataDirect_(false) { +IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isDataDirect) : 
mr_(nullptr), buff_(buff), size_(0) { if (size == 0) { THROW(NET, Error, ErrorCode::InvalidUsage, "invalid MR size: 0"); } @@ -91,11 +90,8 @@ IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isMlx5) int accessFlags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_RELAXED_ORDERING | IBV_ACCESS_REMOTE_ATOMIC; #if defined(MSCCLPP_USE_MLX5DV) - if (isMlx5 && MLX5DV::isAvailable()) { + if (isDataDirect && MLX5DV::isAvailable()) { mr_ = MLX5DV::mlx5dv_reg_dmabuf_mr(pd, offsetInDmaBuf, size, buffIntPtr, fd, accessFlags); - if (mr_ != nullptr) { - isDataDirect_ = true; - } } #endif if (mr_ == nullptr) { @@ -105,7 +101,6 @@ IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isMlx5) if (mr_ == nullptr) { THROW(NET, IbError, errno, "ibv_reg_dmabuf_mr failed (errno ", errno, ")"); } - isDmabuf_ = true; #else // defined(MSCCLPP_USE_ROCM) THROW(NET, Error, ErrorCode::InvalidUsage, "We don't support DMABUF on HIP platforms yet"); #endif // defined(MSCCLPP_USE_ROCM) @@ -145,12 +140,8 @@ const void* IbMr::getBuff() const { return buff_; } uint32_t IbMr::getLkey() const { return mr_->lkey; } -bool IbMr::isDmabuf() const { return isDmabuf_; } - -bool IbMr::isDataDirect() const { return isDataDirect_; } - IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendCqSize, int maxSendCqPollNum, - int maxSendWr, int maxRecvWr, int maxWrPerSend, bool noAtomic, bool isMlx5) + int maxSendWr, int maxRecvWr, int maxWrPerSend, bool noAtomic) : portNum_(portNum), gidIndex_(gidIndex), info_(), @@ -171,8 +162,7 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendC maxSendWr_(maxSendWr), maxWrPerSend_(maxWrPerSend), maxRecvWr_(maxRecvWr), - noAtomic_(noAtomic), - isMlx5_(isMlx5) { + noAtomic_(noAtomic) { sendCq_ = IBVerbs::ibv_create_cq(ctx, maxSendCqSize, nullptr, nullptr, 0); if (sendCq_ == nullptr) { THROW(NET, IbError, errno, "ibv_create_cq failed (errno ", errno, ")"); @@ -186,47 +176,21 
@@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendC } } - struct ibv_qp* qp = nullptr; -#if defined(MSCCLPP_USE_MLX5DV) - if (isMlx5_) { - struct ibv_qp_init_attr_ex qpInitAttrEx = {}; - qpInitAttrEx.sq_sig_all = 0; - qpInitAttrEx.send_cq = sendCq_; - qpInitAttrEx.recv_cq = (recvCq_ != nullptr) ? recvCq_ : sendCq_; - qpInitAttrEx.qp_type = IBV_QPT_RC; - qpInitAttrEx.cap.max_send_wr = maxSendWr; - qpInitAttrEx.cap.max_recv_wr = maxRecvWr; - qpInitAttrEx.cap.max_send_sge = 1; - qpInitAttrEx.cap.max_recv_sge = 1; - qpInitAttrEx.cap.max_inline_data = 0; - qpInitAttrEx.pd = pd; - qpInitAttrEx.comp_mask = IBV_QP_INIT_ATTR_PD; + struct ibv_qp_init_attr qpInitAttr = {}; + qpInitAttr.sq_sig_all = 0; + qpInitAttr.send_cq = sendCq_; + // Use separate recv CQ if created, otherwise use the send CQ + qpInitAttr.recv_cq = (recvCq_ != nullptr) ? recvCq_ : sendCq_; + qpInitAttr.qp_type = IBV_QPT_RC; + qpInitAttr.cap.max_send_wr = maxSendWr; + qpInitAttr.cap.max_recv_wr = maxRecvWr; + qpInitAttr.cap.max_send_sge = 1; + qpInitAttr.cap.max_recv_sge = 1; + qpInitAttr.cap.max_inline_data = 0; - struct mlx5dv_qp_init_attr mlx5QpAttr = {}; - - qp = MLX5DV::mlx5dv_create_qp(ctx, &qpInitAttrEx, &mlx5QpAttr); - if (qp == nullptr) { - THROW(NET, IbError, errno, "mlx5dv_create_qp failed (errno ", errno, ")"); - } - } else -#endif // defined(MSCCLPP_USE_MLX5DV) - { - struct ibv_qp_init_attr qpInitAttr = {}; - qpInitAttr.sq_sig_all = 0; - qpInitAttr.send_cq = sendCq_; - // Use separate recv CQ if created, otherwise use the send CQ - qpInitAttr.recv_cq = (recvCq_ != nullptr) ? 
recvCq_ : sendCq_; - qpInitAttr.qp_type = IBV_QPT_RC; - qpInitAttr.cap.max_send_wr = maxSendWr; - qpInitAttr.cap.max_recv_wr = maxRecvWr; - qpInitAttr.cap.max_send_sge = 1; - qpInitAttr.cap.max_recv_sge = 1; - qpInitAttr.cap.max_inline_data = 0; - - qp = IBVerbs::ibv_create_qp(pd, &qpInitAttr); - if (qp == nullptr) { - THROW(NET, IbError, errno, "ibv_create_qp failed (errno ", errno, ")"); - } + struct ibv_qp* qp = IBVerbs::ibv_create_qp(pd, &qpInitAttr); + if (qp == nullptr) { + THROW(NET, IbError, errno, "ibv_create_qp failed (errno ", errno, ")"); } struct ibv_port_attr portAttr; @@ -483,12 +447,29 @@ std::string IbQp::getRecvWcStatusString(int idx) const { return IBVerbs::ibv_wc_ unsigned int IbQp::getRecvWcImmData(int idx) const { return ntohl((*recvWcs_)[idx].imm_data); } IbCtx::IbCtx(const std::string& devName) - : devName_(devName), ctx_(nullptr), pd_(nullptr), supportsRdmaAtomics_(false), isMlx5_(false) { + : devName_(devName), + ctx_(nullptr), + pd_(nullptr), + supportsRdmaAtomics_(false), + isMlx5_(false), + dataDirect_(false), + isVF_(false) { int num; struct ibv_device** devices = IBVerbs::ibv_get_device_list(&num); for (int i = 0; i < num; ++i) { if (std::string(devices[i]->name) == devName_) { ctx_ = IBVerbs::ibv_open_device(devices[i]); + + // Detect if this IB device is a Virtual Function (VF). + // VFs have a 'physfn' sysfs symlink pointing to their parent PF; PFs do not. 
+ { + std::string physfnPath = "/sys/class/infiniband/" + devName_ + "/device/physfn"; + isVF_ = (access(physfnPath.c_str(), F_OK) == 0); + if (isVF_) { + INFO(NET, "IB device ", devName_, " is a Virtual Function (Data Direct ordering available)"); + } + } + #if defined(MSCCLPP_USE_MLX5DV) if (MLX5DV::isAvailable()) { isMlx5_ = MLX5DV::mlx5dv_is_supported(devices[i]); @@ -509,6 +490,20 @@ IbCtx::IbCtx(const std::string& devName) THROW(NET, IbError, errno, "ibv_alloc_pd failed (errno ", errno, ")"); } + // Detect Data Direct support via mlx5dv_get_data_direct_sysfs_path +#if defined(MSCCLPP_USE_MLX5DV) + if (isMlx5_ && MLX5DV::isAvailable()) { + char sysfsPath[256]; + int ret = MLX5DV::mlx5dv_get_data_direct_sysfs_path(ctx_, sysfsPath, sizeof(sysfsPath)); + if (ret == 0) { + dataDirect_ = true; + INFO(NET, "IB device ", devName_, " supports Data Direct (sysfs: ", sysfsPath, ")"); + } else { + INFO(NET, "IB device ", devName_, " does not support Data Direct"); + } + } +#endif // defined(MSCCLPP_USE_MLX5DV) + // Query and cache RDMA atomics capability struct ibv_device_attr attr = {}; if (IBVerbs::ibv_query_device(ctx_, &attr) == 0) { @@ -579,17 +574,21 @@ std::shared_ptr IbCtx::createQp(int port, int gidIndex, int maxSendCqSize, THROW(NET, Error, ErrorCode::InvalidUsage, "invalid IB port: ", port); } return std::shared_ptr(new IbQp(ctx_, pd_, port, gidIndex, maxSendCqSize, maxSendCqPollNum, maxSendWr, - maxRecvWr, maxWrPerSend, noAtomic, isMlx5_)); + maxRecvWr, maxWrPerSend, noAtomic)); } std::unique_ptr IbCtx::registerMr(void* buff, std::size_t size) { - return std::unique_ptr(new IbMr(pd_, buff, size, isMlx5_)); + return std::unique_ptr(new IbMr(pd_, buff, size, dataDirect_)); } bool IbCtx::supportsRdmaAtomics() const { return supportsRdmaAtomics_; } bool IbCtx::isMlx5() const { return isMlx5_; } +bool IbCtx::supportsDataDirect() const { return dataDirect_; } + +bool IbCtx::isVirtualFunction() const { return isVF_; } + MSCCLPP_API_CPP int getIBDeviceCount() { int 
num; IBVerbs::ibv_get_device_list(&num); @@ -699,8 +698,6 @@ IbMr::~IbMr() {} IbMrInfo IbMr::getInfo() const { return IbMrInfo(); } const void* IbMr::getBuff() const { return nullptr; } uint32_t IbMr::getLkey() const { return 0; } -bool IbMr::isDmabuf() const { return false; } -bool IbMr::isDataDirect() const { return false; } IbQp::~IbQp() {} void IbQp::rtr(const IbQpInfo& /*info*/) {} diff --git a/src/core/ibverbs_wrapper.cc b/src/core/ibverbs_wrapper.cc index 51f3f29c..4fdf1b1e 100644 --- a/src/core/ibverbs_wrapper.cc +++ b/src/core/ibverbs_wrapper.cc @@ -10,19 +10,37 @@ #include "logger.hpp" +// NOTE: MRC_SUPPORT is a temporary macro that makes the current MRC implementation work. +// MRC_SUPPORT is needed because the current libibverbs implementation of MRC does not provide +// all symbols that we need, so we need to load some symbols from the original libibverbs. +// This macro will be removed (set 0) once MRC provides all necessary symbols. +// Non-MRC environments will not be affected by this macro as long as VMRC_LIBIBVERBS_SO +// environment variable is not set. 
+#define MRC_SUPPORT 1 +#if (MRC_SUPPORT) +#include +#include +#endif // (MRC_SUPPORT) + namespace mscclpp { static std::unique_ptr globalIBVerbsHandle(nullptr, &::dlclose); +#if (MRC_SUPPORT) +static std::unique_ptr globalOrigIBVerbsHandle(nullptr, &::dlclose); +#endif // (MRC_SUPPORT) void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) { +#if (MRC_SUPPORT) + static std::set mrcSymbols = { + "ibv_get_device_list", "ibv_get_device_name", "ibv_open_device", "ibv_close_device", "ibv_query_qp", + "ibv_create_cq", "ibv_destroy_cq", "ibv_create_qp", "ibv_modify_qp", "ibv_destroy_qp", + }; +#endif // (MRC_SUPPORT) if (!globalIBVerbsHandle) { if (mscclpp::env()->ibvSo != "") { void* handle = ::dlopen(mscclpp::env()->ibvSo.c_str(), RTLD_NOW); if (handle) { globalIBVerbsHandle.reset(handle); - } else { - THROW(NET, SysError, errno, "Failed to load libibverbs library specified by MSCCLPP_IBV_SO ('", - mscclpp::env()->ibvSo, "'): ", std::string(::dlerror())); } } else { const char* possibleLibNames[] = {"libibverbs.so", "libibverbs.so.1", nullptr}; @@ -38,7 +56,26 @@ void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) { THROW(NET, SysError, errno, "Failed to open libibverbs: ", std::string(::dlerror())); } } +#if (MRC_SUPPORT) + // In MRC mode, `VMRC_LIBIBVERBS_SO` should be set. + char* vmrcLibibverbsSo = ::getenv("VMRC_LIBIBVERBS_SO"); + void* ptr; + if (vmrcLibibverbsSo != nullptr && mrcSymbols.find(symbol) == mrcSymbols.end()) { + // If we are in MRC mode and the symbol is not in the table, get it from the original libibverbs. 
+ if (!globalOrigIBVerbsHandle) { + void* handle = ::dlopen(vmrcLibibverbsSo, RTLD_NOW); + if (!handle) { + THROW(NET, SysError, errno, "Failed to open ", std::string(vmrcLibibverbsSo)); + } + globalOrigIBVerbsHandle.reset(handle); + } + ptr = ::dlsym(globalOrigIBVerbsHandle.get(), symbol.c_str()); + } else { + ptr = ::dlsym(globalIBVerbsHandle.get(), symbol.c_str()); + } +#else // !(MRC_SUPPORT) void* ptr = ::dlsym(globalIBVerbsHandle.get(), symbol.c_str()); +#endif // !(MRC_SUPPORT) if (!ptr && !allowReturnNull) { THROW(NET, SysError, errno, "Failed to load libibverbs symbol: ", symbol); } diff --git a/src/core/include/connection.hpp b/src/core/include/connection.hpp index f2ed2c8b..47b03d6c 100644 --- a/src/core/include/connection.hpp +++ b/src/core/include/connection.hpp @@ -37,16 +37,18 @@ class BaseConnection { virtual void flush(int64_t timeoutUsec = -1) = 0; - /// Set the local address where forwarded signals should be written. - /// This is called by the receiver to specify where incoming signals should be forwarded. - /// Default implementation is a no-op for connections that don't need it. - /// @param mem Shared pointer to the memory for incoming writes (nullptr to clear). - virtual void setSignalForwardingDst(std::shared_ptr /*mem*/) {} + /// Start signal forwarding to the given memory address. + /// Called by the semaphore to specify where incoming signals should be written. + /// @param mem Shared pointer to the GPU memory for the signal token. + virtual void startSignalForwarding(std::shared_ptr /*mem*/) {} + + /// Stop signal forwarding and release associated resources. + virtual void stopSignalForwarding() {} /// Whether this connection uses signal forwarding (e.g., IB host-no-atomic mode). /// When true, the semaphore must allocate a separate inboundToken_ for the recv thread to write to. /// When false, the NIC writes directly to the semaphore's registered memory (e.g., via atomics). 
- virtual bool usesSignalForwarding() const { return false; } + virtual bool isSignalForwarding() const { return false; } virtual Transport transport() const = 0; @@ -105,31 +107,20 @@ class IBConnection : public BaseConnection { // For write-with-imm mode (HostNoAtomic): uses RDMA write-with-imm to signal // instead of atomic operations, with a host thread forwarding to GPU for memory consistency. bool ibNoAtomic_; + bool gdrSignalForwarding_; // ibNoAtomic_ && gdrEnabled() — decided once at construction std::thread recvThread_; std::atomic stopRecvThread_; int localGpuDeviceId_; // Local GPU device ID for CUDA context and GDR mapping - // Write-with-imm design: - // - Sender: 8-byte RDMA write-with-imm from local host buffer to remote signal GPU buffer, - // carrying the token value both as RDMA payload and in imm_data (32-bit). - // - Receiver: reads the full 64-bit token from the local signal GPU buffer (via BAR1 or - // volatile read), then writes it to remoteUpdateDstAddr_ (the semaphore's inbound token). - uint64_t remoteUpdateDstAddr_; + // Signal forwarding design (HostNoAtomic mode): + // - Sender: 0-byte RDMA WRITE_WITH_IMM carrying the token value in imm_data (32-bit). + // - Receiver: CPU recv thread polls recv CQ for WRITE_WITH_IMM completions (CQE), reads + // the token from imm_data, then writes it to signalAddr_ (the semaphore's + // inbound token) via atomicStore through the GDRCopy BAR1 mapping. The GPU reads + // inboundToken with system-scope acquire ordering. + uint64_t signalAddr_; - // Remote endpoint's signal GPU buffer MR info (destination for RDMA write-with-imm). - // The local host buffer (atomicSrc_ / atomicSrcTransportInfo_.ibMr) serves as the source. - IbMrInfo remoteSignalGpuMrInfo_; - - std::unique_ptr remoteUpdateDstAddrMap_; - std::unique_ptr localSignalGpuMap_; - uint64_t* localSignalGpuPtr_; - - // When true, recvThreadFunc reads the token from imm_data (from CQE) instead of the - // signal GPU buffer via GDRCopy. 
Enabled only when all Data Direct conditions are met: - // the signal GPU buffer MR is registered with MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT, - // and all GDRCopy mappings (local signal buffer and remoteUpdateDstAddr) are valid, - // so both RDMA data writes and GDRCopy token writes go through the Data Direct engine. - bool dataDirectEnabled_; + std::unique_ptr signalGdrMap_; void recvThreadFunc(); @@ -137,12 +128,15 @@ class IBConnection : public BaseConnection { IBConnection(std::shared_ptr context, const Endpoint& localEndpoint, const Endpoint& remoteEndpoint); ~IBConnection(); - /// Set the local address where forwarded signals should be written. - /// Must be called before the remote sends any updateAndSync in host-no-atomic mode. - /// @param mem Shared pointer to the memory for incoming writes (nullptr to clear). - void setSignalForwardingDst(std::shared_ptr mem) override; + /// Start signal forwarding to the given memory address. + /// Must be called before the remote sends any updateAndSync in HostNoAtomic mode. + /// @param mem Shared pointer to the GPU memory for the signal token. + void startSignalForwarding(std::shared_ptr mem) override; - bool usesSignalForwarding() const override; + /// Stop signal forwarding and release associated resources. + void stopSignalForwarding() override; + + bool isSignalForwarding() const override; Transport transport() const override; diff --git a/src/core/include/endpoint.hpp b/src/core/include/endpoint.hpp index 1548d527..363faab1 100644 --- a/src/core/include/endpoint.hpp +++ b/src/core/include/endpoint.hpp @@ -6,7 +6,6 @@ #include #include -#include #include #include "ib.hpp" @@ -30,13 +29,6 @@ struct Endpoint::Impl { std::shared_ptr ibQp_; IbQpInfo ibQpInfo_; - // Signal GPU buffer for write-with-imm data payload (ibNoAtomic_ only). - // Each endpoint allocates a 64-bit GPU buffer and registers it as an IB MR. - // The MR info is serialized/exchanged so the remote can RDMA-write to it. 
- std::shared_ptr ibSignalGpuBuffer_; - std::unique_ptr ibSignalGpuMr_; - IbMrInfo ibSignalGpuMrInfo_; - // The following are only used for Ethernet and are undefined for other transports. std::unique_ptr socket_; SocketAddress socketAddress_; diff --git a/src/core/include/gdr.hpp b/src/core/include/gdr.hpp index bde2986a..e0c7f006 100644 --- a/src/core/include/gdr.hpp +++ b/src/core/include/gdr.hpp @@ -4,6 +4,10 @@ #ifndef MSCCLPP_GDR_HPP_ #define MSCCLPP_GDR_HPP_ +#include +#include +#include + namespace mscclpp { enum class GdrStatus { @@ -20,25 +24,14 @@ GdrStatus gdrStatus(); /// Whether the global GDRCopy context is enabled (shorthand for gdrStatus() == GdrStatus::Ok). bool gdrEnabled(); -} // namespace mscclpp +/// Return a human-readable error message for the current GDRCopy status. +const char* gdrStatusMessage(); -#include -#include -#include - -#if defined(MSCCLPP_USE_GDRCOPY) - -#include - -namespace mscclpp { - -class GdrContext; - -/// RAII wrapper for a per-connection GDRCopy BAR1 mapping of a GPU address. +/// RAII wrapper for a GDRCopy BAR1 mapping of a GPU address. +/// When GDRCopy is not available, all operations are no-ops and valid() returns false. class GdrMap { public: /// Pin and map a GPU address for direct host-side access. - /// Holds a shared reference to the GPU memory to keep it alive. /// @param gpuMem Shared pointer to the GPU memory (e.g. from gpuCallocShared). /// @param deviceId The CUDA device ID for setting context. GdrMap(std::shared_ptr gpuMem, int deviceId); @@ -48,10 +41,10 @@ class GdrMap { GdrMap& operator=(const GdrMap&) = delete; /// Whether the mapping was established successfully. - bool valid() const { return hostDstPtr_ != nullptr; } + bool valid() const; /// Return the BAR1-mapped host pointer to the GPU location. - uint64_t* hostPtr() const { return hostDstPtr_; } + uint64_t* hostPtr() const; /// Copy data from host memory to the mapped GPU location. 
void copyTo(const void* src, size_t size); @@ -60,36 +53,10 @@ class GdrMap { void copyFrom(void* dst, size_t size) const; private: - std::shared_ptr ctx_; - std::shared_ptr gpuMem_; - gdr_mh_t mh_; - void* barPtr_; - uint64_t* hostDstPtr_; - size_t mappedSize_; + struct Impl; + std::unique_ptr pimpl_; }; } // namespace mscclpp -#else // !defined(MSCCLPP_USE_GDRCOPY) - -namespace mscclpp { - -/// Stub GdrMap when GDRCopy is not available. -class GdrMap { - public: - GdrMap(std::shared_ptr /*gpuMem*/, int /*deviceId*/) {} - ~GdrMap() = default; - - GdrMap(const GdrMap&) = delete; - GdrMap& operator=(const GdrMap&) = delete; - - bool valid() const { return false; } - void copyTo(const void* /*src*/, size_t /*size*/) {} - void copyFrom(void* /*dst*/, size_t /*size*/) const {} - uint64_t* hostPtr() const { return nullptr; } -}; - -} // namespace mscclpp - -#endif // !defined(MSCCLPP_USE_GDRCOPY) #endif // MSCCLPP_GDR_HPP_ diff --git a/src/core/include/ib.hpp b/src/core/include/ib.hpp index 9e5a454c..923a7ca0 100644 --- a/src/core/include/ib.hpp +++ b/src/core/include/ib.hpp @@ -34,17 +34,13 @@ class IbMr { IbMrInfo getInfo() const; const void* getBuff() const; uint32_t getLkey() const; - bool isDmabuf() const; - bool isDataDirect() const; private: - IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isMlx5); + IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isDataDirect); ibv_mr* mr_; void* buff_; std::size_t size_; - bool isDmabuf_; - bool isDataDirect_; friend class IbCtx; }; @@ -92,7 +88,6 @@ class IbQp { int getRecvWcStatus(int idx) const; std::string getRecvWcStatusString(int idx) const; unsigned int getRecvWcImmData(int idx) const; - bool isMlx5() const { return isMlx5_; } private: struct SendWrInfo { @@ -106,7 +101,7 @@ class IbQp { }; IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxSendCqSize, int maxSendCqPollNum, int maxSendWr, - int maxRecvWr, int maxWrPerSend, bool noAtomic, bool isMlx5); + int maxRecvWr, int maxWrPerSend, bool 
noAtomic); SendWrInfo getNewSendWrInfo(); RecvWrInfo getNewRecvWrInfo(); @@ -134,7 +129,6 @@ class IbQp { const int maxWrPerSend_; const int maxRecvWr_; const bool noAtomic_; - const bool isMlx5_; friend class IbCtx; }; @@ -150,6 +144,8 @@ class IbCtx { std::unique_ptr registerMr(void* buff, std::size_t size); bool supportsRdmaAtomics() const; bool isMlx5() const; + bool supportsDataDirect() const; + bool isVirtualFunction() const; #else IbCtx([[maybe_unused]] const std::string& devName) {} ~IbCtx() {} @@ -160,6 +156,8 @@ class IbCtx { } bool supportsRdmaAtomics() const { return false; } bool isMlx5() const { return false; } + bool supportsDataDirect() const { return false; } + bool isVirtualFunction() const { return false; } #endif const std::string& getDevName() const { return devName_; }; @@ -173,6 +171,8 @@ class IbCtx { ibv_pd* pd_; bool supportsRdmaAtomics_; bool isMlx5_; + bool dataDirect_; + bool isVF_; }; } // namespace mscclpp diff --git a/src/core/include/mlx5dv_wrapper.hpp b/src/core/include/mlx5dv_wrapper.hpp index 654b086c..79403a36 100644 --- a/src/core/include/mlx5dv_wrapper.hpp +++ b/src/core/include/mlx5dv_wrapper.hpp @@ -6,7 +6,7 @@ #if defined(MSCCLPP_USE_MLX5DV) -#include +#include #include @@ -19,14 +19,14 @@ struct MLX5DV { /// Check if the given IB device supports mlx5 Direct Verbs. static bool mlx5dv_is_supported(struct ibv_device* device); - /// Create a QP using mlx5dv extensions. - static struct ibv_qp* mlx5dv_create_qp(struct ibv_context* ctx, struct ibv_qp_init_attr_ex* qpAttr, - struct mlx5dv_qp_init_attr* mlx5QpAttr); - /// Register a DMABUF memory region using mlx5dv extensions. /// Returns nullptr if mlx5dv_reg_dmabuf_mr is not available in this rdma-core version. static struct ibv_mr* mlx5dv_reg_dmabuf_mr(struct ibv_pd* pd, uint64_t offset, size_t length, uint64_t iova, int fd, - int access); + int access); + + /// Query the Data Direct sysfs path for the given IB context. 
+ /// Returns 0 on success (device supports Data Direct), non-zero otherwise. + static int mlx5dv_get_data_direct_sysfs_path(struct ibv_context* context, char* buf, size_t buf_len); private: static void* dlsym(const std::string& symbol, bool allowReturnNull = false); diff --git a/src/core/mlx5dv_wrapper.cc b/src/core/mlx5dv_wrapper.cc index b1c398ee..5d13d9c8 100644 --- a/src/core/mlx5dv_wrapper.cc +++ b/src/core/mlx5dv_wrapper.cc @@ -3,9 +3,19 @@ #if defined(MSCCLPP_USE_MLX5DV) +// _GNU_SOURCE is required for dlvsym() +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + #include "mlx5dv_wrapper.hpp" #include +#include + +#ifndef MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT +#define MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT (1 << 0) +#endif #include @@ -72,14 +82,6 @@ bool MLX5DV::mlx5dv_is_supported(struct ibv_device* device) { return impl(device); } -struct ibv_qp* MLX5DV::mlx5dv_create_qp(struct ibv_context* ctx, struct ibv_qp_init_attr_ex* qpAttr, - struct mlx5dv_qp_init_attr* mlx5QpAttr) { - using FuncType = struct ibv_qp* (*)(struct ibv_context*, struct ibv_qp_init_attr_ex*, struct mlx5dv_qp_init_attr*); - static FuncType impl = nullptr; - if (!impl) impl = reinterpret_cast(MLX5DV::dlsym("mlx5dv_create_qp")); - return impl(ctx, qpAttr, mlx5QpAttr); -} - struct ibv_mr* MLX5DV::mlx5dv_reg_dmabuf_mr(struct ibv_pd* pd, uint64_t offset, size_t length, uint64_t iova, int fd, int access) { // mlx5dv_reg_dmabuf_mr(pd, offset, length, iova, fd, access, mlx5_access) — the last arg is mlx5-specific flags. 
@@ -92,12 +94,27 @@ struct ibv_mr* MLX5DV::mlx5dv_reg_dmabuf_mr(struct ibv_pd* pd, uint64_t offset, resolved = true; } if (!impl) return nullptr; -#ifndef MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT -#define MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT (1 << 0) -#endif return impl(pd, offset, length, iova, fd, access, MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT); } +int MLX5DV::mlx5dv_get_data_direct_sysfs_path(struct ibv_context* context, char* buf, size_t buf_len) { + using FuncType = int (*)(struct ibv_context*, char*, size_t); + static FuncType impl = nullptr; + static bool resolved = false; + if (!resolved) { + if (globalMLX5Handle) { + void* ptr = dlvsym(globalMLX5Handle.get(), "mlx5dv_get_data_direct_sysfs_path", "MLX5_1.25"); + if (!ptr) { + ptr = MLX5DV::dlsym("mlx5dv_get_data_direct_sysfs_path", /*allowReturnNull=*/true); + } + impl = ptr ? reinterpret_cast(ptr) : nullptr; + } + resolved = true; + } + if (!impl) return -1; + return impl(context, buf, buf_len); +} + } // namespace mscclpp #endif // defined(MSCCLPP_USE_MLX5DV) diff --git a/src/core/semaphore.cc b/src/core/semaphore.cc index c6299dec..53635a0b 100644 --- a/src/core/semaphore.cc +++ b/src/core/semaphore.cc @@ -123,19 +123,18 @@ MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(const Semaphore& sema THROW(CONN, Error, ErrorCode::InvalidUsage, "Local endpoint device type of Host2DeviceSemaphore should be GPU"); } auto connImpl = BaseConnection::getImpl(connection()); - if (connImpl->usesSignalForwarding()) { - // Signal forwarding mode: the recv thread writes the token to GPU memory. - // Allocate a separate inbound token via plain cudaMalloc (not TokenPool/VMM) - // so that it is always compatible with GDRCopy pinning (VMM memory cannot be pinned by gdr_pin_buffer). + if (connImpl->isSignalForwarding()) { + // Signal forwarding (HostNoAtomic): the receiver's recv thread polls the recv CQ for + // WRITE_WITH_IMM completions, then forwards the token to inboundToken_ via GDRCopy. 
CudaDeviceGuard deviceGuard(connection().localDevice().id); #if defined(MSCCLPP_USE_ROCM) inboundToken_ = detail::gpuCallocUncachedShared(); #else inboundToken_ = detail::gpuCallocShared(); #endif - connImpl->setSignalForwardingDst(inboundToken_); + connImpl->startSignalForwarding(inboundToken_); } - // When usesSignalForwarding() is false (e.g., atomic mode), inboundToken_ stays null + // When isSignalForwarding() is false (atomic mode), inboundToken_ stays null // and the GPU polls the SemaphoreStub token directly (the NIC atomic target). } @@ -144,9 +143,9 @@ MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(Communicator& communi MSCCLPP_API_CPP Host2DeviceSemaphore::~Host2DeviceSemaphore() { if (inboundToken_) { - // Clear the connection's signal forwarding destination (and any associated GdrMap) + // Clear the connection's signal forwarding destination (and GdrMap) // before inboundToken_ is freed, to avoid use-after-free on the pinned GPU memory. - BaseConnection::getImpl(connection())->setSignalForwardingDst(nullptr); + BaseConnection::getImpl(connection())->stopSignalForwarding(); } } @@ -158,7 +157,7 @@ MSCCLPP_API_CPP void Host2DeviceSemaphore::signal() { MSCCLPP_API_CPP Host2DeviceSemaphore::DeviceHandle Host2DeviceSemaphore::deviceHandle() const { Host2DeviceSemaphore::DeviceHandle device; - // If inboundToken_ is allocated (host-no-atomic mode), the GPU polls it. + // If inboundToken_ is allocated (signal forwarding mode), the GPU polls it. // Otherwise (atomic mode), the GPU polls the SemaphoreStub token directly, // which is the same address targeted by the NIC's atomic operation. 
device.inboundToken = @@ -178,12 +177,12 @@ MSCCLPP_API_CPP Host2HostSemaphore::Host2HostSemaphore(const Semaphore& semaphor THROW(CONN, Error, ErrorCode::InvalidUsage, "Local endpoint device type of Host2HostSemaphore should be CPU"); } auto connImpl = BaseConnection::getImpl(connection()); - if (connImpl->usesSignalForwarding()) { + if (connImpl->isSignalForwarding()) { // Signal forwarding mode: tell the recv thread where to write the incoming token. // Non-owning shared_ptr: Host2HostSemaphore outlives the connection, so the memory stays valid. auto token = std::shared_ptr(reinterpret_cast(semaphore_.localMemory().data()), [](uint64_t*) {}); - connImpl->setSignalForwardingDst(std::move(token)); + connImpl->startSignalForwarding(std::move(token)); } } diff --git a/test/framework.cc b/test/framework.cc index 73cf1272..f5bf55aa 100644 --- a/test/framework.cc +++ b/test/framework.cc @@ -285,6 +285,9 @@ int TestRegistry::runAllTests(int argc, char* argv[]) { passed++; } else { std::cout << "[ FAILED ] " << fullName << std::endl; + if (!gCurrentTestFailureMessage.empty()) { + std::cout << " Reason: " << gCurrentTestFailureMessage << std::endl; + } failed++; } } diff --git a/test/mp_unit/ib_tests.cu b/test/mp_unit/ib_tests.cu index 5809dd2f..8c91db66 100644 --- a/test/mp_unit/ib_tests.cu +++ b/test/mp_unit/ib_tests.cu @@ -3,8 +3,12 @@ #include +#include +#include #include +#include +#include "gdr.hpp" #include "mp_unit_tests.hpp" #include "utils_internal.hpp" @@ -41,7 +45,10 @@ void IbPeerToPeerTest::SetUp() { ibCtx = std::make_shared(ibDevName); bool noAtomic = !ibCtx->supportsRdmaAtomics(); - qp = ibCtx->createQp(-1, ib_gid_index, 1024, 1, 8192, 0, 64, noAtomic); + // When atomics are not supported, the MemoryConsistency test uses + // write-with-imm which requires recv WRs on the receiver side. + int maxRecvWr = noAtomic ? 
64 : 0; + qp = ibCtx->createQp(-1, ib_gid_index, 1024, 1, 8192, maxRecvWr, 64, noAtomic); qpInfo[gEnv->rank] = qp->getInfo(); bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo)); @@ -199,15 +206,34 @@ TEST(IbPeerToPeerTest, MemoryConsistency) { // This test needs only two ranks return; } - if (!ibCtx->supportsRdmaAtomics()) { - GTEST_SKIP() << "This test requires RDMA atomics support."; - } + + // Use atomic path if supported by the IB device. + bool useAtomic = ibCtx->supportsRdmaAtomics(); const uint64_t signalPeriod = 1024; const uint64_t maxIter = 10000; const uint64_t nelem = 65536 + 1; auto data = mscclpp::detail::gpuCallocUnique(nelem); + // For no-atomic mode: allocate a separate signal buffer for write-with-imm destination. + // The sender writes-with-imm to this buffer; the receiver's CPU thread reads the imm_data + // from the recv CQ and writes the iteration value to data[0] via GDRCopy atomicStore. + std::shared_ptr signalBuf; + std::unique_ptr signalMr; + std::array signalMrInfo{}; + if (!useAtomic) { + signalBuf = mscclpp::detail::gpuCallocShared(1); + signalMr = ibCtx->registerMr(signalBuf.get(), sizeof(uint64_t)); + signalMrInfo[gEnv->rank] = signalMr->getInfo(); + bootstrap->allGather(signalMrInfo.data(), sizeof(mscclpp::IbMrInfo)); + + // Pre-post recv WRs for write-with-imm on both ranks + for (int i = 0; i < 64; ++i) { + qp->stageRecv(0); + } + qp->postRecv(); + } + registerBufferAndConnect(data.get(), sizeof(uint64_t) * nelem); uint64_t res = 0; @@ -226,6 +252,40 @@ TEST(IbPeerToPeerTest, MemoryConsistency) { ASSERT_EQ(*ptrCurIter, 0); ASSERT_EQ(*ptrResult, 0); + // For no-atomic mode: create a GDRCopy mapping for data[0] and start a CPU thread that + // polls recv CQ and forwards the signal via GDRCopy BAR1 write — the same mechanism + // used by IBConnection::recvThreadFunc for port channels. 
+ std::atomic stopRecvThread(false); + std::thread recvThread; + std::unique_ptr dataGdrMap; + if (!useAtomic) { + if (!mscclpp::gdrEnabled()) { + SKIP_TEST() << "No-atomic mode requires GDRCopy but it is not available."; + } + // Create GDRCopy BAR1 mapping for data[0] — same as how connection.cc maps inboundToken_ + dataGdrMap = + std::make_unique(std::shared_ptr(data.get(), [](void*) {}), // non-owning shared_ptr + cudaDevId); + + recvThread = std::thread([&]() { + while (!stopRecvThread.load(std::memory_order_relaxed)) { + int wcNum = qp->pollRecvCq(); + if (wcNum <= 0) continue; + for (int i = 0; i < wcNum; ++i) { + int status = qp->getRecvWcStatus(i); + if (status != static_cast(mscclpp::WsStatus::Success)) continue; + uint64_t val = static_cast(qp->getRecvWcImmData(i)); + // Write the iteration value to data[0] via GDRCopy BAR1 atomicStore — + // same pattern as IBConnection::recvThreadFunc. + mscclpp::atomicStore(dataGdrMap->hostPtr(), val, mscclpp::memoryOrderRelaxed); + // Re-post recv + qp->stageRecv(0); + qp->postRecv(); + } + } + }); + } + kernelMemoryConsistency<<<1, 1024>>>(data.get(), ptrCurIter, ptrResult, nelem, maxIter); MSCCLPP_CUDATHROW(cudaGetLastError()); @@ -247,6 +307,11 @@ TEST(IbPeerToPeerTest, MemoryConsistency) { } MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + if (!useAtomic) { + stopRecvThread.store(true, std::memory_order_relaxed); + if (recvThread.joinable()) recvThread.join(); + } } else if (gEnv->rank == 1) { // Sender std::vector hostBuffer(nelem, 0); @@ -267,15 +332,20 @@ TEST(IbPeerToPeerTest, MemoryConsistency) { stageSendWrite(sizeof(uint64_t) * (nelem - 1), 0, sizeof(uint64_t), sizeof(uint64_t), signaled); qp->postSend(); -#if 0 - // For reference: send the first element using a normal send. This should occasionally see a wrong result. - stageSendWrite(sizeof(uint64_t), 0, 0, 0, false); - qp->postSend(); -#else - // Send the first element using AtomicAdd. This should see the correct result. 
- stageSendAtomicAdd(0, 0, 1, false); - qp->postSend(); -#endif + if (useAtomic) { + // Send the first element using AtomicAdd. The non-posted PCIe atomic operation + // provides end-to-end ordering: data[1..N] are guaranteed visible when data[0] updates. + stageSendAtomicAdd(0, 0, 1, false); + qp->postSend(); + } else { + // No-atomic mode: send a 0-byte WRITE_WITH_IMM carrying the iteration in imm_data. + // The receiver's CPU thread polls the recv CQ and writes the value to data[0] + // via GDRCopy atomicStore. + // QP ordering guarantees data[1..N] WRITE completes before this write-with-imm. + const mscclpp::IbMrInfo& remoteSignalMrInfo = signalMrInfo[(gEnv->rank == 1) ? 0 : 1]; + qp->stageSendWriteWithImm(nullptr, remoteSignalMrInfo, 0, 0, 0, 0, false, static_cast(iter)); + qp->postSend(); + } if (signaled) { int wcNum = qp->pollSendCq(); @@ -296,13 +366,23 @@ TEST(IbPeerToPeerTest, MemoryConsistency) { } } - if (res & 2) { - FAIL() << "The receiver is stuck at iteration " << iter << "."; - } else if (res != 0 && res != 1) { - FAIL() << "Unknown error is detected at iteration " << iter << ". res =" << res; + if (useAtomic) { + // With RDMA atomics, memory consistency must be guaranteed. + if (res & 2) { + FAIL() << "The receiver is stuck at iteration " << iter << "."; + } + EXPECT_EQ(res, 0); + } else { + if (res == 0) { + // No-atomic path works correctly here. + } else if (res & 2) { + SKIP_TEST() << "No-atomic signal forwarding: receiver stuck at iteration " << iter + << ". NIC DMA and CPU writes are not ordered on this platform."; + } else { + SKIP_TEST() << "No-atomic signal forwarding: memory inconsistency detected at iteration " << iter + << ". 
NIC DMA and CPU writes are not ordered on this platform."; + } } - - EXPECT_EQ(res, 0); } TEST(IbPeerToPeerTest, SimpleAtomicAdd) { @@ -311,7 +391,7 @@ TEST(IbPeerToPeerTest, SimpleAtomicAdd) { return; } if (!ibCtx->supportsRdmaAtomics()) { - GTEST_SKIP() << "This test requires RDMA atomics support."; + SKIP_TEST() << "This test requires RDMA atomics support."; } mscclpp::Timer timeout(3); diff --git a/test/mp_unit/port_channel_tests.cu b/test/mp_unit/port_channel_tests.cu index 764c3299..b69f388a 100644 --- a/test/mp_unit/port_channel_tests.cu +++ b/test/mp_unit/port_channel_tests.cu @@ -4,9 +4,24 @@ #include #include +#include "gdr.hpp" #include "mp_unit_tests.hpp" #include "utils_internal.hpp" +// Skip the current test if HostNoAtomic mode is not supported. +// On CUDA, HostNoAtomic requires GDRCopy for BAR1 signal forwarding. +// On ROCm, HostNoAtomic uses direct volatile writes and does not need GDRCopy. +#if defined(MSCCLPP_USE_CUDA) +#define REQUIRE_HOST_NO_ATOMIC \ + do { \ + if (!mscclpp::gdrEnabled()) { \ + SKIP_TEST() << "HostNoAtomic requires GDRCopy: " << mscclpp::gdrStatusMessage(); \ + } \ + } while (0) +#else +#define REQUIRE_HOST_NO_ATOMIC // No extra requirements on non-CUDA platforms. 
+#endif + void PortChannelOneToOneTest::SetUp() { // Use only two ranks setNumRanksToUse(2); @@ -272,6 +287,7 @@ TEST(PortChannelOneToOneTest, PingPongPerfIbHostMode) { TEST(PortChannelOneToOneTest, PingPongPerfIbHostNoAtomicMode) { REQUIRE_IBVERBS; + REQUIRE_HOST_NO_ATOMIC; testPingPongPerf(PingPongTestParams{ .useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::HostNoAtomic}); } @@ -465,16 +481,19 @@ TEST(PortChannelOneToOneTest, PacketPingPongPerfIbHostMode) { TEST(PortChannelOneToOneTest, PacketPingPongPerfIbHostNoAtomicMode) { REQUIRE_IBVERBS; + REQUIRE_HOST_NO_ATOMIC; testPacketPingPongPerf(true, IbMode::HostNoAtomic); } TEST(PortChannelOneToOneTest, PingPongIbHostNoAtomicMode) { REQUIRE_IBVERBS; + REQUIRE_HOST_NO_ATOMIC; testPingPong(PingPongTestParams{ .useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::HostNoAtomic}); } TEST(PortChannelOneToOneTest, PacketPingPongIbHostNoAtomicMode) { REQUIRE_IBVERBS; + REQUIRE_HOST_NO_ATOMIC; testPacketPingPong(true, IbMode::HostNoAtomic); } diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 7836e063..a345effc 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -4,6 +4,7 @@ target_sources(unit_tests PRIVATE unit_tests_main.cc core_tests.cc + gdr_tests.cu gpu_utils_tests.cc errors_tests.cc fifo_tests.cu diff --git a/test/unit/gdr_tests.cu b/test/unit/gdr_tests.cu new file mode 100644 index 00000000..78bb2e1a --- /dev/null +++ b/test/unit/gdr_tests.cu @@ -0,0 +1,251 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +#include +#include +#include + +#include "../framework.hpp" +#include "gdr.hpp" + +// GdrStatus and gdrEnabled + +class GdrStatusTest : public ::mscclpp::test::TestCase {}; + +TEST(GdrStatusTest, StatusIsValid) { + // gdrStatus() should return one of the defined enum values + auto status = mscclpp::gdrStatus(); + ASSERT_TRUE(status == mscclpp::GdrStatus::Ok || status == mscclpp::GdrStatus::NotBuilt || + status == mscclpp::GdrStatus::Disabled || status == mscclpp::GdrStatus::DriverMissing || + status == mscclpp::GdrStatus::OpenFailed); +} + +TEST(GdrStatusTest, EnabledConsistentWithStatus) { + // gdrEnabled() should be true iff gdrStatus() == Ok + EXPECT_EQ(mscclpp::gdrEnabled(), mscclpp::gdrStatus() == mscclpp::GdrStatus::Ok); +} + +// GdrMap tests — only run when GDRCopy is available + +class GdrMapTest : public ::mscclpp::test::TestCase { + protected: + void SetUp() override { + if (!mscclpp::gdrEnabled()) { + SKIP_TEST() << "GDRCopy not enabled on this platform."; + } + MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId_)); + // Try creating a GDRCopy mapping to check if pin+map works on this platform. 
+ try { + auto testMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap testMap(std::static_pointer_cast(testMem), deviceId_); + } catch (const std::exception&) { + SKIP_TEST() << "GDRCopy mapping not supported on this platform."; + } + } + + int deviceId_ = 0; +}; + +TEST(GdrMapTest, BasicMapping) { + // Allocate GPU memory via cudaMalloc (not VMM) and create a GDRCopy mapping + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + + ASSERT_TRUE(map.valid()); + EXPECT_NE(map.hostPtr(), nullptr); +} + +TEST(GdrMapTest, CopyToAndFrom) { + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + ASSERT_TRUE(map.valid()); + + // Write a value to GPU via GDRCopy + uint64_t writeVal = 0xDEADBEEFCAFE0123ULL; + map.copyTo(&writeVal, sizeof(uint64_t)); + + // Read it back via GDRCopy + uint64_t readVal = 0; + map.copyFrom(&readVal, sizeof(uint64_t)); + EXPECT_EQ(readVal, writeVal); + + // Also verify via cudaMemcpy + uint64_t cudaVal = 0; + MSCCLPP_CUDATHROW(cudaMemcpy(&cudaVal, gpuMem.get(), sizeof(uint64_t), cudaMemcpyDeviceToHost)); + EXPECT_EQ(cudaVal, writeVal); +} + +TEST(GdrMapTest, CopyToVisibleFromGpu) { + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + ASSERT_TRUE(map.valid()); + + // Write via GDRCopy, verify GPU sees it via cudaMemcpy + uint64_t val = 42; + map.copyTo(&val, sizeof(uint64_t)); + + uint64_t result = 0; + MSCCLPP_CUDATHROW(cudaMemcpy(&result, gpuMem.get(), sizeof(uint64_t), cudaMemcpyDeviceToHost)); + EXPECT_EQ(result, 42); +} + +TEST(GdrMapTest, MultipleWritesReadBack) { + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + ASSERT_TRUE(map.valid()); + + // Write multiple values sequentially and verify each + for (uint64_t i = 1; i <= 100; ++i) { + map.copyTo(&i, 
sizeof(uint64_t)); + uint64_t readback = 0; + map.copyFrom(&readback, sizeof(uint64_t)); + EXPECT_EQ(readback, i); + if (readback != i) break; + } +} + +TEST(GdrMapTest, HostPtrIsWritable) { + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + ASSERT_TRUE(map.valid()); + + // Write directly through the hostPtr (volatile store) + volatile uint64_t* ptr = reinterpret_cast(map.hostPtr()); + *ptr = 12345; + + // Read back via GDRCopy + uint64_t readback = 0; + map.copyFrom(&readback, sizeof(uint64_t)); + EXPECT_EQ(readback, 12345); +} + +TEST(GdrMapTest, HostPtrIsReadable) { + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + ASSERT_TRUE(map.valid()); + + // Write via GDRCopy copyTo (same BAR1 path as the read) + uint64_t val = 99999; + map.copyTo(&val, sizeof(uint64_t)); + + // Read through the hostPtr (volatile load via BAR1) + volatile uint64_t* ptr = reinterpret_cast(map.hostPtr()); + EXPECT_EQ(*ptr, 99999); +} + +TEST(GdrMapTest, DestroyDoesNotCrash) { + auto gpuMem = mscclpp::detail::gpuCallocShared(1); + { + mscclpp::GdrMap map(std::static_pointer_cast(gpuMem), deviceId_); + ASSERT_TRUE(map.valid()); + uint64_t val = 1; + map.copyTo(&val, sizeof(uint64_t)); + } + // After GdrMap is destroyed, gpuMem should still be valid + uint64_t result = 0; + MSCCLPP_CUDATHROW(cudaMemcpy(&result, gpuMem.get(), sizeof(uint64_t), cudaMemcpyDeviceToHost)); + EXPECT_EQ(result, 1); +} + +// GPU kernel: polls signalFromCpu until it reaches the current iteration number, then writes that number to +// ackToHost. Repeats for maxIter iterations. The GPU uses volatile loads on signalFromCpu and volatile +// stores to ackToHost (which is host-pinned memory visible to CPU). 
+// Parameters:
+//   signalFromCpu - GPU memory the CPU writes through the GDRCopy mapping; polled by the kernel.
+//   ackToHost     - host-pinned word the kernel stores each completed iteration number into.
+//   maxIter       - number of signal/ack rounds to run.
+//   abortFromCpu  - optional host-pinned flag. When non-null and non-zero the kernel returns
+//                   immediately, so a host-side timeout never leaves it spinning forever.
+__global__ void kernelGdrVisibilityPingPong(volatile uint64_t* signalFromCpu, volatile uint64_t* ackToHost,
+                                            uint64_t maxIter, volatile uint64_t* abortFromCpu = nullptr) {
+  for (uint64_t iter = 1; iter <= maxIter; ++iter) {
+    // Poll until CPU writes the expected iteration value via GDRCopy BAR1
+    while (*signalFromCpu < iter) {
+      // The abort flag is only consulted on an unsuccessful poll, so the success path is unchanged.
+      if (abortFromCpu != nullptr && *abortFromCpu != 0) return;
+    }
+    // Ack back to CPU via host-pinned memory
+    *ackToHost = iter;
+  }
+}
+
+// CPU -> GPU visibility ping-pong: for each of maxIter rounds the CPU publishes the round number
+// with GdrMap::copyTo(), and the single-thread kernel acknowledges it through host-pinned memory.
+// The test fails if an ack does not arrive within a fixed number of host polls.
+TEST(GdrMapTest, CpuGpuVisibilityPingPong) {
+  const uint64_t maxIter = 10000;
+  const int maxSpin = 100000000;
+
+  // signalBuf: GPU memory mapped via GDRCopy BAR1. CPU writes here, GPU polls.
+  auto signalBuf = mscclpp::detail::gpuCallocShared<uint64_t>(1);
+  mscclpp::GdrMap signalMap(std::static_pointer_cast<void>(signalBuf), deviceId_);
+  ASSERT_TRUE(signalMap.valid());
+
+  // ackBuf: host-pinned memory (gpuCallocHostShared). GPU writes here, CPU polls.
+  auto ackBuf = mscclpp::detail::gpuCallocHostShared<uint64_t>(1);
+  volatile uint64_t* ackPtr = reinterpret_cast<volatile uint64_t*>(ackBuf.get());
+  *ackPtr = 0;
+
+  // abortBuf: host-pinned flag. CPU sets it on timeout so the kernel can exit its poll loop.
+  auto abortBuf = mscclpp::detail::gpuCallocHostShared<uint64_t>(1);
+  volatile uint64_t* abortPtr = reinterpret_cast<volatile uint64_t*>(abortBuf.get());
+  *abortPtr = 0;
+
+  // Launch kernel — it will poll signalBuf and write ackBuf for each iteration
+  kernelGdrVisibilityPingPong<<<1, 1>>>(signalBuf.get(), ackBuf.get(), maxIter, abortBuf.get());
+  MSCCLPP_CUDATHROW(cudaGetLastError());
+
+  for (uint64_t iter = 1; iter <= maxIter; ++iter) {
+    // CPU writes iteration value to GPU via GDRCopy BAR1
+    uint64_t val = iter;
+    signalMap.copyTo(&val, sizeof(uint64_t));
+
+    // CPU polls host-pinned ack until GPU confirms it saw the value
+    int spin = 0;
+    while (*ackPtr < iter) {
+      if (++spin > maxSpin) {
+        // Release the kernel before failing. Without this it would keep spinning on a signal that
+        // never arrives, and any later device-wide synchronization would block forever.
+        *abortPtr = 1;
+        MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+        FAIL() << "GPU did not ack iteration " << iter << " (ack=" << *ackPtr << ")";
+      }
+    }
+  }
+
+  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+  EXPECT_EQ(*ackPtr, maxIter);
+}
+
+// GPU kernel that polls `counter` with an acquire load (mscclpp::atomicLoad, memoryOrderAcquire).
+// For each iter in [1, maxIter]: once counter >= iter, writes iter to ackToHost.
+// Parameters:
+//   counter      - GPU memory holding the counter the CPU advances; read with an acquire load.
+//   ackToHost    - host-pinned word the kernel stores each completed iteration number into.
+//   maxIter      - number of counter/ack rounds to run.
+//   abortFromCpu - optional host-pinned flag. When non-null and non-zero the kernel returns
+//                  immediately, so a host-side timeout never leaves it spinning forever.
+__global__ void kernelCounterWait(uint64_t* counter, volatile uint64_t* ackToHost, uint64_t maxIter,
+                                  volatile uint64_t* abortFromCpu = nullptr) {
+  for (uint64_t iter = 1; iter <= maxIter; ++iter) {
+    // Acquire load — pairs with the atomicStore(relaxed) on the CPU side.
+    // NOTE(review): the scope is whatever mscclpp::atomicLoad defaults to, presumably system scope — confirm.
+    while (mscclpp::atomicLoad(counter, mscclpp::memoryOrderAcquire) < iter) {
+      // The abort flag is only consulted on an unsuccessful poll, so the success path is unchanged.
+      if (abortFromCpu != nullptr && *abortFromCpu != 0) return;
+    }
+    // Ack back
+    *ackToHost = iter;
+  }
+}
+
+// Test the GDRCopy counter pattern used by HostNoAtomic mode:
+// - GPU memory allocated via gpuCallocShared (cudaMalloc)
+// - GdrMap for BAR1 mapping
+// - CPU writes via atomicStore(relaxed) through GDRCopy BAR1 mapping
+// - GPU reads via atomicLoad with memory_order_acquire
+// The test fails if an ack does not arrive within a fixed number of host polls.
+TEST(GdrMapTest, AtomicStoreCounterPingPong) {
+  const uint64_t maxIter = 10000;
+  const int maxSpin = 100000000;
+
+  // Allocate GPU memory via gpuCallocShared
+  auto counterBuf = mscclpp::detail::gpuCallocShared<uint64_t>(1);
+  mscclpp::GdrMap counterMap(std::static_pointer_cast<void>(counterBuf), deviceId_);
+  ASSERT_TRUE(counterMap.valid());
+
+  // Ack buffer: host-pinned memory
+  auto ackBuf = mscclpp::detail::gpuCallocHostShared<uint64_t>(1);
+  volatile uint64_t* ackPtr = reinterpret_cast<volatile uint64_t*>(ackBuf.get());
+  *ackPtr = 0;
+
+  // Abort flag: host-pinned memory. CPU sets it on timeout so the kernel can exit its poll loop.
+  auto abortBuf = mscclpp::detail::gpuCallocHostShared<uint64_t>(1);
+  volatile uint64_t* abortPtr = reinterpret_cast<volatile uint64_t*>(abortBuf.get());
+  *abortPtr = 0;
+
+  // Launch kernel — polls counterBuf with an acquire load
+  kernelCounterWait<<<1, 1>>>(counterBuf.get(), ackBuf.get(), maxIter, abortBuf.get());
+  MSCCLPP_CUDATHROW(cudaGetLastError());
+
+  for (uint64_t iter = 1; iter <= maxIter; ++iter) {
+    // CPU writes counter via atomicStore (relaxed — GPU uses acquire on read)
+    mscclpp::atomicStore(counterMap.hostPtr(), iter, mscclpp::memoryOrderRelaxed);
+
+    // Wait for GPU ack
+    int spin = 0;
+    while (*ackPtr < iter) {
+      if (++spin > maxSpin) {
+        // Release the kernel first. Synchronizing while it is still waiting for the counter
+        // would block forever and the failure below would never be reported.
+        *abortPtr = 1;
+        MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+        FAIL() << "GPU did not ack iteration " << iter;
+      }
+    }
+  }
+
+  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
+  EXPECT_EQ(*ackPtr, maxIter);
+}