Fix a FIFO correctness bug (#549)

* Add FIFO test code that reproduces a correctness issue
* Fix the correctness issue by using pinned memory instead of cudaMemcpy

---------

Co-authored-by: Binyang Li <binyli@microsoft.com>
This commit is contained in:
Changho Hwang
2025-07-11 16:53:59 -07:00
committed by GitHub
parent 9b71d524b3
commit 20eca28942
18 changed files with 96 additions and 155 deletions

View File

@@ -205,6 +205,30 @@ steps:
kill $CHILD_PID
workingDirectory: '$(System.DefaultWorkingDirectory)'
- task: Bash@3
name: FifoPerfBenchmark
displayName: FIFO Performance Benchmark
inputs:
targetType: 'inline'
script: |
set -e
HOSTFILE=$(System.DefaultWorkingDirectory)/test/deploy/hostfile_ci
SSH_OPTION="StrictHostKeyChecking=no"
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
: > azureuser@10.0.0.4
tail -f azureuser@10.0.0.4 &
CHILD_PID=$!
parallel-ssh -o . -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}"\
-O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\
set -e; \
export PATH=/usr/local/mpi/bin:\$PATH; \
export LD_LIBRARY_PATH=/root/mscclpp/build:\$LD_LIBRARY_PATH; \
cd /root/mscclpp; \
./build/test/perf/fifo_test"'
kill $CHILD_PID
workingDirectory: '$(System.DefaultWorkingDirectory)'
- task: AzureCLI@2
name: StopVMSS
displayName: Deallocate VMSS

View File

@@ -69,7 +69,6 @@ else()
message(FATAL_ERROR "No compatible GPU found. Set MSCCLPP_USE_CUDA or MSCCLPP_USE_ROCM to ON.")
endif()
endif()
if(MSCCLPP_GPU_ARCHS)
string(STRIP "${MSCCLPP_GPU_ARCHS}" MSCCLPP_GPU_ARCHS)
string(REPLACE " " ";" MSCCLPP_GPU_ARCHS "${MSCCLPP_GPU_ARCHS}")

View File

@@ -67,9 +67,12 @@ MSCCL++ also provides efficient synchronization methods, `signal()`, `flush()`,
```cpp
// Only one thread is needed for this function.
__device__ void barrier() {
// Inform the peer GPU that I have arrived at this point.
// Inform the peer GPU that I have arrived at this point and
// all previous memory operations are done.
channel.signal();
// Flush the previous signal() call, which will wait for completion of signaling.
// One may call flush() to make sure all previous channel operations
// are complete from the local device's perspective.
// flush() is unnecessary in this example.
channel.flush();
// Wait for the peer GPU to call signal().
channel.wait();

View File

@@ -90,12 +90,6 @@ class Env {
/// Default is false.
const bool forceDisableNvls;
/// Env name: `MSCCLPP_FIFO_USE_TAIL_REPLICA`. If set to true, it will replicate the FIFO tail on the GPU memory,
/// which makes the GPU poll on the tail faster, but requires a periodic FIFO flush to update the replica on the GPU.
/// If set to false, the GPU will directly read the tail from the host memory, which is slower but does not require
/// periodic flushes. Default is true.
const bool fifoUseTailReplica;
private:
Env();

View File

@@ -29,10 +29,6 @@ class Fifo {
/// Remove the head trigger.
void pop();
/// Flushes the tail of the FIFO.
/// @param sync If true, waits for the flush to complete before returning.
void flushTail(bool sync = false);
/// Get FIFO size.
/// @return Number of entries in the FIFO.
int size() const;
@@ -41,6 +37,9 @@ class Fifo {
/// @return FifoDeviceHandle for device access.
FifoDeviceHandle deviceHandle() const;
[[deprecated("flushTail() is now no-op and no longer needed. This will be removed in a future release.")]] void
flushTail([[maybe_unused]] bool sync = false) {}
private:
struct Impl;
std::unique_ptr<Impl> pimpl_;

View File

@@ -15,37 +15,17 @@
namespace mscclpp {
#if defined(MSCCLPP_DEVICE_COMPILE)
MSCCLPP_DEVICE_INLINE uint64_t hostLoadRelaxed(uint64_t* ptr) { return atomicLoad(ptr, memoryOrderRelaxed); }
#endif // defined(MSCCLPP_DEVICE_COMPILE)
/// Pair of 64-bit unsigned integers used as a trigger for the proxy.
///
/// This struct is used as a work element in the concurrent FIFO where multiple device threads can push
/// ProxyTrigger elements and a single host proxy thread consumes these work elements.
///
/// Do not use the most significant bit of snd as it is reserved for memory consistency purposes.
/// Used as a work element in the concurrent FIFO.
/// Most significant bit of snd is reserved.
struct alignas(16) ProxyTrigger {
uint64_t fst, snd;
};
/// A concurrent FIFO where multiple device threads (the number of threads should not exceed the FIFO size) can push
/// work elements and a single host proxy thread consumes them.
///
/// The FIFO has a head pointer allocated on the device which starts at 0 and goes up to 2^64-1, which is almost
/// infinity. If `env()->fifoUseTailReplica` is true, there are two copies of the tail, one on the device,
/// FifoDeviceHandle::tailReplica, and another on the host, namely, hostTail.
/// The host always has the "true" tail and occasionally pushes it to the copy on the device.
/// Therefore, most of the time, the device has a stale version. The invariants are: tailReplica <= hostTail <= head.
/// The push() function increments head, hostTail is updated in Fifo::pop(), and it occasionally flushes
/// it to tailReplica via Fifo::flushTail().
///
/// If `env()->fifoUseTailReplica` is false, FifoDeviceHandle::tailReplica points to the original tail on the host.
/// In this case, the tail is always up-to-date and there is no need to flush it to the device.
///
/// Duplicating the tail is a good idea because the FIFO is large enough, and we do not need frequent updates for the
/// tail as there is usually enough space for device threads to push their work into.
///
/// Concurrent FIFO where multiple device threads (the number of threads should not exceed the FIFO size) can push work elements.
/// Head pointer is on device, tail pointer is on host (readable by device).
/// The FIFO's capacity is limited only by MAX_UINT64 — effectively infinite for practical use. Exceeding this limit will
/// overflow the counter and lead to undefined behavior.
struct FifoDeviceHandle {
#if defined(MSCCLPP_DEVICE_COMPILE)
/// Push a trigger to the FIFO.
@@ -59,14 +39,9 @@ struct FifoDeviceHandle {
constexpr uint64_t flipMask = uint64_t{1} << uint64_t{63};
trigger.snd ^= flipMask;
// Only one of two conditions need to be met to proceed. Either the tail has advanced enough or where we need to
// write to is 0. However, the first condition is faster to check since the tail is flushed periodically anyways but
// for the second condition we need to read CPU memory.
// As atomic access is slow, we first check using the bare pointer and then use the atomic load if the
// condition is not met.
if (prevHead >= size + *tailReplica) {
OR_POLL_MAYBE_JAILBREAK((prevHead >= size + atomicLoad(tailReplica, memoryOrderRelaxed)),
(hostLoadRelaxed(&(triggers[prevHead % size].fst)) != 0), maxSpinCount);
// Wait until the trigger is freed by the host.
if (prevHead >= size + *tailCache) {
sync(prevHead - size, maxSpinCount);
}
ProxyTrigger* triggerPtr = &(triggers[prevHead % size]);
@@ -92,10 +67,12 @@ struct FifoDeviceHandle {
/// @param fifoHead FIFO head where the trigger was pushed.
/// @param maxSpinCount Max spin count before assert. Never assert if negative.
MSCCLPP_DEVICE_INLINE void sync(uint64_t fifoHead, [[maybe_unused]] int64_t maxSpinCount = 1000000) {
// Same as push but in this case checking the first condition is probably faster since for tail to be pushed we need
// to wait for cudaMemcpy to be done.
OR_POLL_MAYBE_JAILBREAK((fifoHead >= atomicLoad(tailReplica, memoryOrderRelaxed)),
(hostLoadRelaxed(&(triggers[fifoHead % size].fst)) != 0), maxSpinCount);
uint64_t val;
POLL_MAYBE_JAILBREAK((fifoHead >= (val = atomicLoad(tail, memoryOrderAcquire))), maxSpinCount);
// If multiple threads sync in parallel, this may write a stale value to tailCache.
// This is fine, as the tailCache only serves to avoid unnecessary syncs from push(),
// which works correctly as long as the tailCache is not stale by more than the FIFO length.
*tailCache = val;
}
#endif // defined(MSCCLPP_DEVICE_COMPILE)
@@ -103,8 +80,10 @@ struct FifoDeviceHandle {
ProxyTrigger* triggers;
/// FIFO head on device.
uint64_t* head;
/// FIFO tail replica on device.
uint64_t* tailReplica;
/// FIFO tail on host.
uint64_t* tail;
/// Cached tail value.
uint64_t* tailCache;
/// FIFO size.
int size;
};

View File

@@ -15,8 +15,6 @@ namespace mscclpp {
enum class ProxyHandlerResult {
/// Move to next trigger in FIFO.
Continue,
/// Flush the FIFO and move to next trigger.
FlushFifoTailAndContinue,
/// Stop and exit proxy.
Stop,
};

View File

@@ -13,7 +13,7 @@ void register_fifo(nb::module_& m) {
nb::class_<FifoDeviceHandle>(m, "FifoDeviceHandle")
.def_rw("triggers", &FifoDeviceHandle::triggers)
.def_rw("tail_replica", &FifoDeviceHandle::tailReplica)
.def_rw("tail", &FifoDeviceHandle::tail)
.def_rw("head", &FifoDeviceHandle::head)
.def_rw("size", &FifoDeviceHandle::size)
.def_prop_ro("raw", [](const FifoDeviceHandle& self) -> nb::bytes {
@@ -24,7 +24,6 @@ void register_fifo(nb::module_& m) {
.def(nb::init<int>(), nb::arg("size") = DEFAULT_FIFO_SIZE)
.def("poll", &Fifo::poll)
.def("pop", &Fifo::pop)
.def("flush_tail", &Fifo::flushTail, nb::arg("sync") = false)
.def("size", &Fifo::size)
.def("device_handle", &Fifo::deviceHandle);
}

View File

@@ -51,7 +51,7 @@ class MyProxyService {
semaphores_[nghr]->signal();
connections_[nghr]->flush();
}
return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue;
return mscclpp::ProxyHandlerResult::Continue;
}
void start() { proxy_.start(); }

View File

@@ -66,8 +66,7 @@ Env::Env()
forceNcclFallbackOperation(readEnv<std::string>("MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION", "")),
enableNcclFallback(readEnv<bool>("MSCCLPP_ENABLE_NCCL_FALLBACK", false)),
disableChannelCache(readEnv<bool>("MSCCLPP_DISABLE_CHANNEL_CACHE", false)),
forceDisableNvls(readEnv<bool>("MSCCLPP_FORCE_DISABLE_NVLS", false)),
fifoUseTailReplica(readEnv<bool>("MSCCLPP_FIFO_USE_TAIL_REPLICA", true)) {}
forceDisableNvls(readEnv<bool>("MSCCLPP_FORCE_DISABLE_NVLS", false)) {}
std::shared_ptr<Env> env() {
static std::shared_ptr<Env> globalEnv = std::shared_ptr<Env>(new Env());
@@ -91,7 +90,6 @@ std::shared_ptr<Env> env() {
logEnv("MSCCLPP_ENABLE_NCCL_FALLBACK", globalEnv->enableNcclFallback);
logEnv("MSCCLPP_DISABLE_CHANNEL_CACHE", globalEnv->disableChannelCache);
logEnv("MSCCLPP_FORCE_DISABLE_NVLS", globalEnv->forceDisableNvls);
logEnv("MSCCLPP_FIFO_USE_TAIL_REPLICA", globalEnv->fifoUseTailReplica);
}
return globalEnv;
}

View File

@@ -14,23 +14,16 @@ namespace mscclpp {
struct Fifo::Impl {
detail::UniqueGpuHostPtr<ProxyTrigger> triggers;
detail::UniqueGpuPtr<uint64_t> head;
std::shared_ptr<uint64_t> tailHost;
detail::UniqueGpuPtr<uint64_t> tailReplica;
detail::UniqueGpuHostPtr<uint64_t> tail;
detail::UniqueGpuPtr<uint64_t> tailCache;
const int size;
// for transferring fifo tail
CudaStreamWithFlags stream;
Impl(int size)
: triggers(detail::gpuCallocHostUnique<ProxyTrigger>(size)),
head(detail::gpuCallocUnique<uint64_t>()),
tailHost(env()->fifoUseTailReplica ? std::make_shared<uint64_t>(0) : detail::gpuCallocHostShared<uint64_t>()),
tailReplica(env()->fifoUseTailReplica ? detail::gpuCallocUnique<uint64_t>() : nullptr),
size(size) {
if (env()->fifoUseTailReplica) {
stream.set(cudaStreamNonBlocking);
}
}
tail(detail::gpuCallocHostUnique<uint64_t>()),
tailCache(detail::gpuCallocUnique<uint64_t>()),
size(size) {}
};
MSCCLPP_API_CPP Fifo::Fifo(int size) {
@@ -47,7 +40,7 @@ MSCCLPP_API_CPP Fifo::~Fifo() = default;
MSCCLPP_API_CPP ProxyTrigger Fifo::poll() {
ProxyTrigger trigger;
ProxyTrigger* ptr = &pimpl_->triggers.get()[*(pimpl_->tailHost) % pimpl_->size];
ProxyTrigger* ptr = &pimpl_->triggers.get()[*(pimpl_->tail) % pimpl_->size];
// we are loading fst first. if fst is non-zero then snd is also valid
trigger.fst = atomicLoad(&(ptr->fst), memoryOrderAcquire);
trigger.snd = ptr->snd;
@@ -55,23 +48,9 @@ MSCCLPP_API_CPP ProxyTrigger Fifo::poll() {
}
MSCCLPP_API_CPP void Fifo::pop() {
uint64_t curTail = *(pimpl_->tailHost);
uint64_t curTail = *(pimpl_->tail);
pimpl_->triggers.get()[curTail % pimpl_->size].fst = 0;
*(pimpl_->tailHost) = curTail + 1;
}
MSCCLPP_API_CPP void Fifo::flushTail([[maybe_unused]] bool sync) {
if (!env()->fifoUseTailReplica) {
// Nothing to flush if the tail is not replicated.
return;
}
// Flush the tail to device memory. This is either triggered every ProxyFlushPeriod to make sure that the fifo can
// make progress even if there is no request mscclppSync. However, mscclppSync type is for flush request.
MSCCLPP_CUDATHROW(cudaMemcpyAsync(pimpl_->tailReplica.get(), pimpl_->tailHost.get(), sizeof(uint64_t),
cudaMemcpyHostToDevice, pimpl_->stream));
if (sync) {
MSCCLPP_CUDATHROW(cudaStreamSynchronize(pimpl_->stream));
}
atomicStore(pimpl_->tail.get(), curTail + 1, memoryOrderRelease);
}
MSCCLPP_API_CPP int Fifo::size() const { return pimpl_->size; }
@@ -80,8 +59,8 @@ MSCCLPP_API_CPP FifoDeviceHandle Fifo::deviceHandle() const {
FifoDeviceHandle deviceHandle;
deviceHandle.triggers = pimpl_->triggers.get();
deviceHandle.head = pimpl_->head.get();
// tailReplica refers to the original tail if `fifoUseTailReplica == false`.
deviceHandle.tailReplica = env()->fifoUseTailReplica ? pimpl_->tailReplica.get() : pimpl_->tailHost.get();
deviceHandle.tail = pimpl_->tail.get();
deviceHandle.tailCache = pimpl_->tailCache.get();
deviceHandle.size = pimpl_->size;
return deviceHandle;
}

View File

@@ -69,7 +69,6 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) {
ChannelTrigger* trigger = reinterpret_cast<ChannelTrigger*>(&triggerRaw);
std::shared_ptr<Host2DeviceSemaphore> semaphore = semaphores_[trigger->fields.semaphoreId];
auto result = ProxyHandlerResult::Continue;
int maxWriteQueueSize = semaphore->connection()->getMaxWriteQueueSize();
auto& numRequests = inflightRequests_[semaphore->connection()];
@@ -89,11 +88,10 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) {
if (((trigger->fields.type & TriggerSync) && numRequests > 0) ||
(maxWriteQueueSize != -1 && numRequests > maxWriteQueueSize)) {
semaphore->connection()->flush();
result = ProxyHandlerResult::FlushFifoTailAndContinue;
numRequests = 0;
}
return result;
return ProxyHandlerResult::Continue;
}
MSCCLPP_API_CPP BasePortChannel::DeviceHandle BasePortChannel::deviceHandle() const {

View File

@@ -15,10 +15,6 @@ namespace mscclpp {
constexpr int ProxyStopCheckPeriod = 1000;
// Unless explicitly requested, a flush of the tail to device memory is triggered for every ProxyFlushPeriod.
// As long as the FIFO size is large enough, having a stale tail is not a problem.
constexpr int ProxyFlushPeriod = 4;
struct Proxy::Impl {
ProxyHandler handler;
std::function<void()> threadInit;
@@ -67,10 +63,7 @@ MSCCLPP_API_CPP void Proxy::start() {
std::atomic_bool& running = this->pimpl_->running;
ProxyTrigger trigger;
int flushPeriod = std::min(fifo->size(), ProxyFlushPeriod);
int runCnt = ProxyStopCheckPeriod;
uint64_t flushCnt = 0;
for (;;) {
if (runCnt-- == 0) {
runCnt = ProxyStopCheckPeriod;
@@ -89,19 +82,11 @@ MSCCLPP_API_CPP void Proxy::start() {
// Send completion: reset only the high 64 bits
fifo->pop();
// Flush the tail to device memory. This is either triggered every flushPeriod to make sure that the fifo can make
// progress even if there is no request mscclppSync. However, mscclppSync type is for flush request.
if ((++flushCnt % flushPeriod) == 0 || result == ProxyHandlerResult::FlushFifoTailAndContinue) {
fifo->flushTail();
}
if (result == ProxyHandlerResult::Stop) {
break;
}
}
// make sure the tail is flushed before we shut the proxy
fifo->flushTail(/*sync=*/true);
});
}

View File

@@ -158,7 +158,7 @@ class MyProxyService {
}
flusher++;
}
return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue;
return mscclpp::ProxyHandlerResult::Continue;
}
void start() { proxy_.start(); }

View File

@@ -1,6 +1,6 @@
{"name":"allgather", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":291.52, "busBw":255.08, "size":1073741824, "time":3683.13, "target":"throughput"}
{"name":"allgather", "kernel":2, "ranks":16,"ranksPerNode":8, "algBw":244.61, "busBw":229.33, "size":3221225472, "time":13168.31,"target":"throughput"}
{"name":"allgather", "kernel":3, "ranks":8, "ranksPerNode":8, "algBw":0.1112, "busBw":0.0973, "size":8192, "time":73.63, "target":"latency"}
{"name":"allgather", "kernel":3, "ranks":8, "ranksPerNode":8, "algBw":0.1012, "busBw":0.090, "size":8192, "time":83.41, "target":"latency"}
{"name":"allreduce", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":139.41, "busBw":243.96, "size":1073741824, "time":7701.98, "target":"throughput"}
{"name":"allreduce", "kernel":2, "ranks":8, "ranksPerNode":8, "algBw":1.25, "busBw":2.19, "size":8192, "time":6.51, "target":"latency"}
{"name":"allreduce", "kernel":2, "ranks":16,"ranksPerNode":8, "algBw":0.44, "busBw":0.83, "size":8192, "time":18.42, "target":"latency"}

View File

@@ -569,7 +569,7 @@ mscclpp::ProxyHandlerResult AllGatherProxyService::handleTrigger(mscclpp::ProxyT
conn->flush();
}
}
return mscclpp::ProxyHandlerResult::FlushFifoTailAndContinue;
return mscclpp::ProxyHandlerResult::Continue;
}
class AllGatherTestColl : public BaseTestColl {

View File

@@ -27,9 +27,11 @@ __constant__ mscclpp::FifoDeviceHandle gFifoDeviceHandle;
// Benchmark kernel: each thread pushes numTriggers triggers into the FIFO
// without waiting for the host to consume them (push-only throughput path).
__global__ void kernelFifoPush(size_t numTriggers) {
mscclpp::FifoDeviceHandle& fifo = gFifoDeviceHandle;
// Global thread index; mixed into the payload so the consumer can
// attribute each trigger to the thread that produced it.
int tid = threadIdx.x + blockIdx.x * blockDim.x;
mscclpp::ProxyTrigger trigger;
// Start i at 1: fst must be non-zero, since the host-side consumer
// treats fst == 0 as "slot empty" (see Fifo::poll/pop).
for (size_t i = 1; i <= numTriggers; ++i) {
trigger.fst = i;
// XOR encodes (tid, i) so the consumer can recover tid via snd ^ fst
// and verify per-thread ordering of triggers.
trigger.snd = tid ^ i;
fifo.push(trigger);
}
}
@@ -37,8 +39,10 @@ __global__ void kernelFifoPush(size_t numTriggers) {
// Benchmark kernel: each thread pushes numTriggers triggers and, after every
// push, waits until the host has consumed that trigger (push+sync latency path).
__global__ void kernelFifoPushSync(size_t numTriggers) {
mscclpp::FifoDeviceHandle& fifo = gFifoDeviceHandle;
mscclpp::ProxyTrigger trigger;
// Global thread index; mixed into the payload so the consumer can
// attribute each trigger to the thread that produced it.
int tid = threadIdx.x + blockIdx.x * blockDim.x;
// Start i at 1: fst must be non-zero, since the host-side consumer
// treats fst == 0 as "slot empty" (see Fifo::poll/pop).
for (size_t i = 1; i <= numTriggers; ++i) {
trigger.fst = i;
// XOR encodes (tid, i) so the consumer can recover tid via snd ^ fst.
trigger.snd = tid ^ i;
// push() returns the FIFO head slot used; sync() spins until the host
// tail passes that slot, i.e. the trigger has been consumed.
fifo.sync(fifo.push(trigger));
}
}
@@ -50,8 +54,10 @@ static void setupCuda(int& cudaDevice, int& numaNode) {
}
// Helper function to consume triggers from FIFO
static bool consumeTriggers(std::unique_ptr<mscclpp::Fifo>& hostFifo, int numTriggers, int flushPeriod) {
for (int i = 0; i < numTriggers; ++i) {
static bool consumeTriggers(std::unique_ptr<mscclpp::Fifo>& hostFifo, int numTriggers, int parallel) {
int totalTriggers = numTriggers * parallel;
std::unordered_map<int, int> triggerCounts;
for (int i = 0; i < totalTriggers; ++i) {
mscclpp::ProxyTrigger trigger;
uint64_t spin = 0;
do {
@@ -63,12 +69,10 @@ static bool consumeTriggers(std::unique_ptr<mscclpp::Fifo>& hostFifo, int numTri
// Process trigger (see src/proxy.cc)
trigger.snd ^= ((uint64_t)1 << (uint64_t)63);
trigger.snd = trigger.snd ^ trigger.fst;
assert(triggerCounts[trigger.snd] + 1 == trigger.fst);
triggerCounts[trigger.snd]++;
hostFifo->pop();
// Flush periodically
if (((i + 1) % flushPeriod) == 0) {
hostFifo->flushTail();
}
}
return true;
}
@@ -76,7 +80,7 @@ static bool consumeTriggers(std::unique_ptr<mscclpp::Fifo>& hostFifo, int numTri
// Helper function to run a single kernel variant and return performance metrics
std::tuple<double, double, int, int> runSingleKernelVariant(void (*kernel)(size_t),
std::unique_ptr<mscclpp::Fifo>& hostFifo,
cudaStream_t stream, int flushPeriod, int numParallel) {
cudaStream_t stream, int numParallel) {
// Calculate triggers based on FIFO size
const int numTriggers = std::max(MIN_TRIGGERS, static_cast<int>(hostFifo->size() * TRIGGERS_PER_FIFO_SIZE));
const int warmupTriggers =
@@ -87,10 +91,9 @@ std::tuple<double, double, int, int> runSingleKernelVariant(void (*kernel)(size_
utils::CUDA_CHECK(cudaGetLastError());
// Process warmup triggers (note: total triggers = warmupTriggers * numParallel)
if (!consumeTriggers(hostFifo, warmupTriggers * numParallel, flushPeriod)) {
if (!consumeTriggers(hostFifo, warmupTriggers, numParallel)) {
return {0.0, 0.0, 0, 0}; // Return error values
}
hostFifo->flushTail();
utils::CUDA_CHECK(cudaStreamSynchronize(stream));
// Benchmark
@@ -101,10 +104,9 @@ std::tuple<double, double, int, int> runSingleKernelVariant(void (*kernel)(size_
utils::CUDA_CHECK(cudaGetLastError());
// Process all triggers
if (!consumeTriggers(hostFifo, numTriggers * numParallel, flushPeriod)) {
if (!consumeTriggers(hostFifo, numTriggers, numParallel)) {
return {0.0, 0.0, 0, 0};
}
hostFifo->flushTail(true);
utils::CUDA_CHECK(cudaStreamSynchronize(stream));
timer.stop();
@@ -118,13 +120,13 @@ std::tuple<double, double, int, int> runSingleKernelVariant(void (*kernel)(size_
return {throughput, duration_us, totalTriggers, warmupTriggers * numParallel};
}
void runFifoTestVariant(std::unique_ptr<mscclpp::Fifo>& hostFifo, cudaStream_t stream, int numParallel, int flushPeriod,
void runFifoTestVariant(std::unique_ptr<mscclpp::Fifo>& hostFifo, cudaStream_t stream, int numParallel,
nlohmann::ordered_json& combinedMetrics) {
auto [pushThroughput, pushDuration, numTriggers, warmupTriggers] =
runSingleKernelVariant(kernelFifoPush, hostFifo, stream, flushPeriod, numParallel);
runSingleKernelVariant(kernelFifoPush, hostFifo, stream, numParallel);
auto [syncThroughput, syncDuration, syncNumTriggers, syncWarmupTriggers] =
runSingleKernelVariant(kernelFifoPushSync, hostFifo, stream, flushPeriod, numParallel);
runSingleKernelVariant(kernelFifoPushSync, hostFifo, stream, numParallel);
auto formatThroughput = [](double thru) {
return double(int(thru * 10)) / 10.0; // Round to 1 decimal place
@@ -141,21 +143,17 @@ void runFifoTestVariant(std::unique_ptr<mscclpp::Fifo>& hostFifo, cudaStream_t s
struct FifoTestConfig {
int fifoSize;
int flushPeriod;
std::vector<int> parallelismLevels;
// Constructor with default parallelism levels
FifoTestConfig(int size, int flush, const std::vector<int>& parallel = {1, 2, 4, 8, 16})
: fifoSize(size), flushPeriod(flush), parallelismLevels(parallel) {}
FifoTestConfig(int size, const std::vector<int>& parallel = {1, 2, 4, 8, 16})
: fifoSize(size), parallelismLevels(parallel) {}
};
void runFifoTest(const FifoTestConfig& config, [[maybe_unused]] int rank, [[maybe_unused]] int worldSize,
[[maybe_unused]] int localRank) {
if (config.fifoSize <= 0 || config.flushPeriod <= 0) {
throw std::invalid_argument("FIFO size and flush period must be positive");
}
if (config.flushPeriod > config.fifoSize) {
throw std::invalid_argument("Flush period cannot be larger than FIFO size");
if (config.fifoSize <= 0) {
throw std::invalid_argument("FIFO size must be positive");
}
if (config.parallelismLevels.empty()) {
throw std::invalid_argument("At least one parallelism level must be specified");
@@ -173,8 +171,7 @@ void runFifoTest(const FifoTestConfig& config, [[maybe_unused]] int rank, [[mayb
utils::CUDA_CHECK(cudaStreamCreate(&stream));
// Create test name with parallelism range
std::string testName =
"FifoTest_Size" + std::to_string(config.fifoSize) + "_Flush" + std::to_string(config.flushPeriod) + "_Parallel";
std::string testName = "FifoTest_Size" + std::to_string(config.fifoSize) + "_Parallel";
// Add parallelism range to test name (e.g., "P1-16" or "P1-4-16-64")
if (!config.parallelismLevels.empty()) {
@@ -185,8 +182,7 @@ void runFifoTest(const FifoTestConfig& config, [[maybe_unused]] int rank, [[mayb
// If parallelism levels have non-standard steps, include more detail
if (config.parallelismLevels.size() > 2 &&
(config.parallelismLevels[1] != 2 * config.parallelismLevels[0] || config.parallelismLevels.size() > 3)) {
testName = "FifoTest_Size" + std::to_string(config.fifoSize) + "_Flush" + std::to_string(config.flushPeriod) +
"_ParallelCustom";
testName = "FifoTest_Size" + std::to_string(config.fifoSize) + "_ParallelCustom";
}
}
}
@@ -194,8 +190,7 @@ void runFifoTest(const FifoTestConfig& config, [[maybe_unused]] int rank, [[mayb
// Print test configuration
if (utils::isMainRank()) {
std::stringstream ss;
ss << "Running FIFO test with size=" << config.fifoSize << ", flush_period=" << config.flushPeriod
<< ", parallelism_levels=[";
ss << "Running FIFO test with size=" << config.fifoSize << ", parallelism_levels=[";
for (size_t i = 0; i < config.parallelismLevels.size(); ++i) {
if (i > 0) ss << ",";
ss << config.parallelismLevels[i];
@@ -207,11 +202,10 @@ void runFifoTest(const FifoTestConfig& config, [[maybe_unused]] int rank, [[mayb
nlohmann::ordered_json combinedMetrics;
for (int numParallel : config.parallelismLevels) {
runFifoTestVariant(hostFifo, stream, numParallel, config.flushPeriod, combinedMetrics);
runFifoTestVariant(hostFifo, stream, numParallel, combinedMetrics);
}
std::map<std::string, std::string> testParams;
testParams["flush_period"] = std::to_string(config.flushPeriod);
testParams["fifo_size"] = std::to_string(static_cast<int>(hostFifo->size()));
// Add parallelism levels to test parameters
@@ -230,12 +224,9 @@ void runFifoTest(const FifoTestConfig& config, [[maybe_unused]] int rank, [[mayb
void runAllFifoTests([[maybe_unused]] int rank, [[maybe_unused]] int worldSize, [[maybe_unused]] int localRank) {
// clang-format off
std::vector<FifoTestConfig> configs = {
{1, 1, {1, 8, 64, 512}},
{128, 4, {1, 8, 64, 512}},
{128, 128, {1, 8, 64, 512}},
{512, 4, {1, 8, 64, 512}},
{512, 128, {1, 8, 64, 512}},
{512, 512, {1, 8, 64, 512}},
{1, {1}},
{128, {1, 8, 64, 128}},
{512, {1, 8, 64, 256, 512}},
};
// clang-format on

View File

@@ -50,7 +50,6 @@ TEST(FifoTest, Fifo) {
trigger.snd = 0;
uint64_t spin = 0;
uint64_t flushCnt = 0;
mscclpp::Timer timer(3);
for (uint64_t i = 0; i < ITER; ++i) {
trigger = hostFifo.poll();
@@ -66,12 +65,8 @@ TEST(FifoTest, Fifo) {
ASSERT_TRUE(trigger.fst == (i + 1));
ASSERT_TRUE(trigger.snd == (i + 1));
hostFifo.pop();
if ((++flushCnt % hostFifo.size()) == 0) {
hostFifo.flushTail();
}
spin = 0;
}
hostFifo.flushTail(true);
std::stringstream ss;
ss << "FifoTest.Fifo: " << (float)timer.elapsed() / ITER << " us/iter\n";