Merge branch 'main' into binyli/mnnvl

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Binyang Li
2026-05-15 21:36:36 +00:00
16 changed files with 529 additions and 82 deletions

View File

@@ -676,6 +676,8 @@ class Connection {
friend class Semaphore;
friend class ProxyService;
friend class BaseConnection;
friend struct BasePortChannel;
friend struct PortChannel;
};
/// SemaphoreStub object only used for constructing Semaphore, not for direct use by the user.

View File

@@ -29,6 +29,11 @@ class Fifo {
/// Remove the head trigger.
void pop();
/// Get the current tail position — the FIFO push-return value of the trigger about to be
/// (or currently being) processed by the proxy thread. Monotonically increasing.
/// @return The current tail position.
uint64_t tail() const;
/// Get FIFO size.
/// @return Number of entries in the FIFO.
int size() const;

View File

@@ -84,8 +84,12 @@ class ProxyService : public BaseProxyService {
std::vector<RegisteredMemory> memories_;
std::shared_ptr<Proxy> proxy_;
std::unordered_map<std::shared_ptr<BaseConnection>, int> inflightRequests_;
// Latest pending TriggerSync FIFO position per connection. Proxy publishes pos+1 to the
// connection's gpuFlushDonePos_ when the CQ drains, then erases the entry.
std::unordered_map<std::shared_ptr<BaseConnection>, uint64_t> pendingFlushPos_;
ProxyHandlerResult handleTrigger(ProxyTrigger triggerRaw);
void progressFlushes();
};
/// Port channel without specifying source/destination memory regions.

View File

@@ -17,6 +17,19 @@ using SemaphoreId = uint32_t;
/// actual.
using MemoryId = uint32_t;
namespace detail {
#if defined(MSCCLPP_DEVICE_COMPILE)
/// Spin until the proxy has published a flush-done position greater than `fifoPos`.
///
/// The proxy publishes `flushDonePos = latestCompletedPos + 1` when the CQ drains, so the
/// wait condition `flushDonePos > fifoPos` becomes true once the TriggerSync pushed at
/// `fifoPos` (or any later TriggerSync on the same connection) has completed. Using the FIFO
/// push position as the wait target couples the wait to the FIFO order, avoiding races when
/// multiple GPU threads concurrently flush the same channel.
///
/// @param flushDonePos Pointer to the flush-done position written by the proxy; read here with
/// a system-scope acquire load.
/// @param fifoPos The value returned by `fifo_.push()` for the caller's TriggerSync.
/// @param maxSpinCount Forwarded to POLL_MAYBE_JAILBREAK; presumably the maximum number of spins
/// before asserting (negative = never assert), as documented on the callers — confirm against
/// the macro definition.
MSCCLPP_DEVICE_INLINE void waitFlush(uint64_t* flushDonePos, uint64_t fifoPos, [[maybe_unused]] int64_t maxSpinCount) {
// Keep polling while the published position has not yet passed our own push position.
POLL_MAYBE_JAILBREAK((atomicLoad<uint64_t, scopeSystem>(flushDonePos, memoryOrderAcquire) <= fifoPos), maxSpinCount);
}
#endif // defined(MSCCLPP_DEVICE_COMPILE)
} // namespace detail
struct BasePortChannelDeviceHandle {
SemaphoreId semaphoreId_;
@@ -26,12 +39,16 @@ struct BasePortChannelDeviceHandle {
// can produce for and the sole proxy thread consumes it.
FifoDeviceHandle fifo_;
// One past the highest FIFO position with a completed flush on this connection.
// Host-pinned: proxy writes after CQ drain, GPU reads in waitFlush().
uint64_t* flushDonePos_;
MSCCLPP_INLINE BasePortChannelDeviceHandle() = default;
MSCCLPP_HOST_DEVICE_INLINE BasePortChannelDeviceHandle(SemaphoreId semaphoreId,
Host2DeviceSemaphoreDeviceHandle semaphore,
FifoDeviceHandle fifo)
: semaphoreId_(semaphoreId), semaphore_(semaphore), fifo_(fifo) {}
FifoDeviceHandle fifo, uint64_t* flushDonePos)
: semaphoreId_(semaphoreId), semaphore_(semaphore), fifo_(fifo), flushDonePos_(flushDonePos) {}
#if defined(MSCCLPP_DEVICE_COMPILE)
/// Push a TriggerData to the FIFO.
@@ -86,9 +103,9 @@ struct BasePortChannelDeviceHandle {
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(MemoryId dstId, uint64_t dstOffset, MemoryId srcId,
uint64_t srcOffset, uint64_t size, int64_t maxSpinCount = 1000000) {
uint64_t curFifoHead =
uint64_t pos =
fifo_.push({TriggerData | TriggerFlag | TriggerSync, dstId, dstOffset, srcId, srcOffset, size, semaphoreId_});
fifo_.sync(curFifoHead, maxSpinCount);
detail::waitFlush(flushDonePos_, pos, maxSpinCount);
}
/// Push a TriggerData, a TriggerFlag, and a TriggerSync at the same time to the FIFO.
@@ -105,8 +122,8 @@ struct BasePortChannelDeviceHandle {
/// Push a TriggerSync to the FIFO.
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
MSCCLPP_DEVICE_INLINE void flush(int64_t maxSpinCount = 1000000) {
uint64_t curFifoHead = fifo_.push({TriggerSync, 0, 0, 0, 0, 1, semaphoreId_});
fifo_.sync(curFifoHead, maxSpinCount);
uint64_t pos = fifo_.push({TriggerSync, 0, 0, 0, 0, 1, semaphoreId_});
detail::waitFlush(flushDonePos_, pos, maxSpinCount);
}
/// Check if the port channel has been signaled.
@@ -128,8 +145,8 @@ struct PortChannelDeviceHandle : public BasePortChannelDeviceHandle {
MSCCLPP_HOST_DEVICE_INLINE PortChannelDeviceHandle(SemaphoreId semaphoreId,
Host2DeviceSemaphoreDeviceHandle semaphore, FifoDeviceHandle fifo,
MemoryId dst, MemoryId src)
: BasePortChannelDeviceHandle(semaphoreId, semaphore, fifo), dst_(dst), src_(src) {}
MemoryId dst, MemoryId src, uint64_t* flushDonePos)
: BasePortChannelDeviceHandle(semaphoreId, semaphore, fifo, flushDonePos), dst_(dst), src_(src) {}
#if defined(MSCCLPP_DEVICE_COMPILE)
/// Push a TriggerData to the FIFO.

View File

@@ -20,8 +20,9 @@ enum class ProxyHandlerResult {
};
class Proxy;
class ProxyService;
/// Handler function type for proxy.
/// Handler function type for proxy. Called once per ready FIFO trigger.
using ProxyHandler = std::function<ProxyHandlerResult(ProxyTrigger)>;
/// Host-side proxy for PortChannels.
@@ -54,6 +55,7 @@ class Proxy {
std::shared_ptr<Fifo> fifo();
private:
friend class ProxyService;
struct Impl;
std::unique_ptr<Impl> pimpl_;
};

View File

@@ -11,42 +11,24 @@ namespace mscclpp {
namespace {
template <typename Fn>
class ScopeGuard {
public:
explicit ScopeGuard(Fn fn) : fn_(std::move(fn)) {}
ScopeGuard(const ScopeGuard&) = delete;
ScopeGuard& operator=(const ScopeGuard&) = delete;
~ScopeGuard() { fn_(); }
private:
Fn fn_;
};
template <typename Fn>
ScopeGuard<Fn> makeScopeGuard(Fn fn) {
return ScopeGuard<Fn>(std::move(fn));
}
template <typename T, typename Impl, typename Fn>
std::shared_future<T> makeOrderedRecvFuture(Impl* impl, int remoteRank, int tag, Fn fn) {
template <typename T, typename Impl, typename Func>
std::shared_future<T> makeOrderedRecvFuture(Impl* impl, int remoteRank, int tag, Func func) {
// Weak placeholder to avoid a reference cycle; updated with the real recvItem after the future is created.
auto thisRecvItem = std::make_shared<std::weak_ptr<BaseRecvItem>>();
auto future = std::async(std::launch::deferred,
[impl, remoteRank, tag, thisRecvItem, lastRecvItem = impl->getLastRecvItem(remoteRank, tag),
fn = std::move(fn)]() mutable {
[[maybe_unused]] auto cleanup = makeScopeGuard([impl, remoteRank, tag, thisRecvItem]() {
auto item = thisRecvItem->lock();
auto it = impl->lastRecvItems_.find({remoteRank, tag});
if (item && it != impl->lastRecvItems_.end() && it->second == item) {
impl->lastRecvItems_.erase(it);
}
});
func = std::move(func)]() mutable {
auto cleanup = [impl, remoteRank, tag, thisRecvItem]() {
impl->clearLastRecvItemIfMatches(remoteRank, tag, thisRecvItem->lock());
};
if (lastRecvItem) {
// Recursive call to the previous receive items
lastRecvItem->wait();
}
return fn();
auto result = func();
cleanup();
return result;
});
auto sharedFuture = std::shared_future<T>(std::move(future));
auto recvItem = std::make_shared<RecvItem<T>>(sharedFuture);
@@ -82,6 +64,14 @@ std::shared_ptr<BaseRecvItem> Communicator::Impl::getLastRecvItem(int remoteRank
return it->second;
}
/// Remove the cached "last RecvItem" for {remoteRank, tag}, but only when the cached entry is
/// exactly `expectedItem`. A newer item stored under the same key is left untouched.
void Communicator::Impl::clearLastRecvItemIfMatches(int remoteRank, int tag,
                                                    const std::shared_ptr<BaseRecvItem>& expectedItem) {
  const auto entry = lastRecvItems_.find({remoteRank, tag});
  if (entry == lastRecvItems_.end()) {
    return;  // Nothing cached for this key.
  }
  if (entry->second != expectedItem) {
    return;  // The cached item is a different one; keep it.
  }
  lastRecvItems_.erase(entry);
}
MSCCLPP_API_CPP Communicator::~Communicator() = default;
MSCCLPP_API_CPP Communicator::Communicator(std::shared_ptr<Bootstrap> bootstrap, std::shared_ptr<Context> context)

View File

@@ -7,13 +7,13 @@
#include <mscclpp/npkit/npkit.hpp>
#endif
#include <mscclpp/atomic_device.hpp>
#include <mscclpp/numa.hpp>
#include <mscclpp/utils.hpp>
#include <sstream>
#include <thread>
#include "api.h"
#include "atomic.hpp"
#include "context.hpp"
#include "endpoint.hpp"
#include "gpu_utils_internal.hpp"
@@ -43,7 +43,10 @@ const RegisteredMemory::Impl& BaseConnection::getImpl(const RegisteredMemory& me
Context::Impl& BaseConnection::getImpl(Context& context) { return *(context.pimpl_); }
MSCCLPP_API_CPP BaseConnection::BaseConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint)
: context_(context), localEndpoint_(localEndpoint), maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()) {}
: context_(context),
localEndpoint_(localEndpoint),
maxWriteQueueSize_(localEndpoint.maxWriteQueueSize()),
gpuFlushDonePos_(detail::gpuCallocHostShared<uint64_t>()) {}
MSCCLPP_API_CPP std::shared_ptr<Context> BaseConnection::context() const { return context_; }
@@ -489,6 +492,35 @@ void IBConnection::flush(int64_t timeoutUsec) {
#endif
}
/// Asynchronous flush request for IB connections. Intentionally empty: the actual completion
/// is driven by progressFlush().
void IBConnection::requestFlush() {
// No-op: IB sends were already posted by prior conn.write() calls in handleTrigger.
// progressFlush() drives completion by polling the send CQ.
}
bool IBConnection::progressFlush() {
if (recvThreadError_.load(std::memory_order_acquire)) {
THROW(CONN, Error, ErrorCode::SystemError, "IBConnection recv thread failed: ", recvThreadErrorMsg_);
}
auto qp = qp_.lock();
if (!qp || qp->getNumSendCqItems() == 0) {
return true; // QP expired or CQ already drained.
}
int wcNum = qp->pollSendCq();
if (wcNum < 0) {
THROW(NET, IbError, errno, "pollSendCq failed in progressFlush");
}
for (int i = 0; i < wcNum; ++i) {
int status = qp->getSendWcStatus(i);
if (status != static_cast<int>(WsStatus::Success)) {
THROW(NET, Error, ErrorCode::SystemError,
"an IB work item failed in progressFlush: ", qp->getSendWcStatusString(i));
}
}
return qp->getNumSendCqItems() == 0;
}
// EthernetConnection
EthernetConnection::EthernetConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint,

View File

@@ -53,6 +53,8 @@ MSCCLPP_API_CPP void Fifo::pop() {
atomicStore(pimpl_->tail.get(), curTail + 1, memoryOrderRelease);
}
/// Return the current tail position of the FIFO (read with a plain load).
MSCCLPP_API_CPP uint64_t Fifo::tail() const {
  const uint64_t* tailPtr = pimpl_->tail.get();
  return *tailPtr;
}
MSCCLPP_API_CPP int Fifo::size() const { return pimpl_->size; }
MSCCLPP_API_CPP FifoDeviceHandle Fifo::deviceHandle() const {

View File

@@ -4,18 +4,16 @@
#ifndef MSCCLPP_ATOMIC_HPP_
#define MSCCLPP_ATOMIC_HPP_
#if defined(MSCCLPP_USE_CUDA)
#ifndef MSCCLPP_DEVICE_CUDA
// On CUDA host-side compiles, force atomic_device.hpp's CUDA branch so host code uses
// cuda::atomic_ref (for system-scope ordering with GPU readers). On CUDA device compiles
// (MSCCLPP_DEVICE_CUDA already set by device.hpp) and on ROCm builds, include normally —
// atomic_device.hpp's branch selection works correctly without forcing.
#if defined(MSCCLPP_USE_CUDA) && !defined(MSCCLPP_DEVICE_CUDA)
#define MSCCLPP_DEVICE_CUDA
#include <mscclpp/atomic_device.hpp>
#undef MSCCLPP_DEVICE_CUDA
#endif // !defined(MSCCLPP_DEVICE_CUDA)
#else // !defined(MSCCLPP_USE_CUDA)
#ifndef MSCCLPP_DEVICE_HIP
#define MSCCLPP_DEVICE_HIP
#else
#include <mscclpp/atomic_device.hpp>
#undef MSCCLPP_DEVICE_HIP
#endif // !defined(MSCCLPP_DEVICE_HIP)
#endif // !defined(MSCCLPP_USE_CUDA)
#endif
#endif // MSCCLPP_ATOMIC_HPP_

View File

@@ -62,7 +62,7 @@ struct Communicator::Impl {
std::unordered_map<const BaseConnection*, ConnectionInfo> connectionInfos_;
// Temporary storage for the latest RecvItem of each {remoteRank, tag} pair.
// If the RecvItem gets ready, it will be removed at the next call to getLastRecvItem.
// The RecvItem is removed when it finishes or when getLastRecvItem observes that it is ready.
std::unordered_map<std::pair<int, int>, std::shared_ptr<BaseRecvItem>, PairHash> lastRecvItems_;
// RegisteredMemory items sent to the local rank of each tag. Sending memory to the local rank is
@@ -79,6 +79,9 @@ struct Communicator::Impl {
// If the item is ready, it will be removed from the map and nullptr will be returned.
std::shared_ptr<BaseRecvItem> getLastRecvItem(int remoteRank, int tag);
// Clear the last RecvItem only if it still matches the expected item.
void clearLastRecvItemIfMatches(int remoteRank, int tag, const std::shared_ptr<BaseRecvItem>& expectedItem);
struct Connector;
};

View File

@@ -50,6 +50,23 @@ class BaseConnection {
/// When false, the NIC writes directly to the semaphore's registered memory (e.g., via atomics).
virtual bool isSignalForwarding() const { return false; }
/// Request a flush. Subclasses that support async flush (e.g. IBConnection) override this
/// to be a no-op and rely on progressFlush() to drive completion via the proxy thread.
/// The default does a blocking flush(); progressFlush() then trivially returns true.
/// @note Only call from the proxy thread.
virtual void requestFlush() { flush(); }
/// Progress pending async flush operations (non-blocking CQ poll).
/// @note Only call from the proxy thread.
/// @return true if no flush is pending (CQ fully drained or no request).
virtual bool progressFlush() { return true; }
/// Get pointer to the GPU-visible flush-done position (host-pinned memory).
/// ProxyService writes "one past the highest completed FIFO position" here when the CQ
/// drains; GPU threads spin on it (`waitFlush`) until it surpasses their own push position.
/// @note Pointer is valid for the lifetime of this Connection.
uint64_t* getFlushDonePtr() const { return gpuFlushDonePos_.get(); }
virtual Transport transport() const = 0;
virtual Transport remoteTransport() const = 0;
@@ -75,6 +92,11 @@ class BaseConnection {
std::shared_ptr<Context> context_;
Endpoint localEndpoint_;
int maxWriteQueueSize_;
// GPU-visible flush-done position (host-pinned memory). ProxyService writes one past the
// highest FIFO position whose TriggerSync request has fully completed on this connection
// (CQ drained for IB, synchronous flush() returned for non-IB).
std::shared_ptr<uint64_t> gpuFlushDonePos_;
};
class CudaIpcConnection : public BaseConnection {
@@ -149,6 +171,9 @@ class IBConnection : public BaseConnection {
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
void flush(int64_t timeoutUsec) override;
void requestFlush() override;
bool progressFlush() override;
};
class EthernetConnection : public BaseConnection {

View File

@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#ifndef MSCCLPP_PROXY_IMPL_HPP_
#define MSCCLPP_PROXY_IMPL_HPP_
#include <atomic>
#include <functional>
#include <memory>
#include <mscclpp/fifo.hpp>
#include <mscclpp/proxy.hpp>
#include <thread>
namespace mscclpp {
/// Private state of Proxy: the callbacks run by the proxy thread, the trigger FIFO, and the
/// thread object with its start/run flags.
struct Proxy::Impl {
  /// Called for each FIFO trigger.
  ProxyHandler handler;
  /// Initialization callback supplied at construction.
  std::function<void()> threadInit;
  /// Optional per-iteration callback; empty unless setProgressHandler() was called.
  std::function<void()> progressHandler;
  /// Trigger FIFO, created with the size passed to the constructor.
  std::shared_ptr<Fifo> fifo;
  std::atomic_bool threadStarted;
  std::thread service;
  std::atomic_bool running;

  /// @param handler Trigger handler; taken by value and moved into place.
  /// @param threadInit Initialization callback; taken by value and moved into place.
  /// @param fifoSize Size passed to the Fifo constructor.
  Impl(ProxyHandler handler, std::function<void()> threadInit, int fifoSize)
      : handler(std::move(handler)),  // move the sink parameters instead of copying the std::function state
        threadInit(std::move(threadInit)),
        fifo(std::make_shared<Fifo>(fifoSize)),
        threadStarted(false),
        running(false) {}

  // Must be called before start() — the proxy thread captures progressHandler at start time.
  void setProgressHandler(std::function<void()> h) { progressHandler = std::move(h); }
};
} // namespace mscclpp
#endif // MSCCLPP_PROXY_IMPL_HPP_

View File

@@ -1,11 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// Licensed under the MIT License.
#include <mscclpp/numa.hpp>
#include <mscclpp/port_channel.hpp>
#include "api.h"
#include "debug.h"
#include "atomic.hpp"
#include "connection.hpp"
#include "logger.hpp"
#include "proxy_impl.hpp"
namespace mscclpp {
@@ -34,11 +37,12 @@ MSCCLPP_API_CPP ProxyService::ProxyService(int fifoSize) {
MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevice));
if (deviceNumaNode >= 0) {
numaBind(deviceNumaNode);
INFO(MSCCLPP_INIT, "NUMA node of ProxyService proxy thread is set to %d", deviceNumaNode);
INFO(CONN, "NUMA node of ProxyService proxy thread is set to ", deviceNumaNode);
}
};
auto handlerFunc = [&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); };
proxy_ = std::make_shared<Proxy>(handlerFunc, initFunc, fifoSize);
proxy_->pimpl_->setProgressHandler([this]() { progressFlushes(); });
}
MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator,
@@ -84,9 +88,28 @@ MSCCLPP_API_CPP PortChannel ProxyService::portChannel(SemaphoreId id, MemoryId d
MSCCLPP_API_CPP void ProxyService::startProxy(bool blocking) { proxy_->start(blocking); }
MSCCLPP_API_CPP void ProxyService::stopProxy() { proxy_->stop(); }
/// Stop the proxy, then make a bounded number of attempts to complete flushes that are still
/// pending. Whatever remains afterwards is force-released by writing UINT64_MAX to the
/// connection's flush-done position, which exceeds any FIFO position a waiter compares against.
MSCCLPP_API_CPP void ProxyService::stopProxy() {
  proxy_->stop();

  // Bounded drain: give outstanding flushes a chance to finish normally.
  int attempt = 0;
  while (attempt < 1000 && !pendingFlushPos_.empty()) {
    progressFlushes();
    ++attempt;
  }
  if (pendingFlushPos_.empty()) {
    return;
  }

  WARN(CONN, "stopProxy: ", pendingFlushPos_.size(), " connections still pending; writing sentinel");
  for (auto& entry : pendingFlushPos_) {
    uint64_t* donePtr = entry.first->getFlushDonePtr();
    if (donePtr) {
      atomicStore(donePtr, UINT64_MAX, memoryOrderRelease);
    }
  }
  pendingFlushPos_.clear();
}
ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) {
// The proxy is the sole FIFO consumer and processes in strict push order, so the FIFO's
// tail (between poll() and pop()) matches the value GPU's fifo_.push() returned for this
// trigger — use it directly as our per-trigger sequence number.
uint64_t pos = proxy_->fifo()->tail();
std::shared_ptr<Host2DeviceSemaphore> semaphore = semaphores_[trigger.fields.semaphoreId];
auto& conn = semaphore->connection();
@@ -105,9 +128,15 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) {
numRequests++;
}
if (((trigger.fields.type & TriggerSync) && numRequests > 0) ||
(maxWriteQueueSize != -1 && numRequests >= maxWriteQueueSize)) {
conn.flush();
if (trigger.fields.type & TriggerSync) {
// Record this TriggerSync's FIFO position. The GPU caller is spinning on
// flushDonePos_ > pos; progressFlushes() will publish pos+1 once the CQ drains.
// Later TriggerSyncs on the same conn overwrite — CQ drain completes them all at once.
conn.impl_->requestFlush();
pendingFlushPos_[conn.impl_] = pos;
numRequests = 0;
} else if (maxWriteQueueSize != -1 && numRequests >= maxWriteQueueSize) {
conn.flush(); // flow-control flush stays blocking
numRequests = 0;
}
@@ -115,12 +144,27 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) {
}
MSCCLPP_API_CPP BasePortChannel::DeviceHandle BasePortChannel::deviceHandle() const {
return BasePortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle());
auto& conn = semaphore_->connection();
return BasePortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle(),
conn.impl_->getFlushDonePtr());
}
MSCCLPP_API_CPP PortChannel::DeviceHandle PortChannel::deviceHandle() const {
return PortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle(), dst_,
src_);
auto& conn = semaphore_->connection();
return PortChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo()->deviceHandle(), dst_, src_,
conn.impl_->getFlushDonePtr());
}
/// Advance every pending flush once. For each connection whose progressFlush() reports
/// completion, publish (recorded position + 1) to its flush-done pointer with a release store
/// and drop the entry; connections that are not done yet stay in the map.
void ProxyService::progressFlushes() {
  auto cursor = pendingFlushPos_.begin();
  while (cursor != pendingFlushPos_.end()) {
    if (!cursor->first->progressFlush()) {
      ++cursor;  // Still in flight; revisit on the next call.
      continue;
    }
    // Drained: unblock GPU waiters whose own push position is <= the recorded position.
    atomicStore(cursor->first->getFlushDonePtr(), cursor->second + 1, memoryOrderRelease);
    cursor = pendingFlushPos_.erase(cursor);
  }
}
} // namespace mscclpp

View File

@@ -1,38 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <atomic>
#include <mscclpp/core.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/numa.hpp>
#include <mscclpp/proxy.hpp>
#include <mscclpp/utils.hpp>
#include <thread>
#include "api.h"
#include "debug.h"
#include "logger.hpp"
#include "proxy_impl.hpp"
namespace mscclpp {
constexpr int ProxyStopCheckPeriod = 1000;
constexpr int ProxyStartWarnPeriod = 1000;
struct Proxy::Impl {
ProxyHandler handler;
std::function<void()> threadInit;
std::shared_ptr<Fifo> fifo;
std::atomic_bool threadStarted;
std::thread service;
std::atomic_bool running;
Impl(ProxyHandler handler, std::function<void()> threadInit, int fifoSize)
: handler(handler),
threadInit(threadInit),
fifo(std::make_shared<Fifo>(fifoSize)),
threadStarted(false),
running(false) {}
};
/// Construct a Proxy by creating its Impl.
/// @param handler Trigger handler stored in the Impl.
/// @param threadInit Initialization callback stored in the Impl.
/// @param fifoSize Size used by the Impl to create the trigger FIFO.
MSCCLPP_API_CPP Proxy::Proxy(ProxyHandler handler, std::function<void()> threadInit, int fifoSize) {
pimpl_ = std::make_unique<Impl>(handler, threadInit, fifoSize);
}
@@ -70,18 +53,23 @@ MSCCLPP_API_CPP void Proxy::start(bool blocking) {
pimpl_->threadStarted.store(true, std::memory_order_release);
ProxyHandler handler = this->pimpl_->handler;
auto fifo = this->pimpl_->fifo;
ProxyHandler handler = pimpl_->handler;
auto progressHandler = pimpl_->progressHandler;
auto fifo = pimpl_->fifo;
ProxyTrigger trigger;
int runCnt = ProxyStopCheckPeriod;
for (;;) {
if (runCnt-- == 0) {
runCnt = ProxyStopCheckPeriod;
if (!this->pimpl_->running.load(std::memory_order_acquire)) {
if (!pimpl_->running.load(std::memory_order_acquire)) {
break;
}
}
// Per-iteration system work (e.g. progressing pending async flushes),
// run regardless of whether a trigger is ready this iteration.
if (progressHandler) progressHandler();
// Poll to see if we are ready to send anything
trigger = fifo->poll();
if (trigger.fst == 0 || trigger.snd == 0) { // TODO: this check is a potential pitfall for custom triggers
@@ -107,7 +95,7 @@ MSCCLPP_API_CPP void Proxy::start(bool blocking) {
count--;
if (count == 0) {
count = ProxyStartWarnPeriod;
WARN("Proxy thread startup taking longer than expected.");
WARN(CONN, "Proxy thread startup taking longer than expected.");
}
}
}

View File

@@ -160,6 +160,14 @@ class PortChannelOneToOneTest : public CommunicatorTestBase {
void testPacketPingPong(bool useIbOnly, IbMode ibMode = IbMode::Default);
void testPacketPingPongPerf(bool useIbOnly, IbMode ibMode = IbMode::Default);
void testBandwidth(PingPongTestParams params);
void setupMultiQpChannels(int numQps, size_t elemsPerChan, IbMode ibMode, int tagBase,
std::vector<std::shared_ptr<int>>& sendBuffs,
std::vector<mscclpp::RegisteredMemory>& localMems,
std::vector<mscclpp::RegisteredMemory>& remoteMems,
std::vector<mscclpp::PortChannel>& portChannels);
void testMultiQpBandwidth(IbMode ibMode, int numQps);
void testMultiQpFlushStress(IbMode ibMode, int numQps);
void testSameChanConcurrentFlush(IbMode ibMode);
std::shared_ptr<mscclpp::ProxyService> proxyService;
};

View File

@@ -625,3 +625,292 @@ PERF_TEST(PortChannelOneToOneTest, BandwidthIbHostNoAtomicMode) {
testBandwidth(PingPongTestParams{
.useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::HostNoAtomic});
}
static constexpr int kMaxQps = 4;
__constant__ DeviceHandle<mscclpp::PortChannel> gMultiQpPortChans[kMaxQps];
// Multi-QP bandwidth kernel: barrier on QP 0 only, then putWithSignal on all QPs.
// Only one signal/wait pair is needed for sync between two GPU kernels.
// nElemPerChan: number of int elements put per channel per iteration.
// nIters: number of barrier + transfer rounds.
// numQps: number of entries of gMultiQpPortChans to use (the host checks it against kMaxQps).
__global__ void kernelMultiQpBandwidth(int nElemPerChan, int nIters, int numQps) {
// Only thread 0 does any work; every other thread exits immediately.
if (threadIdx.x != 0) return;
for (int i = 0; i < nIters; i++) {
// Barrier on QP 0 only — syncs both ranks
gMultiQpPortChans[0].signal();
gMultiQpPortChans[0].wait();
// Data transfer: put on all QPs simultaneously
for (int q = 0; q < numQps; q++) {
gMultiQpPortChans[q].putWithSignal(0, nElemPerChan * sizeof(int));
}
// Wait for all remote data arrivals
for (int q = 0; q < numQps; q++) {
gMultiQpPortChans[q].wait();
}
}
}
// Multi-QP setup helper: bootstrap N parallel IB connections + port channels in two
// futures-based phases (issue all async ops before resolving any, to avoid deadlock).
// numQps: number of connections/channels to create.
// elemsPerChan: number of int elements allocated and registered per channel.
// ibMode: IB mode placed in each endpoint config.
// tagBase: distinct base used by each caller so concurrent tests don't clash on tags.
//          Tags used: [tagBase, tagBase+numQps) for connect, [tagBase+numQps, tagBase+2*numQps)
//          for memory exchange, [tagBase+2*numQps, tagBase+3*numQps) for semaphores.
// sendBuffs/localMems/remoteMems/portChannels: outputs; previous contents are discarded.
void PortChannelOneToOneTest::setupMultiQpChannels(int numQps, size_t elemsPerChan, IbMode ibMode, int tagBase,
                                                   std::vector<std::shared_ptr<int>>& sendBuffs,
                                                   std::vector<mscclpp::RegisteredMemory>& localMems,
                                                   std::vector<mscclpp::RegisteredMemory>& remoteMems,
                                                   std::vector<mscclpp::PortChannel>& portChannels) {
  const int peer = 1 - communicator->bootstrap()->getRank();

  sendBuffs.assign(numQps, nullptr);
  localMems.assign(numQps, mscclpp::RegisteredMemory{});
  remoteMems.assign(numQps, mscclpp::RegisteredMemory{});
  portChannels.clear();
  portChannels.reserve(numQps);

  std::vector<std::shared_future<mscclpp::Connection>> connFutures(numQps);
  std::vector<std::shared_future<mscclpp::RegisteredMemory>> remoteMemFutures(numQps);

  // Phase 1: issue every asynchronous operation without resolving any future.
  for (int q = 0; q < numQps; q++) {
    sendBuffs[q] = mscclpp::GpuBuffer<int>(elemsPerChan).memory();
    localMems[q] = communicator->registerMemory(sendBuffs[q].get(), elemsPerChan * sizeof(int), ibTransport);

    mscclpp::EndpointConfig cfg;
    cfg.transport = ibTransport;
    cfg.ib.gidIndex = std::stoi(gEnv->args["ib_gid_index"]);
    cfg.ib.mode = ibMode;

    connFutures[q] = communicator->connect(cfg, peer, tagBase + q);
    communicator->sendMemory(localMems[q], peer, tagBase + numQps + q);
    remoteMemFutures[q] = communicator->recvMemory(peer, tagBase + numQps + q);
  }

  // Phase 2: resolve the futures and build one port channel per connection.
  for (int q = 0; q < numQps; q++) {
    auto conn = connFutures[q].get();
    remoteMems[q] = remoteMemFutures[q].get();
    auto sema = communicator->buildSemaphore(conn, peer, tagBase + 2 * numQps + q).get();
    mscclpp::SemaphoreId cid = proxyService->addSemaphore(sema);

    // Register the memories in separate statements: as call arguments their evaluation order
    // is unspecified, which would make the assigned memory IDs compiler-dependent.
    const auto remoteId = proxyService->addMemory(remoteMems[q]);
    const auto localId = proxyService->addMemory(localMems[q]);
    portChannels.emplace_back(proxyService->portChannel(cid, remoteId, localId));
  }
}
// Measures aggregate put bandwidth over `numQps` parallel IB port channels for a range of
// message sizes and reports the result (GB/s) from rank 0.
// ibMode: IB mode used for every connection.
// numQps: number of parallel channels; must not exceed kMaxQps.
void PortChannelOneToOneTest::testMultiQpBandwidth(IbMode ibMode, int numQps) {
  if (gEnv->rank >= numRanksToUse) return;

  const int rank = communicator->bootstrap()->getRank();
  const int maxElemPerChan = 32 * 1024 * 1024;  // 128 MB per channel

  std::vector<std::shared_ptr<int>> sendBuffs;
  std::vector<mscclpp::RegisteredMemory> localMems;
  std::vector<mscclpp::RegisteredMemory> remoteMems;
  std::vector<mscclpp::PortChannel> portChannels;
  setupMultiQpChannels(numQps, maxElemPerChan, ibMode, /*tagBase=*/100, sendBuffs, localMems, remoteMems, portChannels);

  std::vector<DeviceHandle<mscclpp::PortChannel>> handles;
  handles.reserve(portChannels.size());
  for (auto& ch : portChannels) handles.push_back(ch.deviceHandle());
  ASSERT_EQ(handles.size(), static_cast<size_t>(numQps));
  ASSERT_LE(numQps, kMaxQps);  // numQps must not exceed __constant__ array size (kMaxQps)
  MSCCLPP_CUDATHROW(
      cudaMemcpyToSymbol(gMultiQpPortChans, handles.data(), numQps * sizeof(DeviceHandle<mscclpp::PortChannel>)));

  proxyService->startProxy();

  const std::string qpLabel = std::to_string(numQps) + " QP" + (numQps > 1 ? "s" : "");

  for (int nElemPerChan :
       {256, 16 * 1024, 256 * 1024, 1024 * 1024, 4 * 1024 * 1024, 16 * 1024 * 1024, 32 * 1024 * 1024}) {
    const int nIters = 10000;

    // Warm-up
    kernelMultiQpBandwidth<<<1, 1>>>(nElemPerChan, 10, numQps);
    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
    communicator->bootstrap()->barrier();

    // Measure
    mscclpp::Timer timer;
    kernelMultiQpBandwidth<<<1, 1>>>(nElemPerChan, nIters, numQps);
    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
    double elapsedUs = timer.elapsed();
    communicator->bootstrap()->barrier();

    if (rank == 0) {
      // bytes per millisecond * 1e-6 == GB/s
      double totalBytes = static_cast<double>(nElemPerChan) * sizeof(int) * numQps;
      double elapsedMsPerIter = elapsedUs / 1e3 / nIters;
      double gbps = totalBytes / elapsedMsPerIter * 1e-6;
      double totalSizeKB = totalBytes / 1024.0;
      std::string label;
      if (totalSizeKB >= 1024.0)
        label = std::to_string(static_cast<int>(totalSizeKB / 1024.0)) + " MB";
      else
        label = std::to_string(static_cast<int>(totalSizeKB)) + " KB";
      ::mscclpp::test::reportPerfResult(label + " (" + qpLabel + ")", gbps, "GB/s");
    }
  }

  proxyService->stopProxy();

  // Hand the registered memories to the fixture's list.
  for (auto& m : localMems) registeredMemories.push_back(m);
  for (auto& m : remoteMems) registeredMemories.push_back(m);
}
// Perf test: runs testMultiQpBandwidth in IbMode::Host with 1, 2, and 4 QPs.
PERF_TEST(PortChannelOneToOneTest, MultiQpBandwidthIbHostMode) {
REQUIRE_IBVERBS;
REQUIRE_GDR_FOR_IB_MODE(IbMode::Host);
for (int numQps : {1, 2, 4}) {
testMultiQpBandwidth(IbMode::Host, numQps);
}
}
// Perf test: runs testMultiQpBandwidth in IbMode::HostNoAtomic with 1, 2, and 4 QPs.
PERF_TEST(PortChannelOneToOneTest, MultiQpBandwidthIbHostNoAtomicMode) {
REQUIRE_IBVERBS;
REQUIRE_GDR_FOR_IB_MODE(IbMode::HostNoAtomic);
for (int numQps : {1, 2, 4}) {
testMultiQpBandwidth(IbMode::HostNoAtomic, numQps);
}
}
// Multi-QP flush-stress kernel: one thread per QP, all calling putWithSignalAndFlush
// concurrently so all N CQ drains are in flight on the proxy thread at once.
// This is the concurrent-flush worst case the async-progress design protects against.
// nElemPerChan: number of int elements put per channel per iteration.
// nIters: number of rounds.
// numQps: number of channels in use; threads with threadIdx.x >= numQps do no channel work.
__global__ void kernelMultiQpFlushStress(int nElemPerChan, int nIters, int numQps) {
  const int q = threadIdx.x;
  // Surplus threads stay in the loop instead of returning early, so that every thread of the
  // block reaches each __syncthreads() below regardless of the launch configuration.
  const bool active = q < numQps;
  for (int i = 0; i < nIters; i++) {
    if (active && q == 0) {
      // Cross-rank barrier on QP 0 only.
      gMultiQpPortChans[0].signal();
      gMultiQpPortChans[0].wait();
    }
    __syncthreads();
    if (active) {
      gMultiQpPortChans[q].putWithSignalAndFlush(0, nElemPerChan * sizeof(int));
      gMultiQpPortChans[q].wait();
    }
    __syncthreads();
  }
}
// Runs kernelMultiQpFlushStress over `numQps` parallel IB port channels for several message
// sizes and reports, from rank 0, the time per iteration and the time per iteration per QP.
void PortChannelOneToOneTest::testMultiQpFlushStress(IbMode ibMode, int numQps) {
  if (gEnv->rank >= numRanksToUse) return;

  const int rank = communicator->bootstrap()->getRank();
  const int maxElemPerChan = 64 * 1024;

  std::vector<std::shared_ptr<int>> sendBuffs;
  std::vector<mscclpp::RegisteredMemory> localMems;
  std::vector<mscclpp::RegisteredMemory> remoteMems;
  std::vector<mscclpp::PortChannel> portChannels;
  setupMultiQpChannels(numQps, maxElemPerChan, ibMode, /*tagBase=*/400, sendBuffs, localMems, remoteMems, portChannels);

  // Collect the device handles and copy them into the __constant__ array used by the kernel.
  std::vector<DeviceHandle<mscclpp::PortChannel>> handles;
  for (auto& channel : portChannels) {
    handles.push_back(channel.deviceHandle());
  }
  ASSERT_EQ(handles.size(), static_cast<size_t>(numQps));
  ASSERT_LE(numQps, kMaxQps);
  MSCCLPP_CUDATHROW(
      cudaMemcpyToSymbol(gMultiQpPortChans, handles.data(), numQps * sizeof(DeviceHandle<mscclpp::PortChannel>)));

  proxyService->startProxy();

  const std::string qpLabel = std::to_string(numQps) + " QP" + (numQps > 1 ? "s" : "");

  for (int nElemPerChan : {256, 4 * 1024, 64 * 1024}) {
    int nIters = 2000;

    // Warm-up run, not timed.
    kernelMultiQpFlushStress<<<1, numQps>>>(nElemPerChan, 10, numQps);
    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
    communicator->bootstrap()->barrier();

    // Timed run.
    mscclpp::Timer timer;
    kernelMultiQpFlushStress<<<1, numQps>>>(nElemPerChan, nIters, numQps);
    MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
    double elapsedUs = timer.elapsed();
    communicator->bootstrap()->barrier();

    if (rank != 0) continue;

    double usPerIter = elapsedUs / nIters;
    double usPerIterPerQp = usPerIter / numQps;
    int bytesPerChan = nElemPerChan * static_cast<int>(sizeof(int));
    std::string sizeLabel;
    if (bytesPerChan >= 1024) {
      sizeLabel = std::to_string(bytesPerChan / 1024) + " KB";
    } else {
      sizeLabel = std::to_string(bytesPerChan) + " B";
    }
    ::mscclpp::test::reportPerfResult(sizeLabel + " (" + qpLabel + ") per-iter", usPerIter, "us");
    ::mscclpp::test::reportPerfResult(sizeLabel + " (" + qpLabel + ") per-iter/QP", usPerIterPerQp, "us");
  }

  proxyService->stopProxy();

  // Hand the registered memories to the fixture's list.
  for (auto& mem : localMems) registeredMemories.push_back(mem);
  for (auto& mem : remoteMems) registeredMemories.push_back(mem);
}
// Perf test: runs testMultiQpFlushStress in IbMode::Host with 1, 2, and 4 QPs.
PERF_TEST(PortChannelOneToOneTest, MultiQpFlushStressIbHostMode) {
REQUIRE_IBVERBS;
REQUIRE_GDR_FOR_IB_MODE(IbMode::Host);
for (int numQps : {1, 2, 4}) {
testMultiQpFlushStress(IbMode::Host, numQps);
}
}
// Perf test: runs testMultiQpFlushStress in IbMode::HostNoAtomic with 1, 2, and 4 QPs.
PERF_TEST(PortChannelOneToOneTest, MultiQpFlushStressIbHostNoAtomicMode) {
REQUIRE_IBVERBS;
REQUIRE_GDR_FOR_IB_MODE(IbMode::HostNoAtomic);
for (int numQps : {1, 2, 4}) {
testMultiQpFlushStress(IbMode::HostNoAtomic, numQps);
}
}
// Same-channel concurrent-flush kernel: N GPU threads on the same PortChannel each call
// putWithSignalAndFlush in lockstep. Stresses the FIFO-position-based wait target so that
// each caller waits on its own TriggerSync rather than on a globally-incrementing counter
// that could be assigned out-of-order relative to the FIFO push order.
// The channel handle is read from gSingleChanForConcurrentFlush, which the host fills in
// before launching the kernel.
// nIters: number of put+flush+wait rounds performed by every thread.
__constant__ DeviceHandle<mscclpp::PortChannel> gSingleChanForConcurrentFlush;
__global__ void kernelSameChanConcurrentFlush(int nIters) {
auto& chan = gSingleChanForConcurrentFlush;
int tid = threadIdx.x;
for (int i = 0; i < nIters; i++) {
// Each thread writes to a distinct slot (so puts don't overlap on remote side),
// then concurrently flushes on the same channel.
// The slot is one int wide, at byte offset tid * sizeof(int) on both sides.
uint64_t offset = tid * sizeof(int);
chan.putWithSignalAndFlush(offset, offset, sizeof(int));
// Each thread waits for one signal from the remote rank's symmetric putWithSignalAndFlush.
chan.wait();
}
}
// Launches kernelSameChanConcurrentFlush with several threads sharing one IB port channel.
// Completing without a hang or CQ error is the pass criterion; rank 0 also reports the
// time per iteration.
void PortChannelOneToOneTest::testSameChanConcurrentFlush(IbMode ibMode) {
  if (gEnv->rank >= numRanksToUse) return;

  constexpr int nThreads = 4;

  // A single channel whose buffer holds one int slot per GPU thread.
  std::vector<std::shared_ptr<int>> sendBuffs;
  std::vector<mscclpp::RegisteredMemory> localMems;
  std::vector<mscclpp::RegisteredMemory> remoteMems;
  std::vector<mscclpp::PortChannel> portChannels;
  setupMultiQpChannels(/*numQps=*/1, /*elemsPerChan=*/nThreads, ibMode, /*tagBase=*/700, sendBuffs, localMems,
                       remoteMems, portChannels);

  DeviceHandle<mscclpp::PortChannel> handle = portChannels[0].deviceHandle();
  MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gSingleChanForConcurrentFlush, &handle, sizeof(handle)));

  proxyService->startProxy();

  // Warm-up
  kernelSameChanConcurrentFlush<<<1, nThreads>>>(10);
  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
  communicator->bootstrap()->barrier();

  // Measure: a successful completion (no deadlock, no CQ error) validates that each
  // concurrent-flush caller waited on its own TriggerSync (not someone else's earlier one).
  const int nIters = 500;
  mscclpp::Timer timer;
  kernelSameChanConcurrentFlush<<<1, nThreads>>>(nIters);
  MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
  double elapsedUs = timer.elapsed();
  communicator->bootstrap()->barrier();

  const bool isReporter = communicator->bootstrap()->getRank() == 0;
  if (isReporter) {
    double usPerIter = elapsedUs / nIters;
    ::mscclpp::test::reportPerfResult(std::to_string(nThreads) + " threads same-chan per-iter", usPerIter, "us");
  }

  proxyService->stopProxy();

  // Hand the registered memories to the fixture's list.
  for (auto& mem : localMems) registeredMemories.push_back(mem);
  for (auto& mem : remoteMems) registeredMemories.push_back(mem);
}
// Functional test: runs testSameChanConcurrentFlush in IbMode::Host.
TEST(PortChannelOneToOneTest, SameChanConcurrentFlushIbHostMode) {
REQUIRE_IBVERBS;
REQUIRE_GDR_FOR_IB_MODE(IbMode::Host);
testSameChanConcurrentFlush(IbMode::Host);
}