diff --git a/include/mscclpp/semaphore.hpp b/include/mscclpp/semaphore.hpp index 27f9aefa..85787c95 100644 --- a/include/mscclpp/semaphore.hpp +++ b/include/mscclpp/semaphore.hpp @@ -82,7 +82,6 @@ class MemoryDevice2DeviceSemaphore { private: Semaphore semaphore_; detail::UniqueGpuPtr expectedInboundToken_; - detail::UniqueGpuPtr outboundToken_; public: /// Constructor. diff --git a/include/mscclpp/semaphore_device.hpp b/include/mscclpp/semaphore_device.hpp index f1b01e89..a790a6e1 100644 --- a/include/mscclpp/semaphore_device.hpp +++ b/include/mscclpp/semaphore_device.hpp @@ -82,19 +82,20 @@ struct MemoryDevice2DeviceSemaphoreDeviceHandle { /// Signal remote device, ensures prior memory ops complete. MSCCLPP_DEVICE_INLINE void signal() { - auto outbound = incOutbound(); -#if defined(MSCCLPP_DEVICE_CUDA) && (__CUDA_ARCH__ == 800) - // Using memoryOrderSeqCst is faster for A100. - atomicStore(remoteInboundToken, outbound, memoryOrderSeqCst); -#else - atomicStore(remoteInboundToken, outbound, memoryOrderRelease); +#if defined(MSCCLPP_DEVICE_CUDA) + asm volatile("red.release.sys.global.add.u64 [%0], %1;" ::"l"(remoteInboundToken), "l"((uint64_t)1) : "memory"); +#elif defined(MSCCLPP_DEVICE_HIP) + (void)atomicFetchAdd(remoteInboundToken, (uint64_t)1, memoryOrderRelease); #endif } /// Relaxed signal; no memory completion guarantee. Use it only for synchronizing execution, not data. MSCCLPP_DEVICE_INLINE void relaxedSignal() { - auto outbound = incOutbound(); - atomicStore(remoteInboundToken, outbound, memoryOrderRelaxed); +#if defined(MSCCLPP_DEVICE_CUDA) + asm volatile("red.relaxed.sys.global.add.u64 [%0], %1;" ::"l"(remoteInboundToken), "l"((uint64_t)1) : "memory"); +#elif defined(MSCCLPP_DEVICE_HIP) + (void)atomicFetchAdd(remoteInboundToken, (uint64_t)1, memoryOrderRelaxed); +#endif } /// Thread-safe read of expected inbound value. @@ -121,27 +122,12 @@ struct MemoryDevice2DeviceSemaphoreDeviceHandle { return atomicLoad(inboundToken, memoryOrderRelaxed); } - /// Thread-safe read of outbound value. - /// @return The outbound value. - MSCCLPP_DEVICE_INLINE uint64_t loadOutbound() { - return atomicLoad(outboundToken, memoryOrderRelaxed); - } - - /// Thread-safe increment of outbound value. - /// @return The incremented outbound value. - MSCCLPP_DEVICE_INLINE uint64_t incOutbound() { - return atomicFetchAdd(outboundToken, 1, memoryOrderRelaxed) + 1; - } #endif // defined(MSCCLPP_DEVICE_COMPILE) /// A local memory space where the remote device will write its semaphore value and the local device will read it. uint64_t* inboundToken; - /// A local memory space where the local device stores the semaphore value to be written to the remote device. - uint64_t* outboundToken; - - /// A remote memory space where the local device writes its outboundToken on. This is inboundToken of the - /// remote device. + /// A remote memory space where the local device atomically increments. This is inboundToken of the remote device. uint64_t* remoteInboundToken; /// A local memory space where the local device stores the expected value of the inboundToken to wait for. diff --git a/python/csrc/semaphore_py.cpp b/python/csrc/semaphore_py.cpp index 36d559f2..17c06a7d 100644 --- a/python/csrc/semaphore_py.cpp +++ b/python/csrc/semaphore_py.cpp @@ -43,7 +43,6 @@ void register_semaphore(nb::module_& m) { nb::class_(memoryDevice2DeviceSemaphore, "DeviceHandle") .def(nb::init<>()) .def_rw("inbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::inboundToken) - .def_rw("outbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::outboundToken) .def_rw("remote_inbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::remoteInboundToken) .def_rw("expected_inbound_token", &MemoryDevice2DeviceSemaphore::DeviceHandle::expectedInboundToken) .def_prop_ro("raw", [](const MemoryDevice2DeviceSemaphore::DeviceHandle& self) -> nb::bytes { diff --git a/src/core/semaphore.cc b/src/core/semaphore.cc index c6eb1e23..bea43327 100644 --- a/src/core/semaphore.cc +++ b/src/core/semaphore.cc @@ -183,9 +183,7 @@ MSCCLPP_API_CPP void Host2HostSemaphore::wait(int64_t maxSpinCount) { } MSCCLPP_API_CPP MemoryDevice2DeviceSemaphore::MemoryDevice2DeviceSemaphore(const Semaphore& semaphore) - : semaphore_(semaphore), - expectedInboundToken_(detail::gpuCallocUnique()), - outboundToken_(detail::gpuCallocUnique()) { + : semaphore_(semaphore), expectedInboundToken_(detail::gpuCallocUnique()) { if (connection().localDevice().type != DeviceType::GPU) { throw Error("Local endpoint device type of MemoryDevice2DeviceSemaphore should be GPU", ErrorCode::InvalidUsage); } @@ -202,7 +200,6 @@ MSCCLPP_API_CPP MemoryDevice2DeviceSemaphore::DeviceHandle MemoryDevice2DeviceSe device.remoteInboundToken = reinterpret_cast(semaphore_.remoteMemory().data()); device.inboundToken = reinterpret_cast(semaphore_.localMemory().data()); device.expectedInboundToken = expectedInboundToken_.get(); - device.outboundToken = outboundToken_.get(); return device; }; diff --git a/test/mp_unit/CMakeLists.txt b/test/mp_unit/CMakeLists.txt index b99bb09d..d4004e8e 100644 --- a/test/mp_unit/CMakeLists.txt +++ b/test/mp_unit/CMakeLists.txt @@ -8,6 +8,7 @@ target_sources(mp_unit_tests PRIVATE communicator_tests.cu port_channel_tests.cu memory_channel_tests.cu + semaphore_perf_tests.cu switch_channel_tests.cu executor_tests.cc ) diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index 03e4cbde..5f95d660 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -176,6 +176,12 @@ class MemoryChannelOneToOneTest : public CommunicatorTestBase { std::unordered_map> memorySemaphores; }; +class SemaphorePerfTest : public CommunicatorTestBase { + protected: + void SetUp() override; + void TearDown() override; +}; + class SwitchChannelTest : public CommunicatorTestBase { protected: void SetUp() override; diff --git a/test/mp_unit/semaphore_perf_tests.cu b/test/mp_unit/semaphore_perf_tests.cu new file mode 100644 index 00000000..92560539 --- /dev/null +++ b/test/mp_unit/semaphore_perf_tests.cu @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include +#include + +#include "mp_unit_tests.hpp" + +void SemaphorePerfTest::SetUp() { + // Need at least two ranks within a node + if (gEnv->nRanksPerNode < 2) { + SKIP_TEST(); + } + setNumRanksToUse(2); + CommunicatorTestBase::SetUp(); +} + +void SemaphorePerfTest::TearDown() { CommunicatorTestBase::TearDown(); } + +__constant__ mscclpp::MemoryDevice2DeviceSemaphoreDeviceHandle gSemaphorePerfTestHandle; + +__global__ void kernelSemaphorePingPong(int rank, int nIters) { + mscclpp::MemoryDevice2DeviceSemaphoreDeviceHandle& sem = gSemaphorePerfTestHandle; + + // Warmup + for (int i = 0; i < 10; i++) { + if ((rank ^ (i & 1)) == 0) { + sem.signal(); + } else { + sem.wait(); + } + } + + // Timed iterations — alternating signal/wait like the memory channel ping-pong + for (int i = 0; i < nIters; i++) { + if ((rank ^ (i & 1)) == 0) { + sem.signal(); + } else { + sem.wait(); + } + } +} + +PERF_TEST(SemaphorePerfTest, SignalPingPong) { + if (gEnv->rank >= numRanksToUse) return; + + connectMesh(/*useIpc=*/true, /*useIb=*/false, /*useEthernet=*/false); + + int peerRank = (gEnv->rank == 0) ? 1 : 0; + auto d2dSemaphore = std::make_shared(*communicator, connections[peerRank]); + + auto devHandle = d2dSemaphore->deviceHandle(); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gSemaphorePerfTestHandle, &devHandle, sizeof(devHandle))); + + const int nIters = 1000; + const std::string testName = ::mscclpp::test::currentTestName(); + + // Warmup run + kernelSemaphorePingPong<<<1, 1>>>(gEnv->rank, nIters); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + communicator->bootstrap()->barrier(); + + // Timed run + mscclpp::Timer timer; + kernelSemaphorePingPong<<<1, 1>>>(gEnv->rank, nIters); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + communicator->bootstrap()->barrier(); + + if (gEnv->rank == 0) { + std::cout << testName << ": " << std::setprecision(4) << (float)timer.elapsed() / (float)nIters << " us/iter\n"; + } +}