mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-11 17:00:22 +00:00
Use PTX red for D2D semaphore signal (#768)
## Summary - Replace the two-step `signal()` implementation (`incOutbound()` + `atomicStore()`) with a single fire-and-forget PTX `red.release.sys.global.add.u64` instruction - This eliminates one local atomic fetch-add and replaces a remote store with a remote atomic add that has no return value — more efficient on both NVIDIA (PTX `red`) and AMD (compiler optimizes `(void)fetch_add` to fire-and-forget `flat_atomic_add_x2`) - Add a C++ perf test (`PERF_TEST`) in `mp_unit` for signal+wait ping-pong latency ### Performance (H100, 2 ranks, signal+wait round-trip) ``` SemaphorePerfTest.SignalPingPong: Store-based (old): 2.595 us/iter Red-based (new): 2.345 us/iter Speedup: 1.11x ``` ## Test plan - [x] Builds successfully (`make mp_unit_tests`) - [x] `mpirun -np 2 ./build/bin/mp_unit_tests --filter "SemaphorePerfTest"` — 1.11x speedup 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -82,7 +82,6 @@ class MemoryDevice2DeviceSemaphore {
|
||||
private:
|
||||
Semaphore semaphore_;
|
||||
detail::UniqueGpuPtr<uint64_t> expectedInboundToken_;
|
||||
detail::UniqueGpuPtr<uint64_t> outboundToken_;
|
||||
|
||||
public:
|
||||
/// Constructor.
|
||||
|
||||
@@ -82,19 +82,20 @@ struct MemoryDevice2DeviceSemaphoreDeviceHandle {
|
||||
|
||||
/// Signal remote device, ensures prior memory ops complete.
|
||||
MSCCLPP_DEVICE_INLINE void signal() {
|
||||
auto outbound = incOutbound();
|
||||
#if defined(MSCCLPP_DEVICE_CUDA) && (__CUDA_ARCH__ == 800)
|
||||
// Using memoryOrderSeqCst is faster for A100.
|
||||
atomicStore(remoteInboundToken, outbound, memoryOrderSeqCst);
|
||||
#else
|
||||
atomicStore(remoteInboundToken, outbound, memoryOrderRelease);
|
||||
#if defined(MSCCLPP_DEVICE_CUDA)
|
||||
asm volatile("red.release.sys.global.add.u64 [%0], %1;" ::"l"(remoteInboundToken), "l"((uint64_t)1) : "memory");
|
||||
#elif defined(MSCCLPP_DEVICE_HIP)
|
||||
(void)atomicFetchAdd(remoteInboundToken, (uint64_t)1, memoryOrderRelease);
|
||||
#endif
|
||||
}
|
||||
|
||||
/// Relaxed signal; no memory completion guarantee. Use it only for synchronizing execution, not data.
|
||||
MSCCLPP_DEVICE_INLINE void relaxedSignal() {
|
||||
auto outbound = incOutbound();
|
||||
atomicStore(remoteInboundToken, outbound, memoryOrderRelaxed);
|
||||
#if defined(MSCCLPP_DEVICE_CUDA)
|
||||
asm volatile("red.relaxed.sys.global.add.u64 [%0], %1;" ::"l"(remoteInboundToken), "l"((uint64_t)1) : "memory");
|
||||
#elif defined(MSCCLPP_DEVICE_HIP)
|
||||
(void)atomicFetchAdd(remoteInboundToken, (uint64_t)1, memoryOrderRelaxed);
|
||||
#endif
|
||||
}
|
||||
|
||||
/// Thread-safe read of expected inbound value.
|
||||
@@ -121,27 +122,12 @@ struct MemoryDevice2DeviceSemaphoreDeviceHandle {
|
||||
return atomicLoad<uint64_t, scopeSystem>(inboundToken, memoryOrderRelaxed);
|
||||
}
|
||||
|
||||
/// Thread-safe read of outbound value.
|
||||
/// @return The outbound value.
|
||||
MSCCLPP_DEVICE_INLINE uint64_t loadOutbound() {
|
||||
return atomicLoad<uint64_t, scopeDevice>(outboundToken, memoryOrderRelaxed);
|
||||
}
|
||||
|
||||
/// Thread-safe increment of outbound value.
|
||||
/// @return The incremented outbound value.
|
||||
MSCCLPP_DEVICE_INLINE uint64_t incOutbound() {
|
||||
return atomicFetchAdd<uint64_t, scopeDevice>(outboundToken, 1, memoryOrderRelaxed) + 1;
|
||||
}
|
||||
#endif // defined(MSCCLPP_DEVICE_COMPILE)
|
||||
|
||||
/// A local memory space where the remote device will write its semaphore value and the local device will read it.
|
||||
uint64_t* inboundToken;
|
||||
|
||||
/// A local memory space where the local device stores the semaphore value to be written to the remote device.
|
||||
uint64_t* outboundToken;
|
||||
|
||||
/// A remote memory space where the local device writes its outboundToken on. This is inboundToken of the
|
||||
/// remote device.
|
||||
/// A remote memory space where the local device atomically increments. This is inboundToken of the remote device.
|
||||
uint64_t* remoteInboundToken;
|
||||
|
||||
/// A local memory space where the local device stores the expected value of the inboundToken to wait for.
|
||||
|
||||
@@ -43,7 +43,6 @@ void register_semaphore(nb::module_& m) {
|
||||
nb::class_<MemoryDevice2DeviceSemaphore::DeviceHandle>(memoryDevice2DeviceSemaphore, "DeviceHandle")
|
||||
.def(nb::init<>())
|
||||
.def_rw("inbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::inboundToken)
|
||||
.def_rw("outbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::outboundToken)
|
||||
.def_rw("remote_inbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::remoteInboundToken)
|
||||
.def_rw("expected_inbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::expectedInboundToken)
|
||||
.def_prop_ro("raw", [](const MemoryDevice2DeviceSemaphore::DeviceHandle& self) -> nb::bytes {
|
||||
|
||||
@@ -183,9 +183,7 @@ MSCCLPP_API_CPP void Host2HostSemaphore::wait(int64_t maxSpinCount) {
|
||||
}
|
||||
|
||||
MSCCLPP_API_CPP MemoryDevice2DeviceSemaphore::MemoryDevice2DeviceSemaphore(const Semaphore& semaphore)
|
||||
: semaphore_(semaphore),
|
||||
expectedInboundToken_(detail::gpuCallocUnique<uint64_t>()),
|
||||
outboundToken_(detail::gpuCallocUnique<uint64_t>()) {
|
||||
: semaphore_(semaphore), expectedInboundToken_(detail::gpuCallocUnique<uint64_t>()) {
|
||||
if (connection().localDevice().type != DeviceType::GPU) {
|
||||
throw Error("Local endpoint device type of MemoryDevice2DeviceSemaphore should be GPU", ErrorCode::InvalidUsage);
|
||||
}
|
||||
@@ -202,7 +200,6 @@ MSCCLPP_API_CPP MemoryDevice2DeviceSemaphore::DeviceHandle MemoryDevice2DeviceSe
|
||||
device.remoteInboundToken = reinterpret_cast<uint64_t*>(semaphore_.remoteMemory().data());
|
||||
device.inboundToken = reinterpret_cast<uint64_t*>(semaphore_.localMemory().data());
|
||||
device.expectedInboundToken = expectedInboundToken_.get();
|
||||
device.outboundToken = outboundToken_.get();
|
||||
return device;
|
||||
};
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ target_sources(mp_unit_tests PRIVATE
|
||||
communicator_tests.cu
|
||||
port_channel_tests.cu
|
||||
memory_channel_tests.cu
|
||||
semaphore_perf_tests.cu
|
||||
switch_channel_tests.cu
|
||||
executor_tests.cc
|
||||
)
|
||||
|
||||
@@ -176,6 +176,12 @@ class MemoryChannelOneToOneTest : public CommunicatorTestBase {
|
||||
std::unordered_map<int, std::shared_ptr<mscclpp::MemoryDevice2DeviceSemaphore>> memorySemaphores;
|
||||
};
|
||||
|
||||
class SemaphorePerfTest : public CommunicatorTestBase {
|
||||
protected:
|
||||
void SetUp() override;
|
||||
void TearDown() override;
|
||||
};
|
||||
|
||||
class SwitchChannelTest : public CommunicatorTestBase {
|
||||
protected:
|
||||
void SetUp() override;
|
||||
|
||||
73
test/mp_unit/semaphore_perf_tests.cu
Normal file
73
test/mp_unit/semaphore_perf_tests.cu
Normal file
@@ -0,0 +1,73 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
#include <mscclpp/gpu_utils.hpp>
|
||||
#include <mscclpp/semaphore.hpp>
|
||||
|
||||
#include "mp_unit_tests.hpp"
|
||||
|
||||
void SemaphorePerfTest::SetUp() {
|
||||
// Need at least two ranks within a node
|
||||
if (gEnv->nRanksPerNode < 2) {
|
||||
SKIP_TEST();
|
||||
}
|
||||
setNumRanksToUse(2);
|
||||
CommunicatorTestBase::SetUp();
|
||||
}
|
||||
|
||||
void SemaphorePerfTest::TearDown() { CommunicatorTestBase::TearDown(); }
|
||||
|
||||
__constant__ mscclpp::MemoryDevice2DeviceSemaphoreDeviceHandle gSemaphorePerfTestHandle;
|
||||
|
||||
__global__ void kernelSemaphorePingPong(int rank, int nIters) {
|
||||
mscclpp::MemoryDevice2DeviceSemaphoreDeviceHandle& sem = gSemaphorePerfTestHandle;
|
||||
|
||||
// Warmup
|
||||
for (int i = 0; i < 10; i++) {
|
||||
if ((rank ^ (i & 1)) == 0) {
|
||||
sem.signal();
|
||||
} else {
|
||||
sem.wait();
|
||||
}
|
||||
}
|
||||
|
||||
// Timed iterations — alternating signal/wait like the memory channel ping-pong
|
||||
for (int i = 0; i < nIters; i++) {
|
||||
if ((rank ^ (i & 1)) == 0) {
|
||||
sem.signal();
|
||||
} else {
|
||||
sem.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PERF_TEST(SemaphorePerfTest, SignalPingPong) {
|
||||
if (gEnv->rank >= numRanksToUse) return;
|
||||
|
||||
connectMesh(/*useIpc=*/true, /*useIb=*/false, /*useEthernet=*/false);
|
||||
|
||||
int peerRank = (gEnv->rank == 0) ? 1 : 0;
|
||||
auto d2dSemaphore = std::make_shared<mscclpp::MemoryDevice2DeviceSemaphore>(*communicator, connections[peerRank]);
|
||||
|
||||
auto devHandle = d2dSemaphore->deviceHandle();
|
||||
MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gSemaphorePerfTestHandle, &devHandle, sizeof(devHandle)));
|
||||
|
||||
const int nIters = 1000;
|
||||
const std::string testName = ::mscclpp::test::currentTestName();
|
||||
|
||||
// Warmup run
|
||||
kernelSemaphorePingPong<<<1, 1>>>(gEnv->rank, nIters);
|
||||
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
|
||||
|
||||
communicator->bootstrap()->barrier();
|
||||
|
||||
// Timed run
|
||||
mscclpp::Timer timer;
|
||||
kernelSemaphorePingPong<<<1, 1>>>(gEnv->rank, nIters);
|
||||
MSCCLPP_CUDATHROW(cudaDeviceSynchronize());
|
||||
communicator->bootstrap()->barrier();
|
||||
|
||||
if (gEnv->rank == 0) {
|
||||
std::cout << testName << ": " << std::setprecision(4) << (float)timer.elapsed() / (float)nIters << " us/iter\n";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user