From 76718e401522412e1a8f1643b2fade1273935e22 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Tue, 13 Jun 2023 12:38:06 +0800 Subject: [PATCH] Saemal/atomic signal (#96) * code complelete * fix correctness issue * Fix correctness issuee * fix lint * ass compile * Fix build issue * Fix runtime error * Fix correctness issue * Fix crash issue * minor change * Fix memory leak * Fix review comments * Finish allgather * address comments * load element to register first then store to remote address * Finish allGather * init * Build connections * allreduce_test works * Bug fix * Add CUDA flags * Add packet copy (LL) * Lint * Set tmpPtr from constructors * Lint * Multiple blocks per peer * Beautify * Temporal ring reduce * Ring reduce works correctly * Overlapping * Fix overlapping * Improve vector sum * figuring out how to use atomics * working now * wip * Enhance LL AllReduce * Support multiple blocks per peer * Fix a ring reduce bug * Fix a AllReduce kernel 2 bug * Bug fix * wip * Make it compilable * Lint * Lint * Minor changes * Unit test to reproduce memory consistency bugs * Unit test bug fixes * Fixes * Typo * wip * done with core * wip * wip * compiles * only the atomic is failing * almost working * all tests pass now * clang-12 * More jailbreaks * bug fix for common.cu * adding stdint to concurrency.hpp * Out-of-place for AllReduce kernel 2 * Optimize `sync()` * Fix mp_unit_tests * Init TestEngine with TestArgs * Change common.cu into common.cc * Cleanup common.hpp * Lint * fixes to the mscclpp-tests * fixed common.cc --------- Co-authored-by: Binyang Li Co-authored-by: Saeed Maleki --- include/mscclpp/channel.hpp | 186 +++++------- include/mscclpp/concurrency.hpp | 10 +- include/mscclpp/core.hpp | 13 +- include/mscclpp/epoch.hpp | 100 ++++--- src/connection.cc | 43 ++- src/epoch.cc | 41 ++- src/ib.cc | 76 +++-- src/include/connection.hpp | 6 + src/include/ib.hpp | 17 +- test/allgather_test_host_offloading.cu | 1 - test/mp_unit_tests.cu | 330 +++++++++++++++++---- test/mscclpp-test/CMakeLists.txt | 2 +- test/mscclpp-test/allgather_test.cu | 9 +- test/mscclpp-test/allreduce_test.cu | 101 ++++--- test/mscclpp-test/alltoall_test.cu | 7 +- test/mscclpp-test/{common.cu => common.cc} | 167 +++++++---- test/mscclpp-test/common.hpp | 27 +- test/mscclpp-test/sendrecv_test.cu | 42 +-- 18 files changed, 803 insertions(+), 375 deletions(-) rename test/mscclpp-test/{common.cu => common.cc} (70%) diff --git a/include/mscclpp/channel.hpp b/include/mscclpp/channel.hpp index edc1fcfb..491a29bc 100644 --- a/include/mscclpp/channel.hpp +++ b/include/mscclpp/channel.hpp @@ -146,32 +146,10 @@ struct DeviceChannel { put(dst, offset, src, offset, size); } - __forceinline__ __device__ void putDirect(void* dst, void* src, uint64_t dstOffset, uint64_t srcOffset, uint64_t size, - uint32_t threadId, uint32_t numThreads) { - // assume the memory is aligned to 8 bytes - uint64_t* srcAddr = (uint64_t*)((char*)src + srcOffset); - uint64_t* dstAddr = (uint64_t*)((char*)dst + dstOffset); - uint64_t ele; - size_t nElem = size % sizeof(uint64_t) ? (size + sizeof(uint64_t)) / sizeof(uint64_t) : size / sizeof(uint64_t); - for (size_t i = threadId; i < nElem; i += numThreads) { - // load to register first - ele = srcAddr[i]; - dstAddr[i] = ele; - } - } - - __forceinline__ __device__ void signalDirect() { epoch_.signalDirect(); } - - __forceinline__ __device__ void signalPacket() { epoch_.signalPacket(); } - - __forceinline__ __device__ void signal() { - epochIncrement(); - fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, channelId_).value); - } + __forceinline__ __device__ void signal() { fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, channelId_).value); } __forceinline__ __device__ void putWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, uint64_t size) { - epochIncrement(); fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, channelId_).value); } @@ -181,7 +159,6 @@ struct DeviceChannel { __forceinline__ __device__ void putWithSignalAndFlush(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, uint64_t size) { - epochIncrement(); uint64_t curFifoHead = fifo_.push( ChannelTrigger(TriggerData | TriggerFlag | TriggerSync, dst, dstOffset, src, srcOffset, size, channelId_) .value); @@ -199,47 +176,6 @@ struct DeviceChannel { __forceinline__ __device__ void wait() { epoch_.wait(); } - __forceinline__ __device__ void putPacket(void* dst, void* src, uint64_t dstOffset, uint64_t srcOffset, uint64_t size, - uint32_t threadId, uint32_t numThreads, uint32_t flag) { - // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes - uint32_t* srcBase = (uint32_t*)((char*)src + srcOffset); - ChannelPacket* dstBase = (ChannelPacket*)((char*)dst + dstOffset); - size_t nElem = size / sizeof(uint64_t); - for (size_t i = threadId; i < nElem; i += numThreads) { - ChannelPacket* pkt = &dstBase[i]; - pkt->write(srcBase[2 * i], srcBase[2 * i + 1], flag); - } - } - - __forceinline__ __device__ void putPacket(void* dst, void* src, uint64_t dstOffset, uint64_t srcOffset, uint64_t size, - uint32_t threadId, uint32_t numThreads) { - // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes - uint32_t* srcBase = (uint32_t*)((char*)src + srcOffset); - ChannelPacket* dstBase = (ChannelPacket*)((char*)dst + dstOffset); - size_t nElem = size / sizeof(uint64_t); - for (size_t i = threadId; i < nElem; i += numThreads) { - ChannelPacket* pkt = &dstBase[i]; - pkt->write(srcBase[2 * i], srcBase[2 * i + 1]); - } - } - - __forceinline__ __device__ void getPacket(void* dst, void* src, uint64_t dstOffset, uint64_t srcOffset, uint64_t size, - uint32_t threadId, uint32_t numThreads, uint32_t flag) { - // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes - ChannelPacket* srcBase = (ChannelPacket*)((char*)src + srcOffset); - uint2* dstBase = (uint2*)((char*)dst + dstOffset); - size_t nElem = size / sizeof(uint2); - for (size_t i = threadId; i < nElem; i += numThreads) { - ChannelPacket* pkt = &srcBase[i]; - dstBase[i] = pkt->read(flag); - // for future reuse - pkt->clear(); - } - } - - __forceinline__ __device__ void epochIncrement() { epoch_.epochIncrement(); } - - __forceinline__ __device__ uint64_t epochGetLocal() const { return epoch_.epochGetLocal(); } #endif // __CUDACC__ ChannelId channelId_; @@ -282,35 +218,21 @@ struct SimpleDeviceChannel { SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src) : devChan_(devChan), dst_(dst), src_(src) {} - SimpleDeviceChannel(DeviceChannel devChan, void* dstPtr, void* srcPtr, void* tmpPtr = nullptr) - : devChan_(devChan), dstPtr_(dstPtr), srcPtr_(srcPtr), tmpPtr_(tmpPtr) {} - - SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src, void* dstPtr, void* srcPtr, - void* tmpPtr = nullptr) - : devChan_(devChan), dst_(dst), src_(src), dstPtr_(dstPtr), srcPtr_(srcPtr), tmpPtr_(tmpPtr) {} + SimpleDeviceChannel(DeviceChannel devChan) : devChan_(devChan) {} SimpleDeviceChannel(const SimpleDeviceChannel& other) = default; SimpleDeviceChannel& operator=(SimpleDeviceChannel& other) = default; #ifdef __CUDACC__ - __forceinline__ __device__ void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { devChan_.put(dst_, dstOffset, src_, srcOffset, size); } __forceinline__ __device__ void put(uint64_t offset, uint64_t size) { put(offset, offset, size); } - __forceinline__ __device__ void putDirect(uint64_t offset, uint64_t size, uint32_t threadId, uint32_t numThreads) { - devChan_.putDirect(dstPtr_, srcPtr_, offset, offset, size, threadId, numThreads); - } - __forceinline__ __device__ void signal() { devChan_.signal(); } - __forceinline__ __device__ void signalDirect() { devChan_.signalDirect(); } - - __forceinline__ __device__ void signalPacket() { devChan_.signalPacket(); } - __forceinline__ __device__ void putWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { devChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); } @@ -329,37 +251,93 @@ struct SimpleDeviceChannel { __forceinline__ __device__ void wait() { devChan_.wait(); } - __forceinline__ __device__ void putPacket(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, - uint32_t numThreads, uint32_t flag) { - devChan_.putPacket(dstPtr_, srcPtr_, dstOffset, srcOffset, size, threadId, numThreads, flag); - } - - __forceinline__ __device__ void putPacket(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, - uint32_t numThreads) { - devChan_.putPacket(dstPtr_, srcPtr_, dstOffset, srcOffset, size, threadId, numThreads); - } - - __forceinline__ __device__ void getPacket(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, - uint32_t numThreads, uint32_t flag) { - devChan_.getPacket(srcPtr_, tmpPtr_, dstOffset, srcOffset, size, threadId, numThreads, flag); - } - - __forceinline__ __device__ void epochIncrement() { devChan_.epochIncrement(); } - - __forceinline__ __device__ uint64_t epochGetLocal() const { return devChan_.epochGetLocal(); } - #endif // __CUDACC__ DeviceChannel devChan_; MemoryId dst_; MemoryId src_; +}; - // these are used for direct copy - void* dstPtr_; - void* srcPtr_; +// A direct version of DeviceChannel only for CudaIpc +struct DirectChannel { + public: + DirectChannel() = default; + DirectChannel(DirectEpoch::DeviceHandle epoch, RegisteredMemory dst, void* src, void* tmp = nullptr) + : epoch_(epoch), src_(src), tmp_(tmp) { + if (!dst.transports().has(Transport::CudaIpc)) { + throw Error("DirectChannel: dst must be registered with CudaIpc", ErrorCode::InvalidUsage); + } + dst_ = dst.data(); + }; - // extra local buffer for out-of-place copy - void* tmpPtr_; +#ifdef __CUDACC__ + __forceinline__ __device__ void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, + uint32_t numThreads) { + // assume the memory is aligned to 8 bytes + uint64_t* srcAddr = (uint64_t*)((char*)src_ + srcOffset); + uint64_t* dstAddr = (uint64_t*)((char*)dst_ + dstOffset); + uint64_t ele; + size_t nElem = size % sizeof(uint64_t) ? (size + sizeof(uint64_t)) / sizeof(uint64_t) : size / sizeof(uint64_t); + for (size_t i = threadId; i < nElem; i += numThreads) { + // load to register first + ele = srcAddr[i]; + dstAddr[i] = ele; + } + } + + __forceinline__ __device__ void putPacket(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, + uint32_t numThreads, uint32_t flag) { + // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes + uint32_t* srcBase = (uint32_t*)((char*)src_ + srcOffset); + ChannelPacket* dstBase = (ChannelPacket*)((char*)dst_ + dstOffset); + size_t nElem = size / sizeof(uint64_t); + for (size_t i = threadId; i < nElem; i += numThreads) { + ChannelPacket* pkt = &dstBase[i]; + pkt->write(srcBase[2 * i], srcBase[2 * i + 1], flag); + } + } + + __forceinline__ __device__ void putPacket(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, + uint32_t numThreads) { + // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes + uint32_t* srcBase = (uint32_t*)((char*)src_ + srcOffset); + ChannelPacket* dstBase = (ChannelPacket*)((char*)dst_ + dstOffset); + size_t nElem = size / sizeof(uint64_t); + for (size_t i = threadId; i < nElem; i += numThreads) { + ChannelPacket* pkt = &dstBase[i]; + pkt->write(srcBase[2 * i], srcBase[2 * i + 1]); + } + } + + __forceinline__ __device__ void getPacket(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, + uint32_t numThreads, uint32_t flag) { + // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes + ChannelPacket* tmpBase = (ChannelPacket*)((char*)tmp_ + srcOffset); + uint2* srcBase = (uint2*)((char*)src_ + dstOffset); + size_t nElem = size / sizeof(uint2); + for (size_t i = threadId; i < nElem; i += numThreads) { + ChannelPacket* pkt = &tmpBase[i]; + srcBase[i] = pkt->read(flag); + // for future reuse + pkt->clear(); + } + } + + __forceinline__ __device__ void signal() { epoch_.signal(); } + + __forceinline__ __device__ void signalPacket() { epoch_.signalPacket(); } + + __forceinline__ __device__ void epochIncrement() { epoch_.epochIncrement(); } + + __forceinline__ __device__ uint64_t epochGetLocal() const { return epoch_.epochGetLocal(); } + + __forceinline__ __device__ void wait() { epoch_.wait(); } +#endif // __CUDACC__ + private: + DirectEpoch::DeviceHandle epoch_; + void* src_; + void* dst_; + void* tmp_; }; } // namespace channel diff --git a/include/mscclpp/concurrency.hpp b/include/mscclpp/concurrency.hpp index 7d527aea..6035f70a 100644 --- a/include/mscclpp/concurrency.hpp +++ b/include/mscclpp/concurrency.hpp @@ -1,6 +1,10 @@ #ifndef MSCCLPP_CONCURRENCY_HPP_ #define MSCCLPP_CONCURRENCY_HPP_ +#include + +#include + namespace mscclpp { struct DeviceSyncer { public: @@ -21,14 +25,12 @@ struct DeviceSyncer { if (atomicAdd(&count_, 1) == maxOldCnt) { flag_ = 1; } - while (!flag_) { - } + POLL_MAYBE_JAILBREAK(!flag_, 1000000000); } else { if (atomicSub(&count_, 1) == 1) { flag_ = 0; } - while (flag_) { - } + POLL_MAYBE_JAILBREAK(flag_, 1000000000); } isAdd_ = tmpIsAdd; } diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index c7373c21..fae4606e 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -154,10 +154,8 @@ class Communicator; class Connection; class RegisteredMemory { + protected: struct Impl; - // A shared_ptr is used since RegisteredMemory is functionally immutable, although internally some state is populated - // lazily. - std::shared_ptr pimpl; public: RegisteredMemory() = default; @@ -173,7 +171,13 @@ class RegisteredMemory { static RegisteredMemory deserialize(const std::vector& data); friend class Connection; + friend class IBConnection; friend class Communicator; + + private: + // A shared_ptr is used since RegisteredMemory is functionally immutable, although internally some state is populated + // lazily. + std::shared_ptr pimpl; }; class Connection { @@ -181,6 +185,9 @@ class Connection { virtual void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) = 0; + // src must be a CPU memory + virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0; + virtual void flush() = 0; virtual int remoteRank() = 0; diff --git a/include/mscclpp/epoch.hpp b/include/mscclpp/epoch.hpp index b56d8df7..df5f1221 100644 --- a/include/mscclpp/epoch.hpp +++ b/include/mscclpp/epoch.hpp @@ -8,87 +8,107 @@ namespace mscclpp { -struct alignas(16) EpochIds { - uint64_t outbound; - uint64_t inboundReplica; -}; - template