This commit is contained in:
Changho Hwang
2026-03-05 23:28:39 +00:00
parent 448ceb66f6
commit 7ce841bed0
3 changed files with 28 additions and 28 deletions

View File

@@ -233,7 +233,7 @@ void IBConnection::recvThreadFunc() {
newValueHost = *static_cast<volatile uint64_t*>(localSignalGpuPtr_);
}
// Read token address from the local stored address (set by setRemoteUpdateDstAddr)
// Read the token address from the locally stored address (set by setSignalForwardingDst)
if (remoteUpdateDstAddr_ != 0) {
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(remoteUpdateDstAddr_);
@@ -319,7 +319,7 @@ IBConnection::IBConnection(std::shared_ptr<Context> context, const Endpoint& loc
// Data Direct requires all three conditions:
// 1. Signal GPU buffer MR registered with MLX5DV_REG_DMABUF_ACCESS_DATA_DIRECT
// 2. Local signal GPU GDRCopy mapping pinned with GDR_PIN_FLAG_FORCE_PCIE
// 3. (remoteUpdateDstAddr GDRCopy mapping checked at setRemoteUpdateDstAddr time)
// 3. Signal forwarding dst GDRCopy mapping is valid (checked later, at setSignalForwardingDst time)
// When all conditions are met, RDMA data writes and GDRCopy token writes both go
// through the Data Direct engine, guaranteeing GPU memory visibility at CQE poll time.
auto qp = qp_.lock();
@@ -356,23 +356,23 @@ Transport IBConnection::transport() const { return transport_; }
Transport IBConnection::remoteTransport() const { return remoteTransport_; }
bool IBConnection::usesRecvThread() const { return ibNoAtomic_; }
bool IBConnection::usesSignalForwarding() const { return ibNoAtomic_; }
void IBConnection::setRemoteUpdateDstAddr(std::shared_ptr<uint64_t> gpuMem) {
remoteUpdateDstAddr_ = reinterpret_cast<uint64_t>(gpuMem.get());
void IBConnection::setSignalForwardingDst(std::shared_ptr<uint64_t> mem) {
remoteUpdateDstAddr_ = reinterpret_cast<uint64_t>(mem.get());
if (gdrEnabled()) {
if (gpuMem) {
remoteUpdateDstAddrMap_ = std::make_unique<GdrMap>(std::move(gpuMem), localGpuDeviceId_);
if (mem) {
remoteUpdateDstAddrMap_ = std::make_unique<GdrMap>(std::move(mem), localGpuDeviceId_);
// Data Direct requires the token write mapping to also use FORCE_PCIE
if (dataDirectEnabled_ && !(remoteUpdateDstAddrMap_ && remoteUpdateDstAddrMap_->valid())) {
dataDirectEnabled_ = false;
INFO(CONN, "IBConnection: Data Direct disabled (remoteUpdateDstAddr GDRCopy mapping not available)");
INFO(CONN, "IBConnection: Data Direct disabled (signal forwarding dst GDRCopy mapping not available)");
}
} else {
remoteUpdateDstAddrMap_.reset();
}
}
INFO(CONN, "IBConnection setRemoteUpdateDstAddr: ", (void*)remoteUpdateDstAddr_);
INFO(CONN, "IBConnection setSignalForwardingDst: ", (void*)remoteUpdateDstAddr_);
}
void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,

View File

@@ -37,16 +37,16 @@ class BaseConnection {
virtual void flush(int64_t timeoutUsec = -1) = 0;
/// Set the local address where remote updateAndSync operations should write.
/// This is called by the receiver to specify where incoming signals should be written.
/// Set the local address where forwarded signals should be written.
/// This is called by the receiver to specify where incoming signals should be forwarded.
/// Default implementation is a no-op for connections that don't need it.
/// @param gpuMem Shared pointer to the GPU/CPU memory for incoming writes (nullptr to clear).
virtual void setRemoteUpdateDstAddr(std::shared_ptr<uint64_t> /*gpuMem*/) {}
/// @param mem Shared pointer to the memory for incoming writes (nullptr to clear).
virtual void setSignalForwardingDst(std::shared_ptr<uint64_t> /*mem*/) {}
/// Whether this connection uses a recv thread for signaling (host-no-atomic mode).
/// Whether this connection uses signal forwarding (e.g., IB host-no-atomic mode).
/// When true, the semaphore must allocate a separate inboundToken_ for the recv thread to write to.
/// When false, the NIC writes directly to the semaphore's registered memory (e.g., via atomics).
virtual bool usesRecvThread() const { return false; }
virtual bool usesSignalForwarding() const { return false; }
virtual Transport transport() const = 0;
@@ -137,12 +137,12 @@ class IBConnection : public BaseConnection {
IBConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint, const Endpoint& remoteEndpoint);
~IBConnection();
/// Set the local address where remote updateAndSync operations will write.
/// Set the local address where forwarded signals should be written.
/// Must be called before the remote sends any updateAndSync in host-no-atomic mode.
/// @param gpuMem Shared pointer to the GPU/CPU memory for incoming writes (nullptr to clear).
void setRemoteUpdateDstAddr(std::shared_ptr<uint64_t> gpuMem) override;
/// @param mem Shared pointer to the memory for incoming writes (nullptr to clear).
void setSignalForwardingDst(std::shared_ptr<uint64_t> mem) override;
bool usesRecvThread() const override;
bool usesSignalForwarding() const override;
Transport transport() const override;

View File

@@ -123,8 +123,8 @@ MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(const Semaphore& sema
THROW(CONN, Error, ErrorCode::InvalidUsage, "Local endpoint device type of Host2DeviceSemaphore should be GPU");
}
auto connImpl = BaseConnection::getImpl(connection());
if (connImpl->usesRecvThread()) {
// Host-no-atomic mode: the recv thread writes the token to GPU memory.
if (connImpl->usesSignalForwarding()) {
// Signal forwarding mode: the recv thread writes the token to GPU memory.
// Allocate a separate inbound token via plain cudaMalloc (not TokenPool/VMM)
// so that it is always compatible with GDRCopy pinning (VMM memory cannot be pinned by gdr_pin_buffer).
CudaDeviceGuard deviceGuard(connection().localDevice().id);
@@ -133,9 +133,9 @@ MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(const Semaphore& sema
#else
inboundToken_ = detail::gpuCallocShared<uint64_t>();
#endif
connImpl->setRemoteUpdateDstAddr(inboundToken_);
connImpl->setSignalForwardingDst(inboundToken_);
}
// When usesRecvThread() is false (e.g., atomic mode), inboundToken_ stays null
// When usesSignalForwarding() is false (e.g., atomic mode), inboundToken_ stays null
// and the GPU polls the SemaphoreStub token directly (the NIC atomic target).
}
@@ -144,9 +144,9 @@ MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(Communicator& communi
MSCCLPP_API_CPP Host2DeviceSemaphore::~Host2DeviceSemaphore() {
if (inboundToken_) {
// Clear the connection's remote update address (and any associated GdrMap)
// Clear the connection's signal forwarding destination (and any associated GdrMap)
// before inboundToken_ is freed, to avoid use-after-free on the pinned GPU memory.
BaseConnection::getImpl(connection())->setRemoteUpdateDstAddr(nullptr);
BaseConnection::getImpl(connection())->setSignalForwardingDst(nullptr);
}
}
@@ -178,12 +178,12 @@ MSCCLPP_API_CPP Host2HostSemaphore::Host2HostSemaphore(const Semaphore& semaphor
THROW(CONN, Error, ErrorCode::InvalidUsage, "Local endpoint device type of Host2HostSemaphore should be CPU");
}
auto connImpl = BaseConnection::getImpl(connection());
if (connImpl->usesRecvThread()) {
// Host-no-atomic mode: tell the recv thread where to write the incoming token.
if (connImpl->usesSignalForwarding()) {
// Signal forwarding mode: tell the recv thread where to write the incoming token.
// Non-owning shared_ptr: Host2HostSemaphore outlives the connection, so the memory stays valid.
auto token =
std::shared_ptr<uint64_t>(reinterpret_cast<uint64_t*>(semaphore_.localMemory().data()), [](uint64_t*) {});
connImpl->setRemoteUpdateDstAddr(std::move(token));
connImpl->setSignalForwardingDst(std::move(token));
}
}