mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-04-20 06:49:29 +00:00
Atomic for semaphores instead of fences (#188)
Co-authored-by: Pratyush Patel <pratyushpatel.1995@gmail.com> Co-authored-by: Esha Choukse <eschouks@microsoft.com> Co-authored-by: Changho Hwang <changhohwang@microsoft.com>
This commit is contained in:
@@ -4,6 +4,8 @@
|
||||
#ifndef MSCCLPP_FIFO_DEVICE_HPP_
|
||||
#define MSCCLPP_FIFO_DEVICE_HPP_
|
||||
|
||||
#include <cuda/atomic>
|
||||
|
||||
#include "poll.hpp"
|
||||
|
||||
namespace mscclpp {
|
||||
@@ -38,7 +40,9 @@ struct FifoDeviceHandle {
|
||||
/// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
|
||||
/// @return The new head of the FIFO.
|
||||
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger, int64_t maxSpinCount = 1000000) {
|
||||
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1);
|
||||
uint64_t curFifoHead =
|
||||
cuda::atomic_ref<uint64_t, cuda::thread_scope_device>{*this->head}.fetch_add(1, cuda::memory_order_relaxed);
|
||||
|
||||
// make the last bit intentionally non-zero so that we can safely poll. Don't worry, we will change it back in host
|
||||
// side
|
||||
trigger.snd ^= ((uint64_t)1 << (uint64_t)63);
|
||||
@@ -46,15 +50,22 @@ struct FifoDeviceHandle {
|
||||
// Only one of two conditions need to be met to proceed. Either the tail has advanced enough or where we need to
|
||||
// write to is 0. However, the first condition is faster to check since the tail is flushed periodically anyways but
|
||||
// for the second condition we need to read CPU memory.
|
||||
// As volatile access is slow, we first check using the bare pointer and then use the volatile pointer if the
|
||||
// As atomic access is slow, we first check using the bare pointer and then use the atomic load if the
|
||||
// condition is not met.
|
||||
if (curFifoHead >= size + *(this->tailReplica)) {
|
||||
OR_POLL_MAYBE_JAILBREAK(curFifoHead >= size + *((volatile uint64_t*)this->tailReplica),
|
||||
*(volatile uint64_t*)&this->triggers[curFifoHead % size] != 0, maxSpinCount);
|
||||
OR_POLL_MAYBE_JAILBREAK(
|
||||
(curFifoHead >= size + cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*this->tailReplica}.load(
|
||||
cuda::memory_order_relaxed)),
|
||||
(cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{this->triggers[curFifoHead % size].fst}.load(
|
||||
cuda::memory_order_relaxed) != 0),
|
||||
maxSpinCount);
|
||||
}
|
||||
|
||||
ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % size]);
|
||||
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd));
|
||||
|
||||
// store with memory order release so that the while loop does not go pass this.
|
||||
asm volatile("st.global.release.cta.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd));
|
||||
|
||||
return curFifoHead;
|
||||
}
|
||||
|
||||
@@ -65,8 +76,12 @@ struct FifoDeviceHandle {
|
||||
__forceinline__ __device__ void sync(uint64_t curFifoHead, int64_t maxSpinCount = 1000000) {
|
||||
// Same as push but in this case checking the fist condition is probably faster since for tail to be pushed we need
|
||||
// to wait for cudaMemcpy to be done.
|
||||
OR_POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % size]) != 0,
|
||||
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead, maxSpinCount);
|
||||
OR_POLL_MAYBE_JAILBREAK(
|
||||
(curFifoHead >=
|
||||
cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*this->tailReplica}.load(cuda::memory_order_relaxed)),
|
||||
(cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{this->triggers[curFifoHead % size].fst}.load(
|
||||
cuda::memory_order_relaxed) != 0),
|
||||
maxSpinCount);
|
||||
}
|
||||
#endif // __CUDACC__
|
||||
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
|
||||
#ifdef __CUDACC__
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
extern "C" __device__ void __assert_fail(const char *__assertion, const char *__file, unsigned int __line,
|
||||
const char *__function) __THROW;
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
#ifndef MSCCLPP_SEMAPHORE_DEVICE_HPP_
|
||||
#define MSCCLPP_SEMAPHORE_DEVICE_HPP_
|
||||
|
||||
#include <cuda/atomic>
|
||||
|
||||
#include "poll.hpp"
|
||||
|
||||
namespace mscclpp {
|
||||
@@ -14,7 +16,8 @@ struct Host2DeviceSemaphoreDeviceHandle {
|
||||
/// Poll if the host has signaled.
|
||||
/// @return true if the host has signaled.
|
||||
__forceinline__ __device__ bool poll() {
|
||||
bool signaled = (*(volatile uint64_t*)(inboundSemaphoreId) > (*expectedInboundSemaphoreId));
|
||||
bool signaled = (cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*inboundSemaphoreId}.load(
|
||||
cuda::memory_order_acquire) > (*expectedInboundSemaphoreId));
|
||||
if (signaled) (*expectedInboundSemaphoreId) += 1;
|
||||
return signaled;
|
||||
}
|
||||
@@ -22,7 +25,9 @@ struct Host2DeviceSemaphoreDeviceHandle {
|
||||
/// Wait for the host to signal.
|
||||
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) {
|
||||
(*expectedInboundSemaphoreId) += 1;
|
||||
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)(inboundSemaphoreId) < (*expectedInboundSemaphoreId), maxSpinCount);
|
||||
POLL_MAYBE_JAILBREAK((cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*inboundSemaphoreId}.load(
|
||||
cuda::memory_order_acquire) < (*expectedInboundSemaphoreId)),
|
||||
maxSpinCount);
|
||||
}
|
||||
#endif // __CUDACC__
|
||||
|
||||
@@ -36,7 +41,8 @@ struct SmDevice2DeviceSemaphoreDeviceHandle {
|
||||
/// Poll if the remote device has signaled.
|
||||
/// @return true if the remote device has signaled.
|
||||
__forceinline__ __device__ bool poll() {
|
||||
bool signaled = ((*inboundSemaphoreId) > (*expectedInboundSemaphoreId));
|
||||
bool signaled = (cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*inboundSemaphoreId}.load(
|
||||
cuda::memory_order_acquire) > (*expectedInboundSemaphoreId));
|
||||
if (signaled) (*expectedInboundSemaphoreId) += 1;
|
||||
return signaled;
|
||||
}
|
||||
@@ -44,7 +50,9 @@ struct SmDevice2DeviceSemaphoreDeviceHandle {
|
||||
/// Wait for the remote device to signal.
|
||||
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) {
|
||||
(*expectedInboundSemaphoreId) += 1;
|
||||
POLL_MAYBE_JAILBREAK((*inboundSemaphoreId) < (*expectedInboundSemaphoreId), maxSpinCount);
|
||||
POLL_MAYBE_JAILBREAK((cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*inboundSemaphoreId}.load(
|
||||
cuda::memory_order_acquire) < (*expectedInboundSemaphoreId)),
|
||||
maxSpinCount);
|
||||
}
|
||||
|
||||
/// Signal the remote device.
|
||||
@@ -55,9 +63,9 @@ struct SmDevice2DeviceSemaphoreDeviceHandle {
|
||||
__forceinline__ __device__ void signal() {
|
||||
// This fence ensures that preceding writes are visible on the peer GPU before the incremented
|
||||
// `outboundSemaphoreId` is visible.
|
||||
__threadfence_system();
|
||||
semaphoreIncrement();
|
||||
*remoteInboundSemaphoreId = semaphoreGetLocal();
|
||||
cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*remoteInboundSemaphoreId}.store(semaphoreGetLocal(),
|
||||
cuda::memory_order_seq_cst);
|
||||
}
|
||||
|
||||
/// Signal the remote device for copied packets.
|
||||
@@ -78,9 +86,9 @@ struct SmDevice2DeviceSemaphoreDeviceHandle {
|
||||
__forceinline__ __device__ uint64_t semaphoreGetLocal() const { return *outboundSemaphoreId; }
|
||||
#endif // __CUDACC__
|
||||
|
||||
volatile uint64_t* inboundSemaphoreId;
|
||||
uint64_t* inboundSemaphoreId;
|
||||
uint64_t* outboundSemaphoreId;
|
||||
volatile uint64_t* remoteInboundSemaphoreId;
|
||||
uint64_t* remoteInboundSemaphoreId;
|
||||
uint64_t* expectedInboundSemaphoreId;
|
||||
};
|
||||
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
typedef unsigned long long uint64_t;
|
||||
typedef unsigned long long uintptr_t;
|
||||
typedef unsigned int uint32_t;
|
||||
typedef unsigned short uint16_t;
|
||||
@@ -1,8 +1,6 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include "common.hpp"
|
||||
// other headers
|
||||
#include <mscclpp/semaphore_device.hpp>
|
||||
|
||||
// be careful about using semaphore[my_rank] as it is an invalid semaphore and it is there just for simplicity of
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include "common.hpp"
|
||||
// other headers
|
||||
#include "mscclpp/fifo_device.hpp"
|
||||
#include <mscclpp/fifo_device.hpp>
|
||||
|
||||
extern "C" __global__ void __launch_bounds__(1024, 1) fifo(mscclpp::FifoDeviceHandle fifo) {
|
||||
mscclpp::ProxyTrigger trigger;
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include "common.hpp"
|
||||
// other headers
|
||||
#include <mscclpp/semaphore_device.hpp>
|
||||
|
||||
// be careful about using semaphore[my_rank] as it is an invalid semaphore and it is there just for simplicity of
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include "common.hpp"
|
||||
// other headers
|
||||
#include <mscclpp/fifo_device.hpp>
|
||||
#include <mscclpp/semaphore_device.hpp>
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include "common.hpp"
|
||||
// other headers
|
||||
#include <mscclpp/packet.hpp>
|
||||
#include <mscclpp/proxy_channel_device.hpp>
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include "common.hpp"
|
||||
// other headers
|
||||
#include <mscclpp/sm_channel_device.hpp>
|
||||
|
||||
// be careful about using channels[my_rank] as it is inavlie and it is there just for simplicity of indexing
|
||||
|
||||
11
src/fifo.cc
11
src/fifo.cc
@@ -1,6 +1,7 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include <cuda/atomic>
|
||||
#include <mscclpp/cuda_utils.hpp>
|
||||
#include <mscclpp/fifo.hpp>
|
||||
|
||||
@@ -40,15 +41,17 @@ MSCCLPP_API_CPP Fifo::~Fifo() = default;
|
||||
|
||||
MSCCLPP_API_CPP ProxyTrigger Fifo::poll() {
|
||||
ProxyTrigger trigger;
|
||||
volatile ProxyTrigger* ptr =
|
||||
reinterpret_cast<volatile ProxyTrigger*>(&pimpl->triggers.get()[pimpl->hostTail % pimpl->size]);
|
||||
trigger.fst = ptr->fst;
|
||||
ProxyTrigger* ptr = &pimpl->triggers.get()[pimpl->hostTail % pimpl->size];
|
||||
// we are loading fst first. if fst is non-zero then snd is also valid
|
||||
trigger.fst = cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{ptr->fst}.load(cuda::memory_order_acquire);
|
||||
trigger.snd = ptr->snd;
|
||||
return trigger;
|
||||
}
|
||||
|
||||
MSCCLPP_API_CPP void Fifo::pop() {
|
||||
*(volatile uint64_t*)(&pimpl->triggers.get()[pimpl->hostTail % pimpl->size]) = 0;
|
||||
cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{pimpl->triggers.get()[pimpl->hostTail % pimpl->size].fst}.store(
|
||||
0, cuda::memory_order_release);
|
||||
|
||||
(pimpl->hostTail)++;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
#include <cuda/atomic>
|
||||
#include <mscclpp/semaphore.hpp>
|
||||
|
||||
#include "api.h"
|
||||
@@ -66,7 +67,8 @@ MSCCLPP_API_CPP void Host2HostSemaphore::signal() {
|
||||
}
|
||||
|
||||
MSCCLPP_API_CPP bool Host2HostSemaphore::poll() {
|
||||
bool signaled = (*(volatile uint64_t*)localInboundSemaphore_.get() > (*expectedInboundSemaphore_));
|
||||
bool signaled = (cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*(uint64_t*)localInboundSemaphore_.get()}.load(
|
||||
cuda::memory_order_acquire) > (*expectedInboundSemaphore_));
|
||||
if (signaled) (*expectedInboundSemaphore_) += 1;
|
||||
return signaled;
|
||||
}
|
||||
@@ -74,7 +76,8 @@ MSCCLPP_API_CPP bool Host2HostSemaphore::poll() {
|
||||
MSCCLPP_API_CPP void Host2HostSemaphore::wait(int64_t maxSpinCount) {
|
||||
(*expectedInboundSemaphore_) += 1;
|
||||
int64_t spinCount = 0;
|
||||
while (*(volatile uint64_t*)localInboundSemaphore_.get() < (*expectedInboundSemaphore_)) {
|
||||
while (cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*(uint64_t*)localInboundSemaphore_.get()}.load(
|
||||
cuda::memory_order_acquire) < (*expectedInboundSemaphore_)) {
|
||||
if (spinCount++ == maxSpinCount) {
|
||||
throw Error("Host2HostSemaphore::wait timed out", ErrorCode::Timeout);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user