diff --git a/src/core/ib.cc b/src/core/ib.cc
index 557f0426..f4972f46 100644
--- a/src/core/ib.cc
+++ b/src/core/ib.cc
@@ -84,50 +84,42 @@ IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size, bool isDataDirect) : mr_(nu
   if (isGpuBuff && isDmabufSupportedByGpu(gpuId)) {
 #if !defined(MSCCLPP_USE_ROCM)
     int fd = -1;
-    size_t rangeSize = pages * pageSize;
-
-    // Obtain a DMA-BUF file descriptor for the GPU memory range. On platforms with a CPU-GPU
-    // bridge that reorders posted writes (e.g., Grace/GB200 NVLink-C2C), the PCIe mapping flag
-    // routes DMA through the Data Direct engine for correct ordering and higher throughput.
-    // Fall back to the default (non-PCIe) mapping if the flag is unsupported.
-#if (CUDA_VERSION >= 12030)
-    CUresult cuRes = cuMemGetHandleForAddressRange(&fd, addr, rangeSize, CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD,
-                                                   CU_MEM_RANGE_FLAG_DMA_BUF_MAPPING_TYPE_PCIE);
-    if (cuRes != CUDA_SUCCESS || fd < 0) {
-      if (fd >= 0) ::close(fd);
-      fd = -1;
-    }
-    bool usedPcieFlag = (fd >= 0);
-#endif // CUDA_VERSION >= 12030
-    if (fd < 0) {
-      MSCCLPP_CUTHROW(cuMemGetHandleForAddressRange(&fd, addr, rangeSize, CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD, 0));
-    }
-
-    // Register the DMA-BUF memory region. When Data Direct is available, use the mlx5dv API
-    // which enables hardware-level Data Direct routing for the MR. Otherwise use standard verbs.
     size_t offsetInDmaBuf = buffIntPtr % pageSize;
     int accessFlags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ |
                       IBV_ACCESS_RELAXED_ORDERING | IBV_ACCESS_REMOTE_ATOMIC;
 #if defined(MSCCLPP_USE_MLX5DV)
-    if (isDataDirect) {
-      mr_ = MLX5DV::mlx5dv_reg_dmabuf_mr(pd, offsetInDmaBuf, size, buffIntPtr, fd, accessFlags);
+    if (isMlx5 && MLX5DV::isAvailable()) {
+      // DATA_DIRECT requires a PCIe BAR1-mapped DMA-BUF fd (CU_MEM_RANGE_FLAG_DMA_BUF_MAPPING_TYPE_PCIE).
+      // This matches the perftest approach for achieving full bandwidth with DATA_DIRECT.
+#if (CUDA_VERSION >= 12030)  // CU_MEM_RANGE_FLAG_DMA_BUF_MAPPING_TYPE_PCIE requires CUDA 12.3+; older
+                             // toolkits skip this attempt and fall through to the regular DMA-BUF path.
+      CUresult cuRes = cuMemGetHandleForAddressRange(&fd, addr, pages * pageSize,
+                                                     CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD,
+                                                     CU_MEM_RANGE_FLAG_DMA_BUF_MAPPING_TYPE_PCIE);
+      if (cuRes == CUDA_SUCCESS && fd >= 0) {
+        mr_ = MLX5DV::mlx5dv_reg_dmabuf_mr(pd, offsetInDmaBuf, size, buffIntPtr, fd, accessFlags);
+        if (mr_ != nullptr) {
+          isDataDirect_ = true;
+        } else {
+          INFO(NET, "mlx5dv_reg_dmabuf_mr failed with PCIe DMA-BUF, falling back to regular DMA-BUF");
+          ::close(fd);
+          fd = -1;
+        }
+      } else {
+        INFO(NET, "cuMemGetHandleForAddressRange with PCIE flag failed (", cuRes, "), falling back");
+        if (fd >= 0) { ::close(fd); fd = -1; }
+      }
+#endif  // CUDA_VERSION >= 12030
     }
 #endif
     if (mr_ == nullptr) {
+      if (fd < 0) {
+        MSCCLPP_CUTHROW(
+            cuMemGetHandleForAddressRange(&fd, addr, pages * pageSize, CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD, 0));
+      }
       mr_ = IBVerbs::ibv_reg_dmabuf_mr(pd, offsetInDmaBuf, size, buffIntPtr, fd, accessFlags);
     }
-
-    // If MR registration failed with a PCIe-mapped fd, retry with the default mapping.
-#if (CUDA_VERSION >= 12030)
-    if (mr_ == nullptr && usedPcieFlag) {
-      ::close(fd);
-      MSCCLPP_CUTHROW(cuMemGetHandleForAddressRange(&fd, addr, rangeSize, CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD, 0));
-      mr_ = IBVerbs::ibv_reg_dmabuf_mr(pd, offsetInDmaBuf, size, buffIntPtr, fd, accessFlags);
-    }
-#endif // CUDA_VERSION >= 12030
-
-    ::close(fd);
+    if (fd >= 0) ::close(fd);
     if (mr_ == nullptr) {
       THROW(NET, IbError, errno, "ibv_reg_dmabuf_mr failed (errno ", errno, ")");
     }
diff --git a/src/core/include/execution_kernel.hpp b/src/core/include/execution_kernel.hpp
index 20147c30..7719e61a 100644
--- a/src/core/include/execution_kernel.hpp
+++ b/src/core/include/execution_kernel.hpp
@@ -173,11 +173,11 @@ MSCCLPP_DEVICE_INLINE void handlePut(const Operation& op, void* input, void* out
     uint32_t dstOffset = dstOffsets[tid] + getOffset(portChannelBufferTypes_[op.outputBufferRefs[tid].id], offset);
     uint32_t srcOffset = srcOffsets[tid] + getOffset(op.inputBufferRefs[tid].type, offset);
-    if constexpr (PutWithSignal) {
-      portChannels_[channelIndexes[tid]].putWithSignal(dstMemoryId, dstOffset, srcMemoryId, srcOffset, size);
-    } else if constexpr (PutWithSignalAndFlush) {
+    if constexpr (PutWithSignalAndFlush) {
       portChannels_[channelIndexes[tid]].putWithSignalAndFlush(dstMemoryId, (uint64_t)dstOffset, srcMemoryId,
-                                                              (uint64_t)srcOffsets, size);
+                                                              (uint64_t)srcOffset, size);
+    } else if constexpr (PutWithSignal) {
+      portChannels_[channelIndexes[tid]].putWithSignal(dstMemoryId, dstOffset, srcMemoryId, srcOffset, size);
     } else {
       portChannels_[channelIndexes[tid]].put(dstMemoryId, dstOffset, srcMemoryId, srcOffset, size);
     }