From 252a422030c092163b8c75577b7223ca264f2443 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Fri, 15 May 2026 11:50:43 -0700 Subject: [PATCH 1/2] Handle PortChannel flush asynchronously from the host proxy (#802) When a `PortChannel` requests `flush()`, the host-side proxy was being blocked, which may cause head-of-line blocking of other parallel `PortChannel`s' requests. Now the proxy handles `flush()` requests asynchronously. This feature especially helps performance when we need multiple IB QPs and need to flush QPs. --- include/mscclpp/core.hpp | 2 + include/mscclpp/fifo.hpp | 5 + include/mscclpp/port_channel.hpp | 4 + include/mscclpp/port_channel_device.hpp | 33 ++- include/mscclpp/proxy.hpp | 4 +- src/core/connection.cc | 36 ++- src/core/fifo.cc | 2 + src/core/include/atomic.hpp | 16 +- src/core/include/connection.hpp | 25 ++ src/core/include/proxy_impl.hpp | 38 ++++ src/core/port_channel.cc | 64 +++++- src/core/proxy.cc | 34 +-- test/mp_unit/mp_unit_tests.hpp | 8 + test/mp_unit/port_channel_tests.cu | 289 ++++++++++++++++++++++++ 14 files changed, 507 insertions(+), 53 deletions(-) create mode 100644 src/core/include/proxy_impl.hpp diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index ca2fc34f..45b56bcc 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -676,6 +676,8 @@ class Connection { friend class Semaphore; friend class ProxyService; friend class BaseConnection; + friend struct BasePortChannel; + friend struct PortChannel; }; /// SemaphoreStub object only used for constructing Semaphore, not for direct use by the user. diff --git a/include/mscclpp/fifo.hpp b/include/mscclpp/fifo.hpp index 2ee793ad..6aae03b5 100644 --- a/include/mscclpp/fifo.hpp +++ b/include/mscclpp/fifo.hpp @@ -29,6 +29,11 @@ class Fifo { /// Remove the head trigger. void pop(); + /// Get the current tail position — the FIFO push-return value of the trigger about to be + /// (or currently being) processed by the proxy thread. Monotonically increasing. + /// @return The current tail position. + uint64_t tail() const; + /// Get FIFO size. /// @return Number of entries in the FIFO. int size() const; diff --git a/include/mscclpp/port_channel.hpp b/include/mscclpp/port_channel.hpp index ed660407..18d67524 100644 --- a/include/mscclpp/port_channel.hpp +++ b/include/mscclpp/port_channel.hpp @@ -84,8 +84,12 @@ class ProxyService : public BaseProxyService { std::vector memories_; std::shared_ptr proxy_; std::unordered_map, int> inflightRequests_; + // Latest pending TriggerSync FIFO position per connection. Proxy publishes pos+1 to the + // connection's gpuFlushDonePos_ when the CQ drains, then erases the entry. + std::unordered_map, uint64_t> pendingFlushPos_; ProxyHandlerResult handleTrigger(ProxyTrigger triggerRaw); + void progressFlushes(); }; /// Port channel without specifying source/destination memory regions. diff --git a/include/mscclpp/port_channel_device.hpp b/include/mscclpp/port_channel_device.hpp index adff3fcd..74fa3d89 100644 --- a/include/mscclpp/port_channel_device.hpp +++ b/include/mscclpp/port_channel_device.hpp @@ -17,6 +17,19 @@ using SemaphoreId = uint32_t; /// actual. using MemoryId = uint32_t; +namespace detail { +#if defined(MSCCLPP_DEVICE_COMPILE) +/// Wait until the proxy has processed and drained the TriggerSync at FIFO position `fifoPos`. +/// The proxy publishes `flushDonePos = latestCompletedPos + 1` when the CQ drains, so the +/// wait condition `flushDonePos > fifoPos` is satisfied exactly when our own request has +/// been completed. Using the FIFO push position as the wait target couples the wait to the +/// FIFO order, avoiding races when multiple GPU threads concurrently flush the same channel. +MSCCLPP_DEVICE_INLINE void waitFlush(uint64_t* flushDonePos, uint64_t fifoPos, [[maybe_unused]] int64_t maxSpinCount) { + POLL_MAYBE_JAILBREAK((atomicLoad(flushDonePos, memoryOrderAcquire) <= fifoPos), maxSpinCount); +} +#endif // defined(MSCCLPP_DEVICE_COMPILE) +} // namespace detail + struct BasePortChannelDeviceHandle { SemaphoreId semaphoreId_; @@ -26,12 +39,16 @@ struct BasePortChannelDeviceHandle { // can produce for and the sole proxy thread consumes it. FifoDeviceHandle fifo_; + // One past the highest FIFO position with a completed flush on this connection. + // Host-pinned: proxy writes after CQ drain, GPU reads in waitFlush(). + uint64_t* flushDonePos_; + MSCCLPP_INLINE BasePortChannelDeviceHandle() = default; MSCCLPP_HOST_DEVICE_INLINE BasePortChannelDeviceHandle(SemaphoreId semaphoreId, Host2DeviceSemaphoreDeviceHandle semaphore, - FifoDeviceHandle fifo) - : semaphoreId_(semaphoreId), semaphore_(semaphore), fifo_(fifo) {} + FifoDeviceHandle fifo, uint64_t* flushDonePos) + : semaphoreId_(semaphoreId), semaphore_(semaphore), fifo_(fifo), flushDonePos_(flushDonePos) {} #if defined(MSCCLPP_DEVICE_COMPILE) /// Push a TriggerData to the FIFO. @@ -86,9 +103,9 @@ struct BasePortChannelDeviceHandle { /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(MemoryId dstId, uint64_t dstOffset, MemoryId srcId, uint64_t srcOffset, uint64_t size, int64_t maxSpinCount = 1000000) { - uint64_t curFifoHead = + uint64_t pos = fifo_.push({TriggerData | TriggerFlag | TriggerSync, dstId, dstOffset, srcId, srcOffset, size, semaphoreId_}); - fifo_.sync(curFifoHead, maxSpinCount); + detail::waitFlush(flushDonePos_, pos, maxSpinCount); } /// Push a TriggerData, a TriggerFlag, and a TriggerSync at the same time to the FIFO. @@ -105,8 +122,8 @@ struct BasePortChannelDeviceHandle { /// Push a TriggerSync to the FIFO. /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. MSCCLPP_DEVICE_INLINE void flush(int64_t maxSpinCount = 1000000) { - uint64_t curFifoHead = fifo_.push({TriggerSync, 0, 0, 0, 0, 1, semaphoreId_}); - fifo_.sync(curFifoHead, maxSpinCount); + uint64_t pos = fifo_.push({TriggerSync, 0, 0, 0, 0, 1, semaphoreId_}); + detail::waitFlush(flushDonePos_, pos, maxSpinCount); } /// Check if the port channel has been signaled. @@ -128,8 +145,8 @@ struct PortChannelDeviceHandle : public BasePortChannelDeviceHandle { MSCCLPP_HOST_DEVICE_INLINE PortChannelDeviceHandle(SemaphoreId semaphoreId, Host2DeviceSemaphoreDeviceHandle semaphore, FifoDeviceHandle fifo, - MemoryId dst, MemoryId src) - : BasePortChannelDeviceHandle(semaphoreId, semaphore, fifo), dst_(dst), src_(src) {} + MemoryId dst, MemoryId src, uint64_t* flushDonePos) + : BasePortChannelDeviceHandle(semaphoreId, semaphore, fifo, flushDonePos), dst_(dst), src_(src) {} #if defined(MSCCLPP_DEVICE_COMPILE) /// Push a TriggerData to the FIFO. diff --git a/include/mscclpp/proxy.hpp b/include/mscclpp/proxy.hpp index 990deabb..291206c0 100644 --- a/include/mscclpp/proxy.hpp +++ b/include/mscclpp/proxy.hpp @@ -20,8 +20,9 @@ enum class ProxyHandlerResult { }; class Proxy; +class ProxyService; -/// Handler function type for proxy. +/// Handler function type for proxy. Called once per ready FIFO trigger. using ProxyHandler = std::function; /// Host-side proxy for PortChannels. @@ -54,6 +55,7 @@ class Proxy { std::shared_ptr fifo(); private: + friend class ProxyService; struct Impl; std::unique_ptr pimpl_; }; diff --git a/src/core/connection.cc b/src/core/connection.cc index 11ecd968..e01c4a6e 100644 --- a/src/core/connection.cc +++ b/src/core/connection.cc @@ -7,13 +7,13 @@ #include #endif -#include #include #include #include #include #include "api.h" +#include "atomic.hpp" #include "context.hpp" #include "endpoint.hpp" #include "gpu_utils_internal.hpp" @@ -43,7 +43,10 @@ const RegisteredMemory::Impl& BaseConnection::getImpl(const RegisteredMemory& me Context::Impl& BaseConnection::getImpl(Context& context) { return *(context.pimpl_); } MSCCLPP_API_CPP BaseConnection::BaseConnection(std::shared_ptr context, const Endpoint& localEndpoint) - : context_(context), localEndpoint_(localEndpoint), maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()) {} + : context_(context), + localEndpoint_(localEndpoint), + maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()), + gpuFlushDonePos_(detail::gpuCallocHostShared()) {} MSCCLPP_API_CPP std::shared_ptr BaseConnection::context() const { return context_; } @@ -489,6 +492,35 @@ void IBConnection::flush(int64_t timeoutUsec) { #endif } +void IBConnection::requestFlush() { + // No-op: IB sends were already posted by prior conn.write() calls in handleTrigger. + // progressFlush() drives completion by polling the send CQ. +} + +bool IBConnection::progressFlush() { + if (recvThreadError_.load(std::memory_order_acquire)) { + THROW(CONN, Error, ErrorCode::SystemError, "IBConnection recv thread failed: ", recvThreadErrorMsg_); + } + + auto qp = qp_.lock(); + if (!qp || qp->getNumSendCqItems() == 0) { + return true; // QP expired or CQ already drained. + } + + int wcNum = qp->pollSendCq(); + if (wcNum < 0) { + THROW(NET, IbError, errno, "pollSendCq failed in progressFlush"); + } + for (int i = 0; i < wcNum; ++i) { + int status = qp->getSendWcStatus(i); + if (status != static_cast(WsStatus::Success)) { + THROW(NET, Error, ErrorCode::SystemError, + "an IB work item failed in progressFlush: ", qp->getSendWcStatusString(i)); + } + } + return qp->getNumSendCqItems() == 0; +} + // EthernetConnection EthernetConnection::EthernetConnection(std::shared_ptr context, const Endpoint& localEndpoint, diff --git a/src/core/fifo.cc b/src/core/fifo.cc index e0ac9916..b11775d8 100644 --- a/src/core/fifo.cc +++ b/src/core/fifo.cc @@ -53,6 +53,8 @@ MSCCLPP_API_CPP void Fifo::pop() { atomicStore(pimpl_->tail.get(), curTail + 1, memoryOrderRelease); } +MSCCLPP_API_CPP uint64_t Fifo::tail() const { return *(pimpl_->tail); } + MSCCLPP_API_CPP int Fifo::size() const { return pimpl_->size; } MSCCLPP_API_CPP FifoDeviceHandle Fifo::deviceHandle() const { diff --git a/src/core/include/atomic.hpp b/src/core/include/atomic.hpp index 26f549f2..b6079162 100644 --- a/src/core/include/atomic.hpp +++ b/src/core/include/atomic.hpp @@ -4,18 +4,16 @@ #ifndef MSCCLPP_ATOMIC_HPP_ #define MSCCLPP_ATOMIC_HPP_ -#if defined(MSCCLPP_USE_CUDA) -#ifndef MSCCLPP_DEVICE_CUDA +// On CUDA host-side compiles, force atomic_device.hpp's CUDA branch so host code uses +// cuda::atomic_ref (for system-scope ordering with GPU readers). On CUDA device compiles +// (MSCCLPP_DEVICE_CUDA already set by device.hpp) and on ROCm builds, include normally — +// atomic_device.hpp's branch selection works correctly without forcing. +#if defined(MSCCLPP_USE_CUDA) && !defined(MSCCLPP_DEVICE_CUDA) #define MSCCLPP_DEVICE_CUDA #include #undef MSCCLPP_DEVICE_CUDA -#endif // !defined(MSCCLPP_DEVICE_CUDA) -#else // !defined(MSCCLPP_USE_CUDA) -#ifndef MSCCLPP_DEVICE_HIP -#define MSCCLPP_DEVICE_HIP +#else #include -#undef MSCCLPP_DEVICE_HIP -#endif // !defined(MSCCLPP_DEVICE_HIP) -#endif // !defined(MSCCLPP_USE_CUDA) +#endif #endif // MSCCLPP_ATOMIC_HPP_ \ No newline at end of file diff --git a/src/core/include/connection.hpp b/src/core/include/connection.hpp index 22a9930f..eda4b3ef 100644 --- a/src/core/include/connection.hpp +++ b/src/core/include/connection.hpp @@ -50,6 +50,23 @@ class BaseConnection { /// When false, the NIC writes directly to the semaphore's registered memory (e.g., via atomics). virtual bool isSignalForwarding() const { return false; } + /// Request a flush. Subclasses that support async flush (e.g. IBConnection) override this + /// to be a no-op and rely on progressFlush() to drive completion via the proxy thread. + /// The default does a blocking flush(); progressFlush() then trivially returns true. + /// @note Only call from the proxy thread. + virtual void requestFlush() { flush(); } + + /// Progress pending async flush operations (non-blocking CQ poll). + /// @note Only call from the proxy thread. + /// @return true if no flush is pending (CQ fully drained or no request). + virtual bool progressFlush() { return true; } + + /// Get pointer to the GPU-visible flush-done position (host-pinned memory). + /// ProxyService writes "one past the highest completed FIFO position" here when the CQ + /// drains; GPU threads spin on it (`waitFlush`) until it surpasses their own push position. + /// @note Pointer is valid for the lifetime of this Connection. + uint64_t* getFlushDonePtr() const { return gpuFlushDonePos_.get(); } + virtual Transport transport() const = 0; virtual Transport remoteTransport() const = 0; @@ -75,6 +92,11 @@ class BaseConnection { std::shared_ptr context_; Endpoint localEndpoint_; int maxWriteQueueSize_; + + // GPU-visible flush-done position (host-pinned memory). ProxyService writes one past the + // highest FIFO position whose TriggerSync request has fully completed on this connection + // (CQ drained for IB, synchronous flush() returned for non-IB). + std::shared_ptr gpuFlushDonePos_; }; class CudaIpcConnection : public BaseConnection { @@ -149,6 +171,9 @@ class IBConnection : public BaseConnection { void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush(int64_t timeoutUsec) override; + + void requestFlush() override; + bool progressFlush() override; }; class EthernetConnection : public BaseConnection { diff --git a/src/core/include/proxy_impl.hpp b/src/core/include/proxy_impl.hpp new file mode 100644 index 00000000..a588e5df --- /dev/null +++ b/src/core/include/proxy_impl.hpp @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#ifndef MSCCLPP_PROXY_IMPL_HPP_ +#define MSCCLPP_PROXY_IMPL_HPP_ + +#include +#include +#include +#include +#include +#include + +namespace mscclpp { + +struct Proxy::Impl { + ProxyHandler handler; + std::function threadInit; + std::function progressHandler; + std::shared_ptr fifo; + std::atomic_bool threadStarted; + std::thread service; + std::atomic_bool running; + + Impl(ProxyHandler handler, std::function threadInit, int fifoSize) + : handler(handler), + threadInit(threadInit), + fifo(std::make_shared(fifoSize)), + threadStarted(false), + running(false) {} + + // Must be called before start() — the proxy thread captures progressHandler at start time. + void setProgressHandler(std::function h) { progressHandler = std::move(h); } +}; + +} // namespace mscclpp + +#endif // MSCCLPP_PROXY_IMPL_HPP_ diff --git a/src/core/port_channel.cc b/src/core/port_channel.cc index b8242db3..210b2eeb 100644 --- a/src/core/port_channel.cc +++ b/src/core/port_channel.cc @@ -1,11 +1,14 @@ // Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. +// Licensed under the MIT License. #include #include #include "api.h" -#include "debug.h" +#include "atomic.hpp" +#include "connection.hpp" +#include "logger.hpp" +#include "proxy_impl.hpp" namespace mscclpp { @@ -34,11 +37,12 @@ MSCCLPP_API_CPP ProxyService::ProxyService(int fifoSize) { MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevice)); if (deviceNumaNode >= 0) { numaBind(deviceNumaNode); - INFO(MSCCLPP_INIT, "NUMA node of ProxyService proxy thread is set to %d", deviceNumaNode); + INFO(CONN, "NUMA node of ProxyService proxy thread is set to ", deviceNumaNode); } }; auto handlerFunc = [&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }; proxy_ = std::make_shared(handlerFunc, initFunc, fifoSize); + proxy_->pimpl_->setProgressHandler([this]() { progressFlushes(); }); } MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator, @@ -84,9 +88,28 @@ MSCCLPP_API_CPP PortChannel ProxyService::portChannel(SemaphoreId id, MemoryId d MSCCLPP_API_CPP void ProxyService::startProxy(bool blocking) { proxy_->start(blocking); } -MSCCLPP_API_CPP void ProxyService::stopProxy() { proxy_->stop(); } +MSCCLPP_API_CPP void ProxyService::stopProxy() { + proxy_->stop(); + // Drain pending flushes. After a bounded loop, force-unblock any still-pending GPU + // waiters with a sentinel write (UINT64_MAX > any FIFO position). + for (int i = 0; i < 1000 && !pendingFlushPos_.empty(); ++i) { + progressFlushes(); + } + if (!pendingFlushPos_.empty()) { + WARN(CONN, "stopProxy: ", pendingFlushPos_.size(), " connections still pending; writing sentinel"); + for (auto& [conn, pos] : pendingFlushPos_) { + if (uint64_t* ptr = conn->getFlushDonePtr()) atomicStore(ptr, UINT64_MAX, memoryOrderRelease); + } + pendingFlushPos_.clear(); + } +} ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) { + // The proxy is the sole FIFO consumer and processes in strict push order, so the FIFO's + // tail (between poll() and pop()) matches the value GPU's fifo_.push() returned for this + // trigger — use it directly as our per-trigger sequence number. + uint64_t pos = proxy_->fifo()->tail(); + std::shared_ptr semaphore = semaphores_[trigger.fields.semaphoreId]; auto& conn = semaphore->connection(); @@ -105,9 +128,15 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) { numRequests++; } - if (((trigger.fields.type & TriggerSync) && numRequests > 0) || - (maxWriteQueueSize != -1 && numRequests >= maxWriteQueueSize)) { - conn.flush(); + if (trigger.fields.type & TriggerSync) { + // Record this TriggerSync's FIFO position. The GPU caller is spinning on + // flushDonePos_ > pos; progressFlushes() will publish pos+1 once the CQ drains. + // Later TriggerSyncs on the same conn overwrite — CQ drain completes them all at once. + conn.impl_->requestFlush(); + pendingFlushPos_[conn.impl_] = pos; + numRequests = 0; + } else if (maxWriteQueueSize != -1 && numRequests >= maxWriteQueueSize) { + conn.flush(); // flow-control flush stays blocking numRequests = 0; } @@ -115,12 +144,27 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) { } MSCCLPP_API_CPP BasePortChannel::DeviceHandle BasePortChannel::deviceHandle() const { - return BasePortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle()); + auto& conn = semaphore_->connection(); + return BasePortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle(), + conn.impl_->getFlushDonePtr()); } MSCCLPP_API_CPP PortChannel::DeviceHandle PortChannel::deviceHandle() const { - return PortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle(), dst_, - src_); + auto& conn = semaphore_->connection(); + return PortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle(), dst_, src_, + conn.impl_->getFlushDonePtr()); +} + +void ProxyService::progressFlushes() { + for (auto it = pendingFlushPos_.begin(); it != pendingFlushPos_.end();) { + if (it->first->progressFlush()) { + // CQ drained: publish pos+1 to unblock GPU waiters whose own pos <= recorded pos. + atomicStore(it->first->getFlushDonePtr(), it->second + 1, memoryOrderRelease); + it = pendingFlushPos_.erase(it); + } else { + ++it; + } + } } } // namespace mscclpp diff --git a/src/core/proxy.cc b/src/core/proxy.cc index de5b90fc..554336e8 100644 --- a/src/core/proxy.cc +++ b/src/core/proxy.cc @@ -1,38 +1,21 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#include #include #include #include #include #include -#include #include "api.h" -#include "debug.h" +#include "logger.hpp" +#include "proxy_impl.hpp" namespace mscclpp { constexpr int ProxyStopCheckPeriod = 1000; constexpr int ProxyStartWarnPeriod = 1000; -struct Proxy::Impl { - ProxyHandler handler; - std::function threadInit; - std::shared_ptr fifo; - std::atomic_bool threadStarted; - std::thread service; - std::atomic_bool running; - - Impl(ProxyHandler handler, std::function threadInit, int fifoSize) - : handler(handler), - threadInit(threadInit), - fifo(std::make_shared(fifoSize)), - threadStarted(false), - running(false) {} -}; - MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler, std::function threadInit, int fifoSize) { pimpl_ = std::make_unique(handler, threadInit, fifoSize); } @@ -70,18 +53,23 @@ MSCCLPP_API_CPP void Proxy::start(bool blocking) { pimpl_->threadStarted.store(true, std::memory_order_release); - ProxyHandler handler = this->pimpl_->handler; - auto fifo = this->pimpl_->fifo; + ProxyHandler handler = pimpl_->handler; + auto progressHandler = pimpl_->progressHandler; + auto fifo = pimpl_->fifo; ProxyTrigger trigger; int runCnt = ProxyStopCheckPeriod; for (;;) { if (runCnt-- == 0) { runCnt = ProxyStopCheckPeriod; - if (!this->pimpl_->running.load(std::memory_order_acquire)) { + if (!pimpl_->running.load(std::memory_order_acquire)) { break; } } + // Per-iteration system work (e.g. progressing pending async flushes), + // run regardless of whether a trigger is ready this iteration. + if (progressHandler) progressHandler(); + // Poll to see if we are ready to send anything trigger = fifo->poll(); if (trigger.fst == 0 || trigger.snd == 0) { // TODO: this check is a potential pitfall for custom triggers @@ -107,7 +95,7 @@ MSCCLPP_API_CPP void Proxy::start(bool blocking) { count--; if (count == 0) { count = ProxyStartWarnPeriod; - WARN("Proxy thread startup taking longer than expected."); + WARN(CONN, "Proxy thread startup taking longer than expected."); } } } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index f4a26cf9..eb8b5485 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -160,6 +160,14 @@ class PortChannelOneToOneTest : public CommunicatorTestBase { void testPacketPingPong(bool useIbOnly, IbMode ibMode = IbMode::Default); void testPacketPingPongPerf(bool useIbOnly, IbMode ibMode = IbMode::Default); void testBandwidth(PingPongTestParams params); + void setupMultiQpChannels(int numQps, size_t elemsPerChan, IbMode ibMode, int tagBase, + std::vector>& sendBuffs, + std::vector& localMems, + std::vector& remoteMems, + std::vector& portChannels); + void testMultiQpBandwidth(IbMode ibMode, int numQps); + void testMultiQpFlushStress(IbMode ibMode, int numQps); + void testSameChanConcurrentFlush(IbMode ibMode); std::shared_ptr proxyService; }; diff --git a/test/mp_unit/port_channel_tests.cu b/test/mp_unit/port_channel_tests.cu index 4b1b0cfb..47034cdb 100644 --- a/test/mp_unit/port_channel_tests.cu +++ b/test/mp_unit/port_channel_tests.cu @@ -625,3 +625,292 @@ PERF_TEST(PortChannelOneToOneTest, BandwidthIbHostNoAtomicMode) { testBandwidth(PingPongTestParams{ .useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::HostNoAtomic}); } + +static constexpr int kMaxQps = 4; +__constant__ DeviceHandle gMultiQpPortChans[kMaxQps]; + +// Multi-QP bandwidth kernel: barrier on QP 0 only, then putWithSignal on all QPs. +// Only one signal/wait pair is needed for sync between two GPU kernels. +__global__ void kernelMultiQpBandwidth(int nElemPerChan, int nIters, int numQps) { + if (threadIdx.x != 0) return; + for (int i = 0; i < nIters; i++) { + // Barrier on QP 0 only — syncs both ranks + gMultiQpPortChans[0].signal(); + gMultiQpPortChans[0].wait(); + // Data transfer: put on all QPs simultaneously + for (int q = 0; q < numQps; q++) { + gMultiQpPortChans[q].putWithSignal(0, nElemPerChan * sizeof(int)); + } + // Wait for all remote data arrivals + for (int q = 0; q < numQps; q++) { + gMultiQpPortChans[q].wait(); + } + } +} + +// Multi-QP setup helper: bootstrap N parallel IB connections + port channels in two +// futures-based phases (issue all async ops before resolving any, to avoid deadlock). +// tagBase: distinct base used by each caller so concurrent tests don't clash on tags. +void PortChannelOneToOneTest::setupMultiQpChannels(int numQps, size_t elemsPerChan, IbMode ibMode, int tagBase, + std::vector>& sendBuffs, + std::vector& localMems, + std::vector& remoteMems, + std::vector& portChannels) { + const int peer = 1 - communicator->bootstrap()->getRank(); + sendBuffs.assign(numQps, nullptr); + localMems.assign(numQps, mscclpp::RegisteredMemory{}); + remoteMems.assign(numQps, mscclpp::RegisteredMemory{}); + portChannels.clear(); + + std::vector> connFutures(numQps); + std::vector> remoteMemFutures(numQps); + + for (int q = 0; q < numQps; q++) { + sendBuffs[q] = mscclpp::GpuBuffer(elemsPerChan).memory(); + localMems[q] = communicator->registerMemory(sendBuffs[q].get(), elemsPerChan * sizeof(int), ibTransport); + + mscclpp::EndpointConfig cfg; + cfg.transport = ibTransport; + cfg.ib.gidIndex = std::stoi(gEnv->args["ib_gid_index"]); + cfg.ib.mode = ibMode; + + connFutures[q] = communicator->connect(cfg, peer, tagBase + q); + communicator->sendMemory(localMems[q], peer, tagBase + numQps + q); + remoteMemFutures[q] = communicator->recvMemory(peer, tagBase + numQps + q); + } + + for (int q = 0; q < numQps; q++) { + auto conn = connFutures[q].get(); + remoteMems[q] = remoteMemFutures[q].get(); + auto sema = communicator->buildSemaphore(conn, peer, tagBase + 2 * numQps + q).get(); + mscclpp::SemaphoreId cid = proxyService->addSemaphore(sema); + portChannels.emplace_back( + proxyService->portChannel(cid, proxyService->addMemory(remoteMems[q]), proxyService->addMemory(localMems[q]))); + } +} + +void PortChannelOneToOneTest::testMultiQpBandwidth(IbMode ibMode, int numQps) { + if (gEnv->rank >= numRanksToUse) return; + + const int rank = communicator->bootstrap()->getRank(); + const int maxElemPerChan = 32 * 1024 * 1024; // 128 MB per channel + + std::vector> sendBuffs; + std::vector localMems; + std::vector remoteMems; + std::vector portChannels; + setupMultiQpChannels(numQps, maxElemPerChan, ibMode, /*tagBase=*/100, sendBuffs, localMems, remoteMems, portChannels); + + std::vector> handles; + for (auto& ch : portChannels) handles.push_back(ch.deviceHandle()); + ASSERT_EQ(handles.size(), static_cast(numQps)); + ASSERT_LE(numQps, kMaxQps); // numQps must not exceed __constant__ array size (kMaxQps) + MSCCLPP_CUDATHROW( + cudaMemcpyToSymbol(gMultiQpPortChans, handles.data(), numQps * sizeof(DeviceHandle))); + + proxyService->startProxy(); + + const std::string testName = ::mscclpp::test::currentTestName(); + const std::string qpLabel = std::to_string(numQps) + " QP" + (numQps > 1 ? "s" : ""); + + for (int nElemPerChan : + {256, 16 * 1024, 256 * 1024, 1024 * 1024, 4 * 1024 * 1024, 16 * 1024 * 1024, 32 * 1024 * 1024}) { + int nIters = 10000; + // Warm-up + kernelMultiQpBandwidth<<<1, 1>>>(nElemPerChan, 10, numQps); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + communicator->bootstrap()->barrier(); + + // Measure + mscclpp::Timer timer; + kernelMultiQpBandwidth<<<1, 1>>>(nElemPerChan, nIters, numQps); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + double elapsedUs = timer.elapsed(); + communicator->bootstrap()->barrier(); + + if (rank == 0) { + double totalBytes = (double)nElemPerChan * sizeof(int) * numQps; + double elapsedMsPerIter = elapsedUs / 1e3 / nIters; + double gbps = totalBytes / elapsedMsPerIter * 1e-6; + double totalSizeKB = totalBytes / 1024.0; + std::string label; + if (totalSizeKB >= 1024.0) + label = std::to_string((int)(totalSizeKB / 1024.0)) + " MB"; + else + label = std::to_string((int)totalSizeKB) + " KB"; + ::mscclpp::test::reportPerfResult(label + " (" + qpLabel + ")", gbps, "GB/s"); + } + } + + proxyService->stopProxy(); + + for (auto& m : localMems) registeredMemories.push_back(m); + for (auto& m : remoteMems) registeredMemories.push_back(m); +} + +PERF_TEST(PortChannelOneToOneTest, MultiQpBandwidthIbHostMode) { + REQUIRE_IBVERBS; + REQUIRE_GDR_FOR_IB_MODE(IbMode::Host); + for (int numQps : {1, 2, 4}) { + testMultiQpBandwidth(IbMode::Host, numQps); + } +} + +PERF_TEST(PortChannelOneToOneTest, MultiQpBandwidthIbHostNoAtomicMode) { + REQUIRE_IBVERBS; + REQUIRE_GDR_FOR_IB_MODE(IbMode::HostNoAtomic); + for (int numQps : {1, 2, 4}) { + testMultiQpBandwidth(IbMode::HostNoAtomic, numQps); + } +} + +// Multi-QP flush-stress kernel: one thread per QP, all calling putWithSignalAndFlush +// concurrently so all N CQ drains are in flight on the proxy thread at once. +// This is the concurrent-flush worst case the async-progress design protects against. +__global__ void kernelMultiQpFlushStress(int nElemPerChan, int nIters, int numQps) { + int q = threadIdx.x; + if (q >= numQps) return; + for (int i = 0; i < nIters; i++) { + if (q == 0) { + gMultiQpPortChans[0].signal(); + gMultiQpPortChans[0].wait(); + } + __syncthreads(); + gMultiQpPortChans[q].putWithSignalAndFlush(0, nElemPerChan * sizeof(int)); + gMultiQpPortChans[q].wait(); + __syncthreads(); + } +} + +void PortChannelOneToOneTest::testMultiQpFlushStress(IbMode ibMode, int numQps) { + if (gEnv->rank >= numRanksToUse) return; + + const int rank = communicator->bootstrap()->getRank(); + const int maxElemPerChan = 64 * 1024; + + std::vector> sendBuffs; + std::vector localMems; + std::vector remoteMems; + std::vector portChannels; + setupMultiQpChannels(numQps, maxElemPerChan, ibMode, /*tagBase=*/400, sendBuffs, localMems, remoteMems, portChannels); + + std::vector> handles; + for (auto& ch : portChannels) handles.push_back(ch.deviceHandle()); + ASSERT_EQ(handles.size(), static_cast(numQps)); + ASSERT_LE(numQps, kMaxQps); + MSCCLPP_CUDATHROW( + cudaMemcpyToSymbol(gMultiQpPortChans, handles.data(), numQps * sizeof(DeviceHandle))); + + proxyService->startProxy(); + + const std::string qpLabel = std::to_string(numQps) + " QP" + (numQps > 1 ? "s" : ""); + + for (int nElemPerChan : {256, 4 * 1024, 64 * 1024}) { + int nIters = 2000; + kernelMultiQpFlushStress<<<1, numQps>>>(nElemPerChan, 10, numQps); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + communicator->bootstrap()->barrier(); + + mscclpp::Timer timer; + kernelMultiQpFlushStress<<<1, numQps>>>(nElemPerChan, nIters, numQps); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + double elapsedUs = timer.elapsed(); + communicator->bootstrap()->barrier(); + + if (rank == 0) { + double usPerIter = elapsedUs / nIters; + double usPerIterPerQp = usPerIter / numQps; + int bytesPerChan = nElemPerChan * (int)sizeof(int); + std::string sizeLabel = (bytesPerChan >= 1024) ? (std::to_string(bytesPerChan / 1024) + " KB") + : (std::to_string(bytesPerChan) + " B"); + ::mscclpp::test::reportPerfResult(sizeLabel + " (" + qpLabel + ") per-iter", usPerIter, "us"); + ::mscclpp::test::reportPerfResult(sizeLabel + " (" + qpLabel + ") per-iter/QP", usPerIterPerQp, "us"); + } + } + + proxyService->stopProxy(); + + for (auto& m : localMems) registeredMemories.push_back(m); + for (auto& m : remoteMems) registeredMemories.push_back(m); +} + +PERF_TEST(PortChannelOneToOneTest, MultiQpFlushStressIbHostMode) { + REQUIRE_IBVERBS; + REQUIRE_GDR_FOR_IB_MODE(IbMode::Host); + for (int numQps : {1, 2, 4}) { + testMultiQpFlushStress(IbMode::Host, numQps); + } +} + +PERF_TEST(PortChannelOneToOneTest, MultiQpFlushStressIbHostNoAtomicMode) { + REQUIRE_IBVERBS; + REQUIRE_GDR_FOR_IB_MODE(IbMode::HostNoAtomic); + for (int numQps : {1, 2, 4}) { + testMultiQpFlushStress(IbMode::HostNoAtomic, numQps); + } +} + +// Same-channel concurrent-flush kernel: N GPU threads on the same PortChannel each call +// putWithSignalAndFlush in lockstep. Stresses the FIFO-position-based wait target so that +// each caller waits on its own TriggerSync rather than on a globally-incrementing counter +// that could be assigned out-of-order relative to the FIFO push order. +__constant__ DeviceHandle gSingleChanForConcurrentFlush; + +__global__ void kernelSameChanConcurrentFlush(int nIters) { + auto& chan = gSingleChanForConcurrentFlush; + int tid = threadIdx.x; + for (int i = 0; i < nIters; i++) { + // Each thread writes to a distinct slot (so puts don't overlap on remote side), + // then concurrently flushes on the same channel. + uint64_t offset = tid * sizeof(int); + chan.putWithSignalAndFlush(offset, offset, sizeof(int)); + // Each thread waits for one signal from the remote rank's symmetric putWithSignalAndFlush. + chan.wait(); + } +} + +void PortChannelOneToOneTest::testSameChanConcurrentFlush(IbMode ibMode) { + if (gEnv->rank >= numRanksToUse) return; + + constexpr int nThreads = 4; + std::vector> sendBuffs; + std::vector localMems; + std::vector remoteMems; + std::vector portChannels; + setupMultiQpChannels(/*numQps=*/1, /*elemsPerChan=*/nThreads, ibMode, /*tagBase=*/700, sendBuffs, localMems, + remoteMems, portChannels); + + DeviceHandle handle = portChannels[0].deviceHandle(); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gSingleChanForConcurrentFlush, &handle, sizeof(handle))); + + proxyService->startProxy(); + + // Warm-up + kernelSameChanConcurrentFlush<<<1, nThreads>>>(10); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + communicator->bootstrap()->barrier(); + + // Measure: a successful completion (no deadlock, no CQ error) validates that each + // concurrent-flush caller waited on its own TriggerSync (not someone else's earlier one). + const int nIters = 500; + mscclpp::Timer timer; + kernelSameChanConcurrentFlush<<<1, nThreads>>>(nIters); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + double elapsedUs = timer.elapsed(); + communicator->bootstrap()->barrier(); + + if (communicator->bootstrap()->getRank() == 0) { + double usPerIter = elapsedUs / nIters; + ::mscclpp::test::reportPerfResult(std::to_string(nThreads) + " threads same-chan per-iter", usPerIter, "us"); + } + + proxyService->stopProxy(); + for (auto& m : localMems) registeredMemories.push_back(m); + for (auto& m : remoteMems) registeredMemories.push_back(m); +} + +TEST(PortChannelOneToOneTest, SameChanConcurrentFlushIbHostMode) { + REQUIRE_IBVERBS; + REQUIRE_GDR_FOR_IB_MODE(IbMode::Host); + testSameChanConcurrentFlush(IbMode::Host); +} From 60a6d7219f1d97c32c275864ed3fb456b6b3f300 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 15 May 2026 14:06:50 -0700 Subject: [PATCH 2/2] Clean up completed communicator receives (#804) ## Summary - Release the reference after last requests are ready. - Keep ordered receive chaining for repeated rank/tag operations while cleaning up completed receive bookkeeping. --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/core/communicator.cc | 102 +++++++++++++++++------------- src/core/include/communicator.hpp | 5 +- 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/src/core/communicator.cc b/src/core/communicator.cc index c95ca421..41e46bc5 100644 --- a/src/core/communicator.cc +++ b/src/core/communicator.cc @@ -7,6 +7,36 @@ namespace mscclpp { +namespace { + +template +std::shared_future makeOrderedRecvFuture(Impl* impl, int remoteRank, int tag, Func func) { + // Weak placeholder to avoid a reference cycle; updated with the real recvItem after the future is created. + auto thisRecvItem = std::make_shared>(); + auto future = std::async(std::launch::deferred, + [impl, remoteRank, tag, thisRecvItem, lastRecvItem = impl->getLastRecvItem(remoteRank, tag), + func = std::move(func)]() mutable { + auto cleanup = [impl, remoteRank, tag, thisRecvItem]() { + impl->clearLastRecvItemIfMatches(remoteRank, tag, thisRecvItem->lock()); + }; + + if (lastRecvItem) { + // Recursive call to the previous receive items + lastRecvItem->wait(); + } + auto result = func(); + cleanup(); + return result; + }); + auto sharedFuture = std::shared_future(std::move(future)); + auto recvItem = std::make_shared>(sharedFuture); + *thisRecvItem = recvItem; + impl->setLastRecvItem(remoteRank, tag, recvItem); + return sharedFuture; +} + +} // namespace + Communicator::Impl::Impl(std::shared_ptr bootstrap, std::shared_ptr context) : bootstrap_(bootstrap) { if (!context) { @@ -32,6 +62,14 @@ std::shared_ptr Communicator::Impl::getLastRecvItem(int remoteRank return it->second; } +void Communicator::Impl::clearLastRecvItemIfMatches(int remoteRank, int tag, + const std::shared_ptr& expectedItem) { + auto it = lastRecvItems_.find({remoteRank, tag}); + if (it != lastRecvItems_.end() && it->second == expectedItem) { + lastRecvItems_.erase(it); + } +} + MSCCLPP_API_CPP Communicator::~Communicator() = default; MSCCLPP_API_CPP Communicator::Communicator(std::shared_ptr bootstrap, std::shared_ptr context) @@ -83,19 +121,11 @@ MSCCLPP_API_CPP std::shared_future Communicator::recvMemory(in locRecvMemList.push_back(std::move(locRecvMem)); return future; } - auto future = std::async(std::launch::deferred, - [this, remoteRank, tag, lastRecvItem = pimpl_->getLastRecvItem(remoteRank, tag)]() { - if (lastRecvItem) { - // Recursive call to the previous receive items - lastRecvItem->wait(); - } - std::vector data; - bootstrap()->recv(data, remoteRank, tag); - return RegisteredMemory::deserialize(data); - }); - auto shared_future = std::shared_future(std::move(future)); - pimpl_->setLastRecvItem(remoteRank, tag, std::make_shared>(shared_future)); - return shared_future; + return makeOrderedRecvFuture(pimpl_.get(), remoteRank, tag, [this, remoteRank, tag]() { + std::vector data; + bootstrap()->recv(data, remoteRank, tag); + return RegisteredMemory::deserialize(data); + }); } MSCCLPP_API_CPP std::shared_future Communicator::connect(const Endpoint& localEndpoint, int remoteRank, @@ -112,22 +142,15 @@ MSCCLPP_API_CPP std::shared_future Communicator::connect(const Endpo bootstrap()->send(localEndpoint.serialize(), remoteRank, tag); - auto future = std::async(std::launch::deferred, [this, remoteRank, tag, localEndpoint, - lastRecvItem = pimpl_->getLastRecvItem(remoteRank, tag)]() mutable { - if (lastRecvItem) { - // Recursive call to the previous receive items - lastRecvItem->wait(); - } - std::vector data; - bootstrap()->recv(data, remoteRank, tag); - auto remoteEndpoint = Endpoint::deserialize(data); - auto connection = context()->connect(localEndpoint, remoteEndpoint); - pimpl_->connectionInfos_[connection.impl_.get()] = {remoteRank, tag}; - return connection; - }); - auto shared_future = std::shared_future(std::move(future)); - pimpl_->setLastRecvItem(remoteRank, tag, std::make_shared>(shared_future)); - return shared_future; + return makeOrderedRecvFuture(pimpl_.get(), remoteRank, tag, + [this, remoteRank, tag, localEndpoint]() mutable { + std::vector data; + bootstrap()->recv(data, remoteRank, tag); + auto remoteEndpoint = Endpoint::deserialize(data); + auto connection = context()->connect(localEndpoint, remoteEndpoint); + pimpl_->connectionInfos_[connection.impl_.get()] = {remoteRank, tag}; + return connection; + }); } MSCCLPP_API_CPP std::shared_future Communicator::connect(const EndpointConfig& localConfig, int remoteRank, @@ -141,21 +164,12 @@ MSCCLPP_API_CPP std::shared_future Communicator::buildSemaphore(const SemaphoreStub localStub(connection); bootstrap()->send(localStub.serialize(), remoteRank, tag); - auto future = - std::async(std::launch::deferred, [this, remoteRank, tag, lastRecvItem = pimpl_->getLastRecvItem(remoteRank, tag), - localStub = localStub]() mutable { - if (lastRecvItem) { - // Recursive call to the previous receive items - lastRecvItem->wait(); - } - std::vector data; - bootstrap()->recv(data, remoteRank, tag); - auto remoteStub = SemaphoreStub::deserialize(data); - return Semaphore(localStub, remoteStub); - }); - auto shared_future = std::shared_future(std::move(future)); - pimpl_->setLastRecvItem(remoteRank, tag, std::make_shared>(shared_future)); - return shared_future; + return makeOrderedRecvFuture(pimpl_.get(), remoteRank, tag, [this, remoteRank, tag, localStub]() mutable { + std::vector data; + bootstrap()->recv(data, remoteRank, tag); + auto remoteStub = SemaphoreStub::deserialize(data); + return Semaphore(localStub, remoteStub); + }); } MSCCLPP_API_CPP int Communicator::remoteRankOf(const Connection& connection) { diff --git a/src/core/include/communicator.hpp b/src/core/include/communicator.hpp index 8d7539ef..f15e20f7 100644 --- a/src/core/include/communicator.hpp +++ b/src/core/include/communicator.hpp @@ -62,7 +62,7 @@ struct Communicator::Impl { std::unordered_map connectionInfos_; // Temporary storage for the latest RecvItem of each {remoteRank, tag} pair. - // If the RecvItem gets ready, it will be removed at the next call to getLastRecvItem. + // The RecvItem is removed when it finishes or when getLastRecvItem observes that it is ready. std::unordered_map, std::shared_ptr, PairHash> lastRecvItems_; // RegisteredMemory items sent to the local rank of each tag. Sending memory to the local rank is @@ -79,6 +79,9 @@ struct Communicator::Impl { // If the item is ready, it will be removed from the map and nullptr will be returned. std::shared_ptr getLastRecvItem(int remoteRank, int tag); + // Clear the last RecvItem only if it still matches the expected item. + void clearLastRecvItemIfMatches(int remoteRank, int tag, const std::shared_ptr& expectedItem); + struct Connector; };