From bb7b85a8105759cd6748f7f833c71a2e03675d07 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Fri, 7 Jul 2023 15:05:46 +0800 Subject: [PATCH] 2-node AllReduce improvements (#118) * Added `get()` interfaces to `SmChannel` * Improved 2-node (8 gpus/node) AllReduce: algbw 139GB/s for 1GB (kernel 3) and 99GB/s for 48MB (kernel 4) * Fixed a FIFO perf bug * Several fixes & validations in mscclpp-test --------- Co-authored-by: Binyang Li Co-authored-by: Saeed Maleki --- .azure-pipelines/integration-test.yml | 14 + .azure-pipelines/ut.yml | 11 + .github/workflows/lint.yml | 4 +- include/mscclpp/fifo.hpp | 11 +- include/mscclpp/sm_channel.hpp | 223 +++++++++- test/deploy/perf_ndmv4.jsonl | 22 +- test/deploy/run_tests.sh | 12 + test/deploy/setup.sh | 5 + test/mp_unit/sm_channel_tests.cu | 98 ++++- test/mscclpp-test/allgather_test.cu | 212 ++++++++-- test/mscclpp-test/allreduce_test.cu | 546 +++++++++++++++++++++++-- test/mscclpp-test/alltoall_test.cu | 10 +- test/mscclpp-test/check_perf_result.py | 41 +- test/mscclpp-test/common.cc | 71 +++- test/mscclpp-test/common.hpp | 14 + test/mscclpp-test/sendrecv_test.cu | 10 +- 16 files changed, 1171 insertions(+), 133 deletions(-) diff --git a/.azure-pipelines/integration-test.yml b/.azure-pipelines/integration-test.yml index b291d5b0..1b29c932 100644 --- a/.azure-pipelines/integration-test.yml +++ b/.azure-pipelines/integration-test.yml @@ -27,6 +27,18 @@ steps: make -j workingDirectory: '$(System.DefaultWorkingDirectory)' +- task: Bash@3 + name: LockGPUClock + displayName: Lock GPU clock frequency + inputs: + targetType: 'inline' + script: | + sudo nvidia-smi -pm 1 + for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do + sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i + done + workingDirectory: '$(System.DefaultWorkingDirectory)' + - task: Bash@3 name: AllGatherTest displayName: Run mscclpp AllGather test @@ -63,6 +75,8 @@ steps: mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -o output.jsonl mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 1 -o output.jsonl mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 2 -o output.jsonl + mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 3 -o output.jsonl + mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 4 -o output.jsonl workingDirectory: '$(System.DefaultWorkingDirectory)' - task: Bash@3 diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index cc110b92..76e31456 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -30,6 +30,17 @@ jobs: make -j workingDirectory: '$(System.DefaultWorkingDirectory)' + - task: Bash@3 + name: LockGPUClock + displayName: Lock GPU clock frequency + inputs: + targetType: 'inline' + script: | + sudo nvidia-smi -pm 1 + for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do + sudo nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i + done + workingDirectory: '$(System.DefaultWorkingDirectory)' - task: Bash@3 name: UnitTests diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 7f566b53..ffd8ea3a 100644 --- a/.github/workflows/lint.yml +++ 
b/.github/workflows/lint.yml @@ -14,7 +14,9 @@ jobs: uses: actions/checkout@v3 - name: Install ClangFormat - run: sudo apt-get install -y clang-format + run: | + sudo apt-get update + sudo apt-get install -y clang-format - name: Run cpplint run: | diff --git a/include/mscclpp/fifo.hpp b/include/mscclpp/fifo.hpp index 744927c0..d9aa7711 100644 --- a/include/mscclpp/fifo.hpp +++ b/include/mscclpp/fifo.hpp @@ -35,11 +35,16 @@ struct DeviceProxyFifo { __forceinline__ __device__ uint64_t push(ProxyTrigger trigger) { uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1); - // only one of these two polls need to be met to proceed. Either the tail has advanced enough or where we need to + // Only one of two conditions needs to be met to proceed. Either the tail has advanced enough or where we need to write to is 0. However, the first condition is faster to check since the tail is flushed periodically anyways but // for the second condition we need to read CPU memory. - OR_POLL_MAYBE_JAILBREAK(curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica), - *(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0, 1000000); + // As volatile access is slow, we first check using the bare pointer and then use the volatile pointer if the + // condition is not met. + if (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *(this->tailReplica)) { + OR_POLL_MAYBE_JAILBREAK(curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica), + *(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0, + 1000000); + } ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]); asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd)); diff --git a/include/mscclpp/sm_channel.hpp b/include/mscclpp/sm_channel.hpp index 7214f415..e9776c6e 100644 --- a/include/mscclpp/sm_channel.hpp +++ b/include/mscclpp/sm_channel.hpp @@ -7,29 +7,223 @@ #include #include #include +#include namespace mscclpp { -// A direct version of DeviceChannel only for CudaIpc +/// Channel for accessing peer memory directly from SM. struct SmChannel { + private: + SmDevice2DeviceSemaphore::DeviceHandle semaphore_; + void* src_; + void* dst_; + void* getPacketBuffer_; + +#ifdef __CUDACC__ + /// Helper for aligned data type access. + /// @tparam T The data type. + template + struct Element { + static constexpr bool is4B = (sizeof(T) == 4); + static constexpr bool is8B = (sizeof(T) == 8); + static constexpr bool is4Bx2 = + (std::is_same::value || std::is_same::value || std::is_same::value); + static constexpr bool is4Bx4 = + (std::is_same::value || std::is_same::value || std::is_same::value); + static constexpr bool is8Bx2 = + (std::is_same::value || std::is_same::value || std::is_same::value); + // Note: we do not support long2 and ulong2 as their size may differ on different platforms. + static constexpr bool isValid = (is4B || is8B || is4Bx2 || is4Bx4 || is8Bx2); + + /// Load an element from DRAM. + /// + /// This is a wrapper of ld.volatile.global.* PTX instruction. Address alignment is not this function's + /// responsibility. + /// + /// @param v The value to be loaded. + /// @param p The address of the value to be loaded.
+ /// + static __forceinline__ __device__ void load(T& v, const T* p) { + if constexpr (is4B) { + asm volatile("ld.volatile.global.u32 %0, [%1];" : "=r"(v) : "l"(p) : "memory"); + } else if constexpr (is8B) { + asm volatile("ld.volatile.global.u64 %0, [%1];" : "=l"(v) : "l"(p) : "memory"); + } else if constexpr (is4Bx2) { + asm volatile("ld.volatile.global.v2.u32 {%0,%1}, [%2];" : "=r"(v.x), "=r"(v.y) : "l"(p) : "memory"); + } else if constexpr (is4Bx4) { + asm volatile("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" + : "=r"(v.w), "=r"(v.x), "=r"(v.y), "=r"(v.z) + : "l"(p) + : "memory"); + } else if constexpr (is8Bx2) { + asm volatile("ld.volatile.global.v2.u64 {%0,%1}, [%2];" : "=l"(v.x), "=l"(v.y) : "l"(p) : "memory"); + } + static_assert(isValid, "Unsupported type T"); + } + + /// Write an element on DRAM. + /// + /// This is a wrapper of st.volatile.global.* PTX instruction. Address alignment is not this function's + /// responsibility. + /// + /// @param p The address of the value to be written. + /// @param v The value to be written. + /// + static __forceinline__ __device__ void store(T* p, const T& v) { + if constexpr (is4B) { + asm volatile("st.volatile.global.u32 [%0], %1;" : : "l"(p), "r"(v) : "memory"); + } else if constexpr (is8B) { + asm volatile("st.volatile.global.u64 [%0], %1;" : : "l"(p), "l"(v) : "memory"); + } else if constexpr (is4Bx2) { + asm volatile("st.volatile.global.v2.u32 [%0], {%1,%2};" : : "l"(p), "r"(v.x), "r"(v.y) : "memory"); + } else if constexpr (is4Bx4) { + asm volatile("st.volatile.global.v4.u32 [%0], {%1,%2,%3,%4};" + : + : "l"(p), "r"(v.w), "r"(v.x), "r"(v.y), "r"(v.z) + : "memory"); + } else if constexpr (is8Bx2) { + asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" : : "l"(p), "l"(v.x), "l"(v.y) : "memory"); + } + static_assert(isValid, "Unsupported type T"); + } + + /// Copy aligned elements from the source memory to the destination memory. + /// + /// This function is intended to be collectively called by multiple threads. Each thread copies a part of elements. + /// + /// @param dst The destination address. + /// @param src The source address. + /// @param numElems The number of elements to be copied. + /// @param threadId The index of the current thread among all threads running this function. This is different from + /// the `threadIdx` in CUDA. + /// @param numThreads The total number of threads that run this function. + /// + static __forceinline__ __device__ void copy(T* dst, T* src, uint64_t numElems, uint32_t threadId, + uint32_t numThreads) { + T reg; + for (size_t i = threadId; i < numElems; i += numThreads) { + // Load to register first. + load(reg, src + i); + store(dst + i, reg); + } + } + }; +#endif // __CUDACC__ + public: SmChannel() = default; SmChannel(SmDevice2DeviceSemaphore::DeviceHandle semaphore, RegisteredMemory dst, void* src, void* getPacketBuffer = nullptr); #ifdef __CUDACC__ - __forceinline__ __device__ void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, - uint32_t numThreads) { - // assume the memory is aligned to 8 bytes - uint64_t* srcAddr = (uint64_t*)((char*)src_ + srcOffset); - uint64_t* dstAddr = (uint64_t*)((char*)dst_ + dstOffset); - uint64_t ele; - size_t nElem = size % sizeof(uint64_t) ? (size + sizeof(uint64_t)) / sizeof(uint64_t) : size / sizeof(uint64_t); - for (size_t i = threadId; i < nElem; i += numThreads) { - // load to register first - ele = srcAddr[i]; - dstAddr[i] = ele; + /// Load a value from the remote memory. 
+ /// @tparam T The type of the value to be loaded. + /// @param index The index of the value to be loaded. The offset in bytes is calculated as index * sizeof(T). + /// @return The value loaded. + template + __forceinline__ __device__ T read(uint64_t index) { + T v; + Element::load(v, (T*)dst_ + index); + return v; + } + + /// Write a value to the remote memory. + /// @tparam T The type of the value to be written. + /// @param index The index of the value to be written. The offset in bytes is calculated as index * sizeof(T). + /// @param v The value to be written. + template + __forceinline__ __device__ void write(uint64_t index, const T& v) { + Element::store((T*)dst_ + index, v); + } + + /// Copy aligned data from the source memory to the destination memory. + /// + /// This function is a wrapper of @ref Element::copy(). Unlike @ref Element::copy(), this function can copy + /// remainder bytes when @p CopyRemainder is true. Still, the number of bytes to be copied must be a multiple of 4. + /// + /// @tparam Alignment The alignment of the source and destination addresses. Should be 4, 8, or a multiple of 16. + /// @tparam CopyRemainder Whether to copy remainder bytes when the number of bytes is not a multiple of @p Alignment. + /// @param dst The destination address. Should be aligned to @p Alignment in the same way as @p src. + /// @param src The source address. Should be aligned to @p Alignment in the same way as @p dst. + /// @param bytes Bytes of the data to be copied. Should be a multiple of @p Alignment. + /// @param threadId The index of the current thread among all threads running this function. This is different from + /// the `threadIdx` in CUDA. + /// @param numThreads The total number of threads that run this function. + /// + template + __forceinline__ __device__ void copy(void* dst, void* src, uint64_t bytes, uint32_t threadId, uint32_t numThreads) { + static_assert(Alignment == 4 || Alignment == 8 || Alignment % 16 == 0, "Unsupported alignment"); + using Type = typename std::conditional::type>::type; + int* dstInt = reinterpret_cast(dst); + int* srcInt = reinterpret_cast(src); + const uintptr_t dstPtr = reinterpret_cast(dst); + const uintptr_t srcPtr = reinterpret_cast(src); + const uint64_t numInt = bytes / sizeof(int); + Type* dstElem = reinterpret_cast((dstPtr + sizeof(Type) - 1) / sizeof(Type) * sizeof(Type)); + Type* srcElem = reinterpret_cast((srcPtr + sizeof(Type) - 1) / sizeof(Type) * sizeof(Type)); + uint64_t nFirstInt = (reinterpret_cast(dstElem) - dstPtr) / sizeof(int); + if (CopyRemainder) { + // Copy the remainder integers at the beginning. + Element::copy(dstInt, srcInt, nFirstInt, threadId, numThreads); } + // Copy elements. + constexpr uint64_t nIntPerElem = sizeof(Type) / sizeof(int); + uint64_t nElem = (numInt - nFirstInt) / nIntPerElem; + Element::copy(dstElem, srcElem, nElem, threadId, numThreads); + if (CopyRemainder && nIntPerElem > 1) { + // Copy the remainder integers at the end. + uint64_t nLastInt = (numInt - nFirstInt) % nIntPerElem; + Element::copy(dstInt + nFirstInt + nElem * nIntPerElem, srcInt + nFirstInt + nElem * nIntPerElem, nLastInt, + threadId, numThreads); + } + } + + /// Copy data from the local memory to the remote memory. + /// + /// This function is intended to be collectively called by multiple threads. Each thread copies a part of data. + /// + /// @tparam Alignment The alignment of the source and destination addresses. Should be 4, 8, or a multiple of 16.
+ /// @tparam CopyRemainder Whether to copy remainder bytes when the number of bytes is not a multiple of @p Alignment. + /// @param dstOffset The offset in bytes of the remote address. Should be a multiple of @p Alignment. + /// @param srcOffset The offset in bytes of the local address. Should be a multiple of @p Alignment. + /// @param bytes Bytes of the data to be copied. Should be a multiple of @p Alignment. + /// @param threadId The index of the current thread among all threads running this function. This is different from + /// the `threadIdx` in CUDA. + /// @param numThreads The total number of threads that run this function. + template + __forceinline__ __device__ void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t bytes, uint32_t threadId, + uint32_t numThreads) { + copy((char*)dst_ + dstOffset, (char*)src_ + srcOffset, bytes, threadId, numThreads); + } + + /// Copy data from the remote memory to the local memory. + /// + /// This function is intended to be collectively called by multiple threads. Each thread copies a part of data. + /// + /// @tparam Alignment The alignment of the source and destination addresses. Should be 4, 8, or a multiple of 16. + /// @tparam CopyRemainder Whether to copy remainder bytes when the number of bytes is not a multiple of @p Alignment. + /// @param dstOffset The offset in bytes of the remote address. Should be a multiple of @p Alignment. + /// @param srcOffset The offset in bytes of the local address. Should be a multiple of @p Alignment. + /// @param bytes Bytes of the data to be copied. Should be a multiple of @p Alignment. + /// @param threadId The index of the current thread among all threads running this function. This is different from + /// the `threadIdx` in CUDA. + /// @param numThreads The total number of threads that run this function. + template + __forceinline__ __device__ void get(uint64_t dstOffset, uint64_t srcOffset, uint64_t bytes, uint32_t threadId, + uint32_t numThreads) { + // Note that `dst` and `src` are swapped for `get()`. 
+ copy((char*)src_ + srcOffset, (char*)dst_ + dstOffset, bytes, threadId, numThreads); + } + + template + __forceinline__ __device__ void put(uint64_t offset, uint64_t size, uint32_t threadId, uint32_t numThreads) { + put(offset, offset, size, threadId, numThreads); + } + + template + __forceinline__ __device__ void get(uint64_t offset, uint64_t size, uint32_t threadId, uint32_t numThreads) { + get(offset, offset, size, threadId, numThreads); } __forceinline__ __device__ void putPackets(uint64_t dstOffset, uint64_t srcOffset, uint64_t size, uint32_t threadId, @@ -52,11 +246,6 @@ struct SmChannel { __forceinline__ __device__ void wait() { semaphore_.wait(); } #endif // __CUDACC__ - private: - SmDevice2DeviceSemaphore::DeviceHandle semaphore_; - void* src_; - void* dst_; - void* getPacketBuffer_; }; } // namespace mscclpp diff --git a/test/deploy/perf_ndmv4.jsonl b/test/deploy/perf_ndmv4.jsonl index 0a78227d..bec80a94 100644 --- a/test/deploy/perf_ndmv4.jsonl +++ b/test/deploy/perf_ndmv4.jsonl @@ -1,7 +1,15 @@ -{"name":"allgather", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":271.83, "busBw":237.85, "size":1073741824, "time":3949.94, "target":"throughput"} -{"name":"allgather", "kernel":2, "ranks":16,"ranksPerNode":8, "algBw":243.86, "busBw":228.62, "size":3221225472, "time":13209.19,"target":"throughput"} -{"name":"allgather", "kernel":3, "ranks":8, "ranksPerNode":8, "algBw":0.1133, "busBw":0.1016, "size":8192, "time":72.88, "target":"latency"} -{"name":"allreduce", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":139.04, "busBw":243.32, "size":1073741824, "time":7722.32, "target":"throughput"} -{"name":"allreduce", "kernel":2, "ranks":8, "ranksPerNode":8, "algBw":1.40, "busBw":2.45, "size":8192, "time":5.86, "target":"latency"} -{"name":"alltoall", "kernel":0, "ranks":16,"ranksPerNode":8, "algBw":46.49, "busBw":43.5928,"size":1073741824, "time":23091.7, "target":"throughput"} -{"name":"alltoall", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":275.54, "busBw":241.10, "size":1073741824, "time":3896.75, "target":"throughput"} +{"name":"allgather", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":291.52, "busBw":255.08, "size":1073741824, "time":3683.13, "target":"throughput"} +{"name":"allgather", "kernel":2, "ranks":16,"ranksPerNode":8, "algBw":244.61, "busBw":229.33, "size":3221225472, "time":13168.31,"target":"throughput"} +{"name":"allgather", "kernel":3, "ranks":8, "ranksPerNode":8, "algBw":0.1112, "busBw":0.0973, "size":8192, "time":73.63, "target":"latency"} +{"name":"allreduce", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":139.41, "busBw":243.96, "size":1073741824, "time":7701.98, "target":"throughput"} +{"name":"allreduce", "kernel":2, "ranks":8, "ranksPerNode":8, "algBw":1.25, "busBw":2.19, "size":8192, "time":6.51, "target":"latency"} +{"name":"allreduce", "kernel":2, "ranks":16,"ranksPerNode":8, "algBw":0.51, "busBw":0.96, "size":8192, "time":15.96, "target":"latency"} +{"name":"allreduce", "kernel":3, "ranks":8, "ranksPerNode":8, "algBw":139.08, "busBw":243.40, "size":1073741824, "time":7719.85, "target":"throughput"} +{"name":"allreduce", "kernel":4, "ranks":8, "ranksPerNode":8, "algBw":106.98, "busBw":187.22, "size":16777216, "time":156.81, "target":"throughput"} +{"name":"allreduce", "kernel":4, "ranks":8, "ranksPerNode":8, "algBw":116.24, "busBw":203.42, "size":33554432, "time":288.65, "target":"throughput"} +{"name":"allreduce", "kernel":4, "ranks":16,"ranksPerNode":8, "algBw":84.55, "busBw":158.53, "size":25165824, "time":297.64, "target":"throughput"} 
+{"name":"allreduce", "kernel":4, "ranks":16,"ranksPerNode":8, "algBw":99.43, "busBw":186.44, "size":50331648, "time":506.16, "target":"throughput"} +{"name":"allreduce", "kernel":4, "ranks":16,"ranksPerNode":8, "algBw":124.60, "busBw":233.64, "size":3221225472, "time":25850.67,"target":"throughput"} +{"name":"allreduce", "kernel":3, "ranks":16,"ranksPerNode":8, "algBw":119.5, "busBw":224.06, "size":3221225472, "time":26955.85,"target":"throughput"} +{"name":"alltoall", "kernel":0, "ranks":16,"ranksPerNode":8, "algBw":46.53, "busBw":43.63, "size":1073741824, "time":23071.5, "target":"throughput"} +{"name":"alltoall", "kernel":1, "ranks":8, "ranksPerNode":8, "algBw":276.17, "busBw":241.65, "size":1073741824, "time":3887.87, "target":"throughput"} diff --git a/test/deploy/run_tests.sh b/test/deploy/run_tests.sh index b0a39927..d8c5bf38 100644 --- a/test/deploy/run_tests.sh +++ b/test/deploy/run_tests.sh @@ -25,6 +25,18 @@ function run_mscclpp_test() -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \ -npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 1 -o /root/mscclpp/output.jsonl + /usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \ + -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \ + -npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1M -f 2 -k 2 -o /root/mscclpp/output.jsonl + + /usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \ + -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \ + -npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 3K -e 3G -f 2 -k 3 -o /root/mscclpp/output.jsonl + + /usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \ + -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \ + -npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 3K -e 3G -f 2 -k 4 -o /root/mscclpp/output.jsonl + echo "==================Run alltoall_test_perf on 2 nodes=========================" /usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \ -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \ diff --git a/test/deploy/setup.sh b/test/deploy/setup.sh index ab5d4ba0..9be3b10c 100644 --- a/test/deploy/setup.sh +++ b/test/deploy/setup.sh @@ -8,5 +8,10 @@ chown root:root /root/.ssh/config chmod 400 /root/mscclpp/sshkey chown root:root /root/mscclpp/sshkey +nvidia-smi -pm 1 +for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do + nvidia-smi -ac $(nvidia-smi --query-gpu=clocks.max.memory,clocks.max.sm --format=csv,noheader,nounits -i $i | sed 's/\ //') -i $i +done + mkdir -p /var/run/sshd /usr/sbin/sshd -p 22345 diff --git a/test/mp_unit/sm_channel_tests.cu b/test/mp_unit/sm_channel_tests.cu index eae9f21c..6f99037f 100644 --- a/test/mp_unit/sm_channel_tests.cu +++ b/test/mp_unit/sm_channel_tests.cu @@ -60,7 +60,7 @@ void SmChannelOneToOneTest::setupMeshConnections(std::vector __constant__ mscclpp::SmChannel gChannelOneToOneTestConstSmChans; -__global__ void kernelDirectPingPong(int* buff, int rank, int nElem, int* ret) { +__global__ void kernelSmPutPingPong(int* buff, int rank, int nElem, int* ret) { mscclpp::SmChannel& smChan = gChannelOneToOneTestConstSmChans; volatile int* sendBuff = (volatile int*)buff; int nTries = 1000; @@ -107,7 
+107,7 @@ __global__ void kernelDirectPingPong(int* buff, int rank, int nElem, int* ret) { } } -TEST_F(SmChannelOneToOneTest, PingPong) { +TEST_F(SmChannelOneToOneTest, PutPingPong) { if (gEnv->rank >= numRanksToUse) return; const int nElem = 4 * 1024 * 1024; @@ -122,31 +122,107 @@ TEST_F(SmChannelOneToOneTest, PingPong) { std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); - kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, ret.get()); + kernelSmPutPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); *ret = 0; - kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); + kernelSmPutPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); *ret = 0; - kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); + kernelSmPutPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); *ret = 0; - kernelDirectPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); + kernelSmPutPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); } -__global__ void kernelDirectPacketPingPong(int* buff, int rank, int nElem, int* ret) { +__global__ void kernelSmGetPingPong(int* buff, int rank, int nElem, int* ret) { + if (rank > 1) return; + + mscclpp::SmChannel& smChan = gChannelOneToOneTestConstSmChans; + volatile int* buffPtr = (volatile int*)buff; + int offset0 = (rank == 0) ? 0 : 10000000; + int offset1 = (rank == 0) ? 10000000 : 0; + int nTries = 1000; + + for (int i = 0; i < nTries; i++) { + // rank=0: 0, 1, 0, 1, ... + // rank=1: 1, 0, 1, 0, ... 
+ if ((rank ^ (i & 1)) == 0) { + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + buffPtr[j] = offset0 + i + j; + } + if (threadIdx.x == 0) { + smChan.signal(); + } + } else { + if (threadIdx.x == 0) { + smChan.wait(); + } + __syncthreads(); + smChan.get(0, 0, nElem * sizeof(int), threadIdx.x, blockDim.x); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (buffPtr[j] != offset1 + i + j) { + // printf("rank %d ERROR: buff[%d] = %d, expected %d\n", rank, j, buffPtr[j], offset1 + i + j); + *ret = 1; + break; + } + } + } + } +} + +TEST_F(SmChannelOneToOneTest, GetPingPong) { + if (gEnv->rank >= numRanksToUse) return; + + const int nElem = 4 * 1024 * 1024; + + std::vector smChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + setupMeshConnections(smChannels, buff.get(), nElem * sizeof(int)); + + ASSERT_EQ(smChannels.size(), 1); + MSCCLPP_CUDATHROW( + cudaMemcpyToSymbol(gChannelOneToOneTestConstSmChans, smChannels.data(), sizeof(mscclpp::SmChannel))); + + std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); + + kernelSmGetPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelSmGetPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelSmGetPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + *ret = 0; + + kernelSmGetPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); +} + +__global__ void kernelSmPacketPingPong(int* buff, int rank, int nElem, int* ret) { if (rank > 1) return; mscclpp::SmChannel& smChan = gChannelOneToOneTestConstSmChans; @@ -208,25 +284,25 @@ TEST_F(SmChannelOneToOneTest, PacketPingPong) { std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); // The least nelem is 2 for packet ping pong - kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 2, ret.get()); + kernelSmPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 2, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); *ret = 0; - kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); + kernelSmPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); *ret = 0; - kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); + kernelSmPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 1024 * 1024, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); *ret = 0; - kernelDirectPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); + kernelSmPacketPingPong<<<1, 1024>>>(buff.get(), gEnv->rank, 4 * 1024 * 1024, ret.get()); MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); EXPECT_EQ(*ret, 0); diff --git a/test/mscclpp-test/allgather_test.cu b/test/mscclpp-test/allgather_test.cu index 824b17e2..5411c5ac 100644 --- a/test/mscclpp-test/allgather_test.cu +++ b/test/mscclpp-test/allgather_test.cu @@ -4,12 +4,11 @@ #include #include +#include #include #include "common.hpp" -#define ALIGN 4 - namespace { auto isUsingHostOffload = [](int kernelNum) { return kernelNum == 3; }; constexpr uint64_t MAGIC = 0xdeadbeef; @@ -18,6 +17,8 @@ constexpr uint64_t MAGIC = 0xdeadbeef; __constant__ 
mscclpp::SimpleProxyChannel constDevChans[16]; __constant__ mscclpp::ProxyChannel constRawDevChan[16]; +__constant__ mscclpp::SmChannel constSmChans[8]; + __device__ void allgather0(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int remoteRank, size_t nelemsPerGPU) { // this allgather is really simple and implemented as an alltoall @@ -34,35 +35,83 @@ __device__ void allgather0(mscclpp::SimpleProxyChannel devChan, int rank, int wo if (threadIdx.x % 32 == 0) devChan.wait(); } -__device__ void localAllGather(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int nranksPerNode, +__device__ void localAllGather(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int nRanksPerNode, int remoteRank, uint64_t offset, uint64_t size, bool flushAfterSignal = true) { // this allgather algorithm works as follows: - // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode - // and waits for data from GPU rank (i-1) % nranksPerNode - // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode + // Step 1: GPU rank i sends data to GPU rank (i+1) % nRanksPerNode + // and waits for data from GPU rank (i-1) % nRanksPerNode + // Step 2: GPU rank i sends data to GPU rank (i+2) % nRanksPerNode // ... // This order is much better for DMA engine for NVLinks - for (int i = 1; i < nranksPerNode; i++) { - if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) { - // put your data to GPU (rank+i) % nranksPerNode and signal in one call + for (int i = 1; i < nRanksPerNode; i++) { + if ((remoteRank % nRanksPerNode) == ((rank + i) % nRanksPerNode)) { + // put your data to GPU (rank+i) % nRanksPerNode and signal in one call if (flushAfterSignal && (threadIdx.x % 32) == 0) devChan.putWithSignalAndFlush(offset, size); if (!flushAfterSignal && (threadIdx.x % 32) == 0) devChan.putWithSignal(offset, size); } - // wait for the data from GPU (rank-i) % nranksPerNode to arrive - if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) { + // wait for the data from GPU (rank-i) % nRanksPerNode to arrive + if ((remoteRank % nRanksPerNode) == ((rank - i + nRanksPerNode) % nRanksPerNode)) { if ((threadIdx.x % 32) == 0) devChan.wait(); } - asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory"); + asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nRanksPerNode - 1) * 32) : "memory"); } } -__device__ void allgather1(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int nranksPerNode, +__device__ mscclpp::DeviceSyncer deviceSyncer; + +// This kernel is the most performant when the number of blocks is a multiple of (nRanksPerNode - 1). +__device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunkIndex, uint64_t offsetInRankChunk, + uint64_t rankChunkSize, uint64_t size, size_t nBlocks) { + if (nRanksPerNode == 1) return; + if (blockIdx.x >= nBlocks) return; + const size_t nPeer = nRanksPerNode - 1; + const size_t peerIdx = blockIdx.x % nPeer; + const size_t nBlockForThisPeer = nBlocks / nPeer + (nBlocks % nPeer > peerIdx ? 1 : 0); + const size_t peerLocalBlockIdx = blockIdx.x / nPeer; + const size_t rankLocalIndex = rank % nRanksPerNode; + const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1); + + // Split the data into chunks for aligned data access. Ignore the remainder here and let the last block handle it. 
+ constexpr size_t chunkBytes = 128; // heuristic value + const size_t nChunk = size / chunkBytes; + const size_t nMinChunkPerBlock = nChunk / nBlockForThisPeer; + const size_t nRemainderChunk = nChunk % nBlockForThisPeer; + + // Distribute chunks to blocks + size_t nChunkForThisBlock; + size_t offsetForThisBlock; + if (peerLocalBlockIdx < nRemainderChunk) { + nChunkForThisBlock = nMinChunkPerBlock + 1; + offsetForThisBlock = (nMinChunkPerBlock + 1) * peerLocalBlockIdx; + } else { + nChunkForThisBlock = nMinChunkPerBlock; + offsetForThisBlock = + (nMinChunkPerBlock + 1) * nRemainderChunk + (peerLocalBlockIdx - nRemainderChunk) * nMinChunkPerBlock; + } + offsetForThisBlock *= chunkBytes; + + // Calculate the size of the data for this block + size_t sizeForThisBlock = nChunkForThisBlock * chunkBytes; + const size_t lastChunkSize = size - nChunk * chunkBytes; + if (lastChunkSize > 0 && peerLocalBlockIdx == nBlockForThisPeer - 1) { + sizeForThisBlock += lastChunkSize; + } + if (threadIdx.x == 0 && peerLocalBlockIdx == 0) { + constSmChans[peerIdx].signal(); + constSmChans[peerIdx].wait(); + } + deviceSyncer.sync(nBlocks); + size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; + constSmChans[peerIdx].get(offset + offsetForThisBlock, sizeForThisBlock, threadIdx.x, blockDim.x); +} + +__device__ void allgather1(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int nRanksPerNode, int remoteRank, size_t nelemsPerGPU) { - localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + localAllGather(devChan, rank, worldSize, nRanksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); } -__device__ void allgather2(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int nranksPerNode, +__device__ void allgather2(mscclpp::SimpleProxyChannel devChan, int rank, int worldSize, int nRanksPerNode, int remoteRank, size_t nelemsPerGPU) { // this allgather is a pipelined and hierarchical one and only works for two nodes // it is implemented as follows: @@ -78,12 +127,12 @@ __device__ void allgather2(mscclpp::SimpleProxyChannel devChan, int rank, int wo // Step 1 // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), + if (remoteRank / nRanksPerNode == rank / nRanksPerNode) { + localAllGather(devChan, rank, worldSize, nRanksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int), false); } // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { + if (remoteRank % nRanksPerNode == rank % nRanksPerNode) { // opposite side if ((threadIdx.x % 32) == 0) devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), @@ -93,23 +142,23 @@ __device__ void allgather2(mscclpp::SimpleProxyChannel devChan, int rank, int wo // sync here to make sure IB flush dose not block the CUDA IPC traffic __syncthreads(); - // since all CUDA IPC share the same CUDA stream, only need to flush one of devChans - if ((remoteRank % nranksPerNode == rank % nranksPerNode) || - (remoteRank / nranksPerNode == rank / nranksPerNode && rank % nranksPerNode == 0)) { + // need to flush ib channel here to avoid cq overflow. since we won't change send buffer after send, we don't need + // to flush for IPC channel.
+ if (remoteRank % nRanksPerNode == rank % nRanksPerNode) { if ((threadIdx.x % 32) == 0) devChan.flush(); } __syncthreads(); // Step 2 // local allgather - int otherNghr = (rank + nranksPerNode) % worldSize; - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), + int otherNghr = (rank + nRanksPerNode) % worldSize; + if (remoteRank / nRanksPerNode == rank / nRanksPerNode) { + localAllGather(devChan, rank, worldSize, nRanksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int), (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int), false); } // cross-node exchange - if (remoteRank % nranksPerNode == rank % nranksPerNode) { + if (remoteRank % nRanksPerNode == rank % nRanksPerNode) { // opposite side if ((threadIdx.x % 32) == 0) devChan.putWithSignal((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), @@ -118,16 +167,15 @@ __device__ void allgather2(mscclpp::SimpleProxyChannel devChan, int rank, int wo } __syncthreads(); - if ((remoteRank % nranksPerNode == rank % nranksPerNode) || - (remoteRank / nranksPerNode == rank / nranksPerNode && rank % nranksPerNode == 0)) { + if (remoteRank % nRanksPerNode == rank % nRanksPerNode) { if ((threadIdx.x % 32) == 0) devChan.flush(); } __syncthreads(); // Step 3 // local allgather - if (remoteRank / nranksPerNode == rank / nranksPerNode) { - localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, + if (remoteRank / nRanksPerNode == rank / nRanksPerNode) { + localAllGather(devChan, rank, worldSize, nRanksPerNode, remoteRank, (otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), nelemsPerGPU / pipelineSize * sizeof(int)); } @@ -149,22 +197,82 @@ __device__ void allgather3(mscclpp::ProxyChannel devChan, int rank, int worldSiz } } -__global__ void kernel(int rank, int worldSize, int nranksPerNode, size_t nelemsPerGPU, int kernel) { +__device__ void allgather4(int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU) { + // this allgather is a pipelined and hierarchical one and only works for two nodes + // it is implemented as follows: + // Step 1: each node does a local allgather and concurrently, + // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with + // its cross-node neighbor (local GPU i on the other node) via IB + // Step 2: each node does a local allgather again with the data just received from its + // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with + // its cross-node neighbor + // Step 3: each node does a local allgather for the last time with the rest of the data + + int pipelineSize = 3; + int peerRank = (rank + nRanksPerNode) % worldSize; + int peerNodeId = peerRank / nRanksPerNode; + int peer = (peerRank < rank) ? 
peerRank : peerRank - 1; + mscclpp::SimpleProxyChannel& devChan = constDevChans[peer]; + const size_t nBlocksForLocalAllGather = gridDim.x; + const size_t rankChunkSize = nelemsPerGPU * sizeof(int); + const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode; + const int startRankIndexInPeerNode = (peerRank / nRanksPerNode) * nRanksPerNode; + + if (peerNodeId == rank / nRanksPerNode) { + localAllGatherSm(rank, nRanksPerNode, 0, 0, rankChunkSize, rankChunkSize, nBlocksForLocalAllGather); + return; + } + + constexpr size_t alignment = 128; + size_t step1Bytes = (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int); + step1Bytes = step1Bytes / alignment * alignment; + const size_t step2Bytes = nelemsPerGPU * sizeof(int) - step1Bytes; + + // Step 1 + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), step1Bytes); + } + localAllGatherSm(rank, nRanksPerNode, startRankIndexInLocalNode, 0, rankChunkSize, rankChunkSize, + nBlocksForLocalAllGather); + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.wait(); + devChan.flush(); + } + deviceSyncer.sync(nBlocksForLocalAllGather); + // Step 2 + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int) + step1Bytes, step2Bytes); + } + localAllGatherSm(rank, nRanksPerNode, startRankIndexInPeerNode, 0, rankChunkSize, step1Bytes, + nBlocksForLocalAllGather); + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.wait(); + devChan.flush(); + } + deviceSyncer.sync(nBlocksForLocalAllGather); + // Step 3 + localAllGatherSm(rank, nRanksPerNode, startRankIndexInPeerNode, step1Bytes, rankChunkSize, step2Bytes, + nBlocksForLocalAllGather); +} + +__global__ void kernel(int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU, int kernel) { // find the mapping between remoteRank and devConns int warpId = threadIdx.x / 32; int remoteRank = (warpId < rank) ? 
warpId : warpId + 1; // Each warp is responsible for one of the remote ranks mscclpp::SimpleProxyChannel devChan = constDevChans[warpId]; - if (kernel == 0) + if (kernel == 0) { allgather0(devChan, rank, worldSize, remoteRank, nelemsPerGPU); - else if (kernel == 1) - allgather1(devChan, rank, worldSize, nranksPerNode, remoteRank, nelemsPerGPU); - else if (kernel == 2) - allgather2(devChan, rank, worldSize, nranksPerNode, remoteRank, nelemsPerGPU); - else if (kernel == 3) { + } else if (kernel == 1) { + allgather1(devChan, rank, worldSize, nRanksPerNode, remoteRank, nelemsPerGPU); + } else if (kernel == 2) { + allgather2(devChan, rank, worldSize, nRanksPerNode, remoteRank, nelemsPerGPU); + } else if (kernel == 3) { mscclpp::ProxyChannel devChan = constRawDevChan[warpId]; allgather3(devChan, rank, worldSize); + } else if (kernel == 4) { + allgather4(rank, worldSize, nRanksPerNode, nelemsPerGPU); } } @@ -255,6 +363,7 @@ class AllGatherTestColl : public BaseTestColl { void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; void setupCollTest(size_t size) override; + std::vector getKernelRestrictions() override; }; void AllGatherTestColl::runColl(const TestArgs& args, cudaStream_t stream) { @@ -262,7 +371,16 @@ void AllGatherTestColl::runColl(const TestArgs& args, cudaStream_t stream) { const int rank = args.rank; const int nRanksPerNode = args.nRanksPerNode; const int kernelNum = args.kernelNum; - kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(rank, worldSize, nRanksPerNode, paramCount_, kernelNum); + int nBlocks; + int nThreads; + if (kernelNum == 4) { + nBlocks = 21; + nThreads = 1024; + } else { + nBlocks = 1; + nThreads = 32 * (worldSize - 1); + } + kernel<<>>(rank, worldSize, nRanksPerNode, paramCount_, kernelNum); } void AllGatherTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { @@ -295,7 +413,7 @@ void AllGatherTestColl::getBw(const double deltaSec, double& algBw, double& busB void AllGatherTestColl::setupCollTest(size_t size) { size_t count = size / typeSize_; - size_t base = (count / (ALIGN * worldSize_)) * ALIGN; + size_t base = (count / worldSize_); sendCount_ = base; recvCount_ = base * worldSize_; paramCount_ = base; @@ -304,6 +422,17 @@ void AllGatherTestColl::setupCollTest(size_t size) { auto service = std::dynamic_pointer_cast(chanService_); service->setSendBytes(sendCount_ * typeSize_); } + mscclpp::DeviceSyncer syncer = {}; + CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); +} + +std::vector AllGatherTestColl::getKernelRestrictions() { + return {// {kernelNum, kernelName, compatibleWithMultiNodes, countDivisorForMultiNodes, alignedBytes} + {0, "allgather0", true, 1, 4 * worldSize_}, + {1, "allgather1", false, 1, 4 * worldSize_}, + {2, "allgather2", true, 3, 4 * worldSize_}, + {3, "allgather3", true, 1, 4 * worldSize_}, + {4, "allgather4", true, 3, 16 * worldSize_ /*use ulong2 to transfer data*/}}; } class AllGatherTestEngine : public BaseTestEngine { @@ -340,6 +469,11 @@ void AllGatherTestEngine::setupConnections() { assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::SimpleProxyChannel)); CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(), sizeof(mscclpp::SimpleProxyChannel) * devChannels.size())); + + std::vector smChannels; + setupMeshConnections(smChannels, sendBuff_.get(), args_.maxBytes); + assert(smChannels.size() < sizeof(constSmChans) / 
sizeof(mscclpp::SmChannel)); + CUDATHROW(cudaMemcpyToSymbol(constSmChans, smChannels.data(), sizeof(mscclpp::SmChannel) * smChannels.size())); } else { auto service = std::dynamic_pointer_cast(chanService_); setupMeshConnections(devChannels, sendBuff_.get(), args_.maxBytes, nullptr, 0, @@ -347,7 +481,7 @@ void AllGatherTestEngine::setupConnections() { std::vector>& remoteMemories, const mscclpp::RegisteredMemory& localMemory) { std::vector semaphoreIds; - for (int i = 0; i < conns.size(); ++i) { + for (size_t i = 0; i < conns.size(); ++i) { service->addSemaphore(conns[i]); service->addRemoteMemory(remoteMemories[i].get()); } diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index e4710d6b..10f4d732 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -8,13 +8,14 @@ #include "common.hpp" -#define ALIGN 4 #define BLOCKS_PER_PEER 1 __constant__ mscclpp::SimpleProxyChannel constDevFstRoundChans[16]; __constant__ mscclpp::SimpleProxyChannel constDevSndRoundChans[16]; -__constant__ mscclpp::SmChannel constSmChans[8]; +__constant__ mscclpp::SmChannel constSmInPlaceChans[8]; +__constant__ mscclpp::SmChannel constSmOutOfPlaceChans[8]; +__constant__ mscclpp::SmChannel constSmOutOfPlaceGetChans[8]; __device__ uint64_t globalFlag; // TODO(chhwang): need an interface for this. @@ -40,12 +41,12 @@ __host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t ch return Chunk{offset, chunkIdx < remainder ? largeChunkSize : smallChunkSize}; } -__forceinline__ __device__ void vectorSum(int* dst, int* src, size_t nElem) { +__forceinline__ __device__ void vectorSum(int* dst, int* src, size_t nElem, int blockId, int nBlocks) { size_t nInt4 = nElem / 4; size_t nLastInts = nElem % 4; int4* dst4 = (int4*)dst; int4* src4 = (int4*)src; - for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < nInt4; i += blockDim.x * gridDim.x) { + for (int i = threadIdx.x + blockId * blockDim.x; i < nInt4; i += blockDim.x * nBlocks) { dst4[i].w += src4[i].w; dst4[i].x += src4[i].x; dst4[i].y += src4[i].y; @@ -54,12 +55,16 @@ __forceinline__ __device__ void vectorSum(int* dst, int* src, size_t nElem) { if (nLastInts > 0) { int* dstLast = dst + nInt4 * 4; int* srcLast = src + nInt4 * 4; - for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < nLastInts; i += blockDim.x * gridDim.x) { + for (int i = threadIdx.x + blockId * blockDim.x; i < nLastInts; i += blockDim.x * nBlocks) { dstLast[i] += srcLast[i]; } } } +__forceinline__ __device__ void vectorSum(int* dst, int* src, size_t nElem) { + vectorSum(dst, src, nElem, blockIdx.x, gridDim.x); +} + __device__ void vectorSumSingleBlock(int* dst, int* src, size_t nElem) { for (int i = threadIdx.x; i < nElem; i += blockDim.x) { dst[i] += src[i]; @@ -67,6 +72,442 @@ __device__ void vectorSumSingleBlock(int* dst, int* src, size_t nElem) { } __device__ mscclpp::DeviceSyncer deviceSyncer; +__device__ mscclpp::DeviceSyncer reduceScatterDeviceSyncer; +__device__ mscclpp::DeviceSyncer ibDeviceSyncer; + +__device__ void localReduceScatter(int* buff, int* scratch, int rank, int nRanksPerNode, int startChunkIndex, + size_t offsetInChunk, size_t chunkSize, size_t nelems) { + if (nRanksPerNode == 1) { + return; + } + int isComm = (threadIdx.x == 0) && (blockIdx.x == 0); + int startRankInNode = (rank / nRanksPerNode) * nRanksPerNode; + int rankIndexInNode = rank % nRanksPerNode; + + for (int i = 1; i < nRanksPerNode; ++i) { + int remoteSendToRank = (rank + i) % nRanksPerNode + startRankInNode; + int 
remoteRecvFromRank = (rank + nRanksPerNode - i) % nRanksPerNode + startRankInNode; + int peerSendId = (remoteSendToRank < rank) ? remoteSendToRank : remoteSendToRank - 1; + int peerRecvId = (remoteRecvFromRank < rank) ? remoteRecvFromRank : remoteRecvFromRank - 1; + + mscclpp::SimpleProxyChannel& devFstSendChan = constDevFstRoundChans[peerSendId]; + mscclpp::SimpleProxyChannel& devFstRecvChan = constDevFstRoundChans[peerRecvId]; + size_t srcOffset = + (((rankIndexInNode + i) % nRanksPerNode + startChunkIndex) * chunkSize + offsetInChunk) * sizeof(int); + size_t dstOffset = rank * chunkSize * sizeof(int); + + if (i == 1) { + if (isComm) { + devFstSendChan.putWithSignal(dstOffset, srcOffset, nelems * sizeof(int)); + } + } else { + int pre = i - 1; + int preRemoteRecvFromRank = (rank + nRanksPerNode - pre) % nRanksPerNode + startRankInNode; + int prePeerRecvId = (preRemoteRecvFromRank < rank) ? preRemoteRecvFromRank : preRemoteRecvFromRank - 1; + + // overlap communication and computation + mscclpp::SimpleProxyChannel& preDevFstRecvChan = constDevFstRoundChans[prePeerRecvId]; + if (isComm) { + preDevFstRecvChan.wait(); + devFstSendChan.putWithSignal(dstOffset, srcOffset, nelems * sizeof(int)); + } + + deviceSyncer.sync(gridDim.x); + size_t offset = ((startChunkIndex + rankIndexInNode) * chunkSize + offsetInChunk) * sizeof(int); + size_t scratchOffset = preRemoteRecvFromRank * chunkSize * sizeof(int); + int* dst = (int*)((char*)buff + offset); + int* src = (int*)((char*)scratch + scratchOffset); + vectorSum(dst, src, nelems); + } + // for last iteration, wait for the last send + if (i == nRanksPerNode - 1) { + if (isComm) { + devFstRecvChan.wait(); + } + deviceSyncer.sync(gridDim.x); + size_t offset = ((startChunkIndex + rankIndexInNode) * chunkSize + offsetInChunk) * sizeof(int); + size_t scratchOffset = remoteRecvFromRank * chunkSize * sizeof(int); + int* dst = (int*)((char*)buff + offset); + int* src = (int*)((char*)scratch + scratchOffset); + vectorSum(dst, src, nelems); + } + } +} + +__device__ void reduceScatter(int* buff, int* scratch, int rank, int nRanksPerNode, int worldSize, + size_t nelems // must be divisible by 3 +) { + // this reduce-scatter algorithm works as follows: + // Step 1: each node does a local reduce-scatter on peer node data chunks with 1/pipeline portion of chunk data. For + // example, 2 nodes and each node has 2 ranks. rank 0 and rank 1 perform reduce-scatter on chunk 2 and chunk 3, with + // 1/pipeline portion of the data. + // Step 2: each node does a local reduce-scatter on peers data chunks with (pipeline-1)/pipeline portion of chunk + // data. Meanwhile, exchange the reduced data of the previous step with its cross-node neighbor (same local rank + // number on the other node) via IB. Then performs a reduce operation. + // Step 3: each node does a local reduce-scatter on local ranks, meanwhile exchange the reduced data of the previous + // step with its cross-node neighbor (same local rank number on the other node) via IB. Then performs a reduce + // operation. + int pipelineSize = 3; + const size_t chunkSize = nelems / worldSize; + int peerRank = (rank + nRanksPerNode) % worldSize; + int peerNodeId = peerRank / nRanksPerNode; + int isComm = (threadIdx.x == 0) && (blockIdx.x == 0); + int peer = (peerRank < rank) ? 
peerRank : peerRank - 1; + mscclpp::SimpleProxyChannel& devChan = constDevFstRoundChans[peer]; + if (peerNodeId == rank / nRanksPerNode) { + localReduceScatter(buff, scratch, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize); + return; + } + + // step 1: local reduce + int startChunkIndex = peerNodeId * nRanksPerNode; + localReduceScatter(buff, scratch, rank, nRanksPerNode, startChunkIndex, 0, chunkSize, chunkSize / pipelineSize); + deviceSyncer.sync(gridDim.x); + + // step 2: local reduce and exchange data with neighbor + if (isComm) { + size_t offset = (peerRank * chunkSize) * sizeof(int); + // opposite side + devChan.putWithSignal(offset, (chunkSize / pipelineSize * sizeof(int))); + } + localReduceScatter(buff, scratch, rank, nRanksPerNode, startChunkIndex, chunkSize / pipelineSize, chunkSize, + 2 * chunkSize / pipelineSize); + if (isComm) { + devChan.wait(); + } + deviceSyncer.sync(gridDim.x); + // reduce data received from peer to related rank + size_t offset = rank * chunkSize * sizeof(int); + int* dst = (int*)((char*)buff + offset); + int* src = (int*)((char*)scratch + offset); + vectorSum(dst, src, chunkSize / pipelineSize); + if (isComm) { + devChan.flush(); + } + deviceSyncer.sync(gridDim.x); + + // step 3: local reduce and exchange data with neighbor + startChunkIndex = (rank / nRanksPerNode) * nRanksPerNode; + if (isComm) { + size_t offset = (peerRank * chunkSize + chunkSize / pipelineSize) * sizeof(int); + devChan.putWithSignal(offset, (pipelineSize - 1) * chunkSize / pipelineSize * sizeof(int)); + } + localReduceScatter(buff, scratch, rank, nRanksPerNode, startChunkIndex, 0, chunkSize, chunkSize); + if (isComm) { + devChan.wait(); + } + deviceSyncer.sync(gridDim.x); + // reduce to related rank + offset = (rank * chunkSize + chunkSize / pipelineSize) * sizeof(int); + dst = (int*)((char*)buff + offset); + src = (int*)((char*)scratch + offset); + vectorSum(dst, src, 2 * chunkSize / pipelineSize); + if (isComm) { + devChan.flush(); + } +} + +// Run with a single thread only. +__device__ void localAllGather(int rank, int nRanksPerNode, uint64_t offset, uint64_t size) { + // this allgather algorithm works as follows: + // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode + // and waits for data from GPU rank (i-1) % nranksPerNode + // Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode + // ... + // This order is much better for DMA engine for NVLinks + if (nRanksPerNode == 1) return; + + int startRankInNode = (rank / nRanksPerNode) * nRanksPerNode; + for (int i = 1; i < nRanksPerNode; i++) { + int remoteSendToRank = (rank + i) % nRanksPerNode + startRankInNode; + int remoteRecvFromRank = (rank + nRanksPerNode - i) % nRanksPerNode + startRankInNode; + int peerSendId = (remoteSendToRank < rank) ? remoteSendToRank : remoteSendToRank - 1; + int peerRecvId = (remoteRecvFromRank < rank) ? remoteRecvFromRank : remoteRecvFromRank - 1; + + mscclpp::SimpleProxyChannel& devSendChan = constDevSndRoundChans[peerSendId]; + mscclpp::SimpleProxyChannel& devRecvChan = constDevSndRoundChans[peerRecvId]; + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + devSendChan.putWithSignal(offset, size); + devRecvChan.wait(); + } +} + +// Run with a single thread only. 
+__device__ void allGather(int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU) { + // this allgather is a pipelined and hierarchical one and only works for two nodes + // it is implemented as follows: + // Step 1: each node does a local allgather and concurrently, + // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with + // its cross-node neighbor (local GPU i on the other node) via IB + // Step 2: each node does a local allgather again with the data just received from its + // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with + // its cross-node neighbor + // Step 3: each node does a local allgather for the last time with the rest of the data + + int pipelineSize = 3; + int peerRank = (rank + nRanksPerNode) % worldSize; + int peerNodeId = peerRank / nRanksPerNode; + int peer = (peerRank < rank) ? peerRank : peerRank - 1; + mscclpp::SimpleProxyChannel& devChan = constDevSndRoundChans[peer]; + + if (peerNodeId == rank / nRanksPerNode) { + localAllGather(rank, nRanksPerNode, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + return; + } + + // Step 1 + devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + localAllGather(rank, nRanksPerNode, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); + devChan.wait(); + devChan.flush(); + // Step 2 + devChan.putWithSignal((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); + localAllGather(rank, nRanksPerNode, peerRank * nelemsPerGPU * sizeof(int), + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int)); + devChan.wait(); + devChan.flush(); + // Step 3 + localAllGather(rank, nRanksPerNode, + (peerRank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int), + nelemsPerGPU / pipelineSize * sizeof(int)); +} + +__device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRanksPerNode, int startChunkIndex, + size_t offsetInChunk, size_t chunkSize, size_t nelems, int nBlocks) { + if (nRanksPerNode == 1) return; + if (blockIdx.x >= nBlocks) return; + const int nPeer = nRanksPerNode - 1; + mscclpp::SmChannel* smChans = constSmOutOfPlaceGetChans; + + const size_t localRankIndexInNode = rank % nRanksPerNode; + const size_t indexOffset = ((localRankIndexInNode + startChunkIndex) * chunkSize + offsetInChunk); + const size_t indexOffset4 = indexOffset / 4; + + int4* buff4 = (int4*)buff; + + for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { + smChans[peerIdx].signal(); + } + for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { + smChans[peerIdx].wait(); + } + reduceScatterDeviceSyncer.sync(nBlocks); + + const size_t nInt4 = nelems / 4; + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * nBlocks) { + int4 sum = make_int4(0, 0, 0, 0); + + for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + int4 val = smChans[peerIdx].read(indexOffset4 + idx); + sum.w += val.w; + sum.x += val.x; + sum.y += val.y; + sum.z += val.z; + } + buff4[indexOffset4 + idx].w += sum.w; + buff4[indexOffset4 + idx].x += sum.x; + buff4[indexOffset4 + idx].y += sum.y; + buff4[indexOffset4 + idx].z += sum.z; + } + + const size_t nLastInts = nelems % 4; + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nLastInts; idx += 
blockDim.x * nBlocks) { + int sum = 0; + for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + int val = smChans[peerIdx].read(indexOffset + nInt4 * 4 + idx); + sum += val; + } + buff[indexOffset + nInt4 * 4 + idx] += sum; + } +} + +__device__ void reduceScatterSm(int* buff, int* scratch, int rank, int nRanksPerNode, int worldSize, + size_t nelems // must be divisible by 3 +) { + // this reduce-scatter algorithm works as follows: + // Step 1: each node does a local reduce-scatter on peer node data chunks with 1/pipeline portion of chunk data. For + // example, 2 nodes and each node has 2 ranks. rank 0 and rank 1 perform reduce-scatter on chunk 2 and chunk 3, with + // 1/pipeline portion of the data. + // Step 2: each node does a local reduce-scatter on peers data chunks with (pipeline-1)/pipeline portion of chunk + // data. Meanwhile, exchange the reduced data of the previous step with its cross-node neighbor (same local rank + // number on the other node) via IB. Then performs a reduce operation. + // Step 3: each node does a local reduce-scatter on local ranks, meanwhile exchange the reduced data of the previous + // step with its cross-node neighbor (same local rank number on the other node) via IB. Then performs a reduce + // operation. + int pipelineSize = 3; + float nBlocksForReduceScatterRatio = 0.8; + const size_t chunkSize = nelems / worldSize; + const int peerRank = (rank + nRanksPerNode) % worldSize; + int peerNodeId = peerRank / nRanksPerNode; + int nBlocksForReduceScatter = + (int)(nBlocksForReduceScatterRatio * gridDim.x) / (nRanksPerNode - 1) * (nRanksPerNode - 1); + int isComm = (threadIdx.x == 0) && (blockIdx.x == nBlocksForReduceScatter); + int peer = (peerRank < rank) ? peerRank : peerRank - 1; + int nBlocksRemain = gridDim.x - nBlocksForReduceScatter; + mscclpp::SimpleProxyChannel& devChan = constDevFstRoundChans[peer]; + if (peerNodeId == rank / nRanksPerNode) { + localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize, nBlocksForReduceScatter); + return; + } + + // step 1: local reduce + int startChunkIndex = peerNodeId * nRanksPerNode; + localReduceScatterSm(buff, scratch, rank, nRanksPerNode, startChunkIndex, 0, chunkSize, chunkSize / pipelineSize, + nBlocksForReduceScatter); + deviceSyncer.sync(gridDim.x); + + // step 2: local reduce and exchange data with neighbor + if (isComm) { + size_t offset = (peerRank * chunkSize) * sizeof(int); + // opposite side + devChan.putWithSignal(offset, (chunkSize / pipelineSize * sizeof(int))); + } + localReduceScatterSm(buff, scratch, rank, nRanksPerNode, startChunkIndex, chunkSize / pipelineSize, chunkSize, + 2 * chunkSize / pipelineSize, nBlocksForReduceScatter); + if (isComm) { + devChan.wait(); + } + if (blockIdx.x >= nBlocksForReduceScatter) { + ibDeviceSyncer.sync(nBlocksRemain); + // reduce data received from peer to related rank + size_t offset = rank * chunkSize * sizeof(int); + int* dst = (int*)((char*)buff + offset); + int* src = (int*)((char*)scratch + offset); + vectorSum(dst, src, chunkSize / pipelineSize, blockIdx.x - nBlocksForReduceScatter, nBlocksRemain); + } + if (isComm) { + devChan.flush(); + } + deviceSyncer.sync(gridDim.x); + + // step 3: local reduce and exchange data with neighbor + startChunkIndex = (rank / nRanksPerNode) * nRanksPerNode; + if (isComm) { + size_t offset = (peerRank * chunkSize + chunkSize / pipelineSize) * sizeof(int); + devChan.putWithSignal(offset, (pipelineSize - 1) * chunkSize / pipelineSize * sizeof(int)); + } + localReduceScatterSm(buff, scratch, rank, 
nRanksPerNode, startChunkIndex, 0, chunkSize, chunkSize, + nBlocksForReduceScatter); + if (isComm) { + devChan.wait(); + } + deviceSyncer.sync(gridDim.x); + // reduce to related rank, can not overlap since localReduceScatter also calculate the sum + size_t offset = (rank * chunkSize + chunkSize / pipelineSize) * sizeof(int); + int* dst = (int*)((char*)buff + offset); + int* src = (int*)((char*)scratch + offset); + vectorSum(dst, src, 2 * chunkSize / pipelineSize); + if (isComm) { + devChan.flush(); + } +} + +// This kernel is the most performant when the number of blocks is a multiple of (nRanksPerNode - 1). +__device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunkIndex, uint64_t offsetInRankChunk, + uint64_t rankChunkSize, uint64_t size, size_t nBlocks) { + if (nRanksPerNode == 1) return; + if (blockIdx.x >= nBlocks) return; + const size_t nPeer = nRanksPerNode - 1; + const size_t peerIdx = blockIdx.x % nPeer; + const size_t nBlockForThisPeer = nBlocks / nPeer + (nBlocks % nPeer > peerIdx ? 1 : 0); + const size_t peerLocalBlockIdx = blockIdx.x / nPeer; + const size_t rankLocalIndex = rank % nRanksPerNode; + const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1); + + // Split the data into chunks for aligned data access. Ignore the remainder here and let the last block handle it. + constexpr size_t chunkBytes = 128; // heuristic value + const size_t nChunk = size / chunkBytes; + const size_t nMinChunkPerBlock = nChunk / nBlockForThisPeer; + const size_t nRemainderChunk = nChunk % nBlockForThisPeer; + + // Distribute chunks to blocks + size_t nChunkForThisBlock; + size_t offsetForThisBlock; + if (peerLocalBlockIdx < nRemainderChunk) { + nChunkForThisBlock = nMinChunkPerBlock + 1; + offsetForThisBlock = (nMinChunkPerBlock + 1) * peerLocalBlockIdx; + } else { + nChunkForThisBlock = nMinChunkPerBlock; + offsetForThisBlock = + (nMinChunkPerBlock + 1) * nRemainderChunk + (peerLocalBlockIdx - nRemainderChunk) * nMinChunkPerBlock; + } + offsetForThisBlock *= chunkBytes; + + // Calculate the size of the data for this block + size_t sizeForThisBlock = nChunkForThisBlock * chunkBytes; + const size_t lastChunkSize = size - nChunk * chunkBytes; + if (lastChunkSize > 0 && peerLocalBlockIdx == nBlockForThisPeer - 1) { + sizeForThisBlock += lastChunkSize; + } + if (threadIdx.x == 0 && peerLocalBlockIdx == 0) { + constSmInPlaceChans[peerIdx].signal(); + constSmInPlaceChans[peerIdx].wait(); + } + deviceSyncer.sync(nBlocks); + size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; + constSmInPlaceChans[peerIdx].get(offset + offsetForThisBlock, sizeForThisBlock, threadIdx.x, blockDim.x); +} + +// This is an allgather4 equivalent +__device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU) { + // this allgather is a pipelined and hierarchical one and only works for two nodes + // it is implemented as follows: + // Step 1: each node does a local allgather and concurrently, + // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with + // its cross-node neighbor (local GPU i on the other node) via IB + // Step 2: each node does a local allgather again with the data just received from its + // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with + // its cross-node neighbor + // Step 3: each node does a local allgather for the last time with the rest of the data + + int pipelineSize = 3; + int peerRank = (rank + nRanksPerNode) % 
worldSize; + int peerNodeId = peerRank / nRanksPerNode; + int peer = (peerRank < rank) ? peerRank : peerRank - 1; + mscclpp::SimpleProxyChannel& devChan = constDevSndRoundChans[peer]; + const size_t nBlocksForLocalAllGather = gridDim.x; + const size_t rankChunkSize = nelemsPerGPU * sizeof(int); + const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode; + const int startRankIndexInPeerNode = (peerRank / nRanksPerNode) * nRanksPerNode; + + if (peerNodeId == rank / nRanksPerNode) { + localAllGatherSm(rank, nRanksPerNode, 0, 0, rankChunkSize, rankChunkSize, nBlocksForLocalAllGather); + return; + } + + constexpr size_t alignment = 128; + size_t step1Bytes = (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int); + step1Bytes = step1Bytes / alignment * alignment; + const size_t step2Bytes = nelemsPerGPU * sizeof(int) - step1Bytes; + + // Step 1 + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), step1Bytes); + } + localAllGatherSm(rank, nRanksPerNode, startRankIndexInLocalNode, 0, rankChunkSize, rankChunkSize, + nBlocksForLocalAllGather); + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.wait(); + devChan.flush(); + } + deviceSyncer.sync(nBlocksForLocalAllGather); + // Step 2 + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int) + step1Bytes, step2Bytes); + } + localAllGatherSm(rank, nRanksPerNode, startRankIndexInPeerNode, 0, rankChunkSize, step1Bytes, + nBlocksForLocalAllGather); + if (threadIdx.x == 0 && blockIdx.x == 0) { + devChan.wait(); + devChan.flush(); + } + deviceSyncer.sync(nBlocksForLocalAllGather); + // Step 3 + localAllGatherSm(rank, nRanksPerNode, startRankIndexInPeerNode, step1Bytes, rankChunkSize, step2Bytes, + nBlocksForLocalAllGather); +} __device__ void allreduce0(int* buff, int* scratch, int rank, int worldSize, size_t nelems, size_t scratchDataCount) { int peerId = blockIdx.x / BLOCKS_PER_PEER; @@ -238,7 +679,7 @@ __device__ void allreduce2(int* buff, void* scratch, void* putPktBuf, void* getP // Channel to a local peer int smChanIdx = blockIdx.x / BLOCKS_PER_PEER; - mscclpp::SmChannel smChan = constSmChans[smChanIdx]; + mscclpp::SmChannel smChan = constSmOutOfPlaceChans[smChanIdx]; // Channel to a remote peer that has the same local rank as me int localRank = rank % nRanksPerNode; @@ -335,6 +776,21 @@ __device__ void allreduce2(int* buff, void* scratch, void* putPktBuf, void* getP } } +__device__ void allreduce3(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, + size_t nelems) { + reduceScatter(buff, scratch, rank, nRanksPerNode, worldSize, nelems); + if (threadIdx.x == 0 && blockIdx.x == 0) { + allGather(rank, worldSize, nRanksPerNode, nelems / worldSize); + } +} + +__device__ void allreduce4(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, + size_t nelems) { + reduceScatterSm(buff, scratch, rank, nRanksPerNode, worldSize, nelems); + deviceSyncer.sync(gridDim.x); + allGatherSm(rank, worldSize, nRanksPerNode, nelems / worldSize); +} + __global__ void kernel(void* buff, void* scratch, void* result, void* putPktBuf, void* getPktBuf, int rank, int nRanksPerNode, int worldSize, size_t nelems, size_t scratchDataCount, int kernel) { if (kernel == 0) @@ -343,6 +799,10 @@ __global__ void kernel(void* buff, void* scratch, void* result, void* putPktBuf, allreduce1((int*)buff, (int*)scratch, rank, worldSize, nelems, scratchDataCount); else if (kernel == 2) allreduce2((int*)buff, 
scratch, putPktBuf, getPktBuf, result, rank, nRanksPerNode, worldSize, nelems); + else if (kernel == 3) + allreduce3((int*)buff, (int*)scratch, result, rank, nRanksPerNode, worldSize, nelems); + else if (kernel == 4) + allreduce4((int*)buff, (int*)scratch, result, rank, nRanksPerNode, worldSize, nelems); } class AllReduceTestColl : public BaseTestColl { @@ -354,6 +814,7 @@ class AllReduceTestColl : public BaseTestColl { void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; void setupCollTest(size_t size) override; + std::vector getKernelRestrictions() override; }; void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { @@ -363,20 +824,30 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { const int nPeers = worldSize - 1; const Chunk chunk = getChunk(paramCount_, worldSize, rank); const size_t scratchDataCount = chunk.size * nPeers; + int nBlocks; + int nThreadsPerBlock; void* tmpBuff; if (kernelNum == 0) { nBlocks = nPeers * BLOCKS_PER_PEER; tmpBuff = scratchBuff; - } else if (kernelNum == 1) { + nThreadsPerBlock = 1024; + } else if (kernelNum == 1 || kernelNum == 3) { nBlocks = 24; tmpBuff = scratchBuff; + nThreadsPerBlock = 1024; + } else if (kernelNum == 4) { + nBlocks = 45; + tmpBuff = scratchBuff; + nThreadsPerBlock = 512; } else { nBlocks = std::max(args.nRanksPerNode - 1, 1) * BLOCKS_PER_PEER; tmpBuff = scratchPacketBuff; + nThreadsPerBlock = 1024; } - kernel<<>>(inputBuff, tmpBuff, resultBuff, putPacketBuff, getPacketBuff, rank, - args.nRanksPerNode, worldSize, paramCount_, scratchDataCount, kernelNum); + kernel<<>>(inputBuff, tmpBuff, resultBuff, putPacketBuff, getPacketBuff, rank, + args.nRanksPerNode, worldSize, paramCount_, scratchDataCount, + kernelNum); } void AllReduceTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { @@ -409,16 +880,32 @@ void AllReduceTestColl::setupCollTest(size_t size) { mscclpp::DeviceSyncer syncer = {}; uint64_t initFlag = 1; CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); + CUDATHROW(cudaMemcpyToSymbol(reduceScatterDeviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); + CUDATHROW(cudaMemcpyToSymbol(ibDeviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); CUDATHROW(cudaMemcpyToSymbol(globalFlag, &initFlag, sizeof(uint64_t))); } +std::vector AllReduceTestColl::getKernelRestrictions() { + return {// {kernelNum, kernelName, compatibleWithMultiNodes, countDivisorForMultiNodes, alignedBytes} + {0, "allreduce0", true, 1, .alignedBytes = 4 * worldSize_}, + {1, "allreduce1", true, 1, .alignedBytes = 4 * worldSize_}, + {2, "allreduce2", true, 1, .alignedBytes = 4 * worldSize_}, + {3, "allreduce3", true, 3, .alignedBytes = 4 * worldSize_}, + { + 4, + "allreduce4", + true, + 3, + .alignedBytes = 16 * worldSize_ /*use ulong2 to transfer data*/, + }}; +} + class AllReduceTestEngine : public BaseTestEngine { public: AllReduceTestEngine(const TestArgs& args); ~AllReduceTestEngine() = default; void allocateBuffer() override; - std::shared_ptr createChannelService() override; void setupConnections() override; bool isUsePacket() const; @@ -454,7 +941,7 @@ void AllReduceTestEngine::allocateBuffer() { inputBuff = inputBuff_.get(); resultBuff = resultBuff_.get(); - if (args_.kernelNum == 0 || args_.kernelNum == 1) { + if (args_.kernelNum == 0 || args_.kernelNum == 1 || args_.kernelNum == 3 || args_.kernelNum == 4) { scratchBuff_ 
= mscclpp::allocSharedCuda(args_.maxBytes / sizeof(int)); scratchBuff = scratchBuff_.get(); } else if (args_.kernelNum == 2) { @@ -473,14 +960,6 @@ void AllReduceTestEngine::allocateBuffer() { expectedBuff_ = std::shared_ptr(new int[args_.maxBytes / sizeof(int)]); } -std::shared_ptr AllReduceTestEngine::createChannelService() { - if (isUsePacket()) { - return std::make_shared(*comm_); - } else { - return std::make_shared(*comm_); - } -} - void AllReduceTestEngine::setupConnections() { if (isUsePacket()) { std::vector smChannels; @@ -494,26 +973,47 @@ void AllReduceTestEngine::setupConnections() { packetBuffBytes, getPacketBuff_.get(), packetBuffBytes, scratchPacketBuff_.get(), scratchPacketBuffBytes); - assert(smChannels.size() < sizeof(constSmChans) / sizeof(mscclpp::SmChannel)); + assert(smChannels.size() < sizeof(constSmOutOfPlaceChans) / sizeof(mscclpp::SmChannel)); assert(devChannels.size() < sizeof(constDevFstRoundChans) / sizeof(mscclpp::SimpleProxyChannel)); - CUDATHROW(cudaMemcpyToSymbol(constSmChans, smChannels.data(), sizeof(mscclpp::SmChannel) * smChannels.size())); + CUDATHROW( + cudaMemcpyToSymbol(constSmOutOfPlaceChans, smChannels.data(), sizeof(mscclpp::SmChannel) * smChannels.size())); CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, devChannels.data(), sizeof(mscclpp::SimpleProxyChannel) * devChannels.size())); } else { std::vector fstRoundChannels; std::vector sndRoundChannels; - // Send data from local sendBuff to remote scratchBuff (out-of-place) + std::vector smOutOfPlaceChannels; + std::vector smInPlaceChannels; + std::vector smOutputPlaceGetChannels; + + // Send data from local inputBuff to remote scratchBuff (out-of-place) setupMeshConnections(fstRoundChannels, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes); assert(fstRoundChannels.size() < sizeof(constDevFstRoundChans) / sizeof(mscclpp::SimpleProxyChannel)); CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, fstRoundChannels.data(), sizeof(mscclpp::SimpleProxyChannel) * fstRoundChannels.size())); - // Send data from local sendBuff to remote sendBuff (in-place) + // Send data from local inputBuff to remote inputBuff (in-place) setupMeshConnections(sndRoundChannels, inputBuff_.get(), args_.maxBytes); assert(sndRoundChannels.size() < sizeof(constDevSndRoundChans) / sizeof(mscclpp::SimpleProxyChannel)); CUDATHROW(cudaMemcpyToSymbol(constDevSndRoundChans, sndRoundChannels.data(), sizeof(mscclpp::SimpleProxyChannel) * sndRoundChannels.size())); + + setupMeshConnections(smOutOfPlaceChannels, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes); + assert(smOutOfPlaceChannels.size() < sizeof(constSmOutOfPlaceChans) / sizeof(mscclpp::SmChannel)); + CUDATHROW(cudaMemcpyToSymbol(constSmOutOfPlaceChans, smOutOfPlaceChannels.data(), + sizeof(mscclpp::SmChannel) * smOutOfPlaceChannels.size())); + + setupMeshConnections(smInPlaceChannels, inputBuff_.get(), args_.maxBytes); + assert(smInPlaceChannels.size() < sizeof(constSmInPlaceChans) / sizeof(mscclpp::SmChannel)); + CUDATHROW(cudaMemcpyToSymbol(constSmInPlaceChans, smInPlaceChannels.data(), + sizeof(mscclpp::SmChannel) * smInPlaceChannels.size())); + + setupMeshConnections(smOutputPlaceGetChannels, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes, + ChannelSemantic::GET); + assert(smOutputPlaceGetChannels.size() < sizeof(constSmOutOfPlaceGetChans) / sizeof(mscclpp::SmChannel)); + CUDATHROW(cudaMemcpyToSymbol(constSmOutOfPlaceGetChans, smOutputPlaceGetChannels.data(), + sizeof(mscclpp::SmChannel) * 
smOutputPlaceGetChannels.size())); } } diff --git a/test/mscclpp-test/alltoall_test.cu b/test/mscclpp-test/alltoall_test.cu index f46fdb76..f2e37fcc 100644 --- a/test/mscclpp-test/alltoall_test.cu +++ b/test/mscclpp-test/alltoall_test.cu @@ -6,7 +6,6 @@ #include "common.hpp" -#define ALIGN 4 __constant__ mscclpp::SimpleProxyChannel constDevChans[16]; __device__ mscclpp::DeviceSyncer deviceSyncer; void* localRecvBuff; @@ -65,6 +64,7 @@ class AllToAllTestColl : public BaseTestColl { void initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) override; void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override; void setupCollTest(size_t size) override; + std::vector getKernelRestrictions() override; }; void AllToAllTestColl::runColl(const TestArgs& args, cudaStream_t stream) { @@ -103,7 +103,7 @@ void AllToAllTestColl::getBw(const double deltaSec, double& algBw, double& busBw void AllToAllTestColl::setupCollTest(size_t size) { size_t count = size / typeSize_; - size_t base = (count / worldSize_ / (ALIGN)) * ALIGN * worldSize_; + size_t base = count; sendCount_ = base; recvCount_ = base; paramCount_ = base / worldSize_; @@ -113,6 +113,12 @@ void AllToAllTestColl::setupCollTest(size_t size) { CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); } +std::vector AllToAllTestColl::getKernelRestrictions() { + return {// {kernelNum, kernelName, compatibleWithMultiNodes, countDivisorForMultiNodes} + {0, "alltoall0", true, 1, 4 * worldSize_}, + {1, "alltoall1", false, 1, 4 * worldSize_}}; +} + class AllToAllTestEngine : public BaseTestEngine { public: AllToAllTestEngine(const TestArgs& args); diff --git a/test/mscclpp-test/check_perf_result.py b/test/mscclpp-test/check_perf_result.py index 9f6b6d38..1430526e 100644 --- a/test/mscclpp-test/check_perf_result.py +++ b/test/mscclpp-test/check_perf_result.py @@ -24,38 +24,23 @@ def load_perf_file(perf_fine: str) -> dict: def check_perf_result(perf_result: dict, baseline: dict, time_threshold: float, bandwidth_threshold: float) -> bool: res = True + threshold = None for key, value in perf_result.items(): if key not in baseline: continue if baseline[key]["target"] == "latency": - if abs(value["time"] - baseline[key]["time"]) / baseline[key]["time"] > time_threshold: - logging.error( - "%s: time %f not match baseline %f with threshold %f", - str(key), - value["time"], - baseline[key]["time"], - time_threshold, - ) - res = False - elif baseline[key]["target"] == "throughput": - if abs(value["algBw"] - baseline[key]["algBw"]) / baseline[key]["algBw"] > bandwidth_threshold: - logging.error( - "%s: algBw %f not match baseline %f with threshold %f", - str(key), - value["algBw"], - baseline[key]["algBw"], - bandwidth_threshold, - ) - res = False - if abs(value["busBw"] - baseline[key]["busBw"]) / baseline[key]["busBw"] > bandwidth_threshold: - logging.error( - "%s: busBw %f not match baseline %f with threshold %f", - str(key), - value["busBw"], - baseline[key]["busBw"], - bandwidth_threshold, - ) - res = False + threshold = time_threshold + else: + threshold = bandwidth_threshold + if abs(value["time"] - baseline[key]["time"]) / baseline[key]["time"] > threshold: + logging.error( + "%s: time %f not match baseline %f with threshold %f", + str(key), + value["time"], + baseline[key]["time"], + time_threshold, + ) + res = False return res diff --git a/test/mscclpp-test/common.cc b/test/mscclpp-test/common.cc index a8f9055d..76db2df0 100644 --- a/test/mscclpp-test/common.cc +++ 
b/test/mscclpp-test/common.cc @@ -124,6 +124,36 @@ const std::string getBusId(int cudaDev) { } return std::string(busIdChar); } + +void validateArgsForDeviceKernel(const std::vector& restrictions, int kernelNum, size_t paramCount, + int worldSize, int nRanksPerNode, int typeSize = 4) { + auto iter = std::find_if(restrictions.begin(), restrictions.end(), [kernelNum](const KernelRestriction& restriction) { + return restriction.kernelNum == kernelNum; + }); + if (iter == restrictions.end()) { + PRINT("no restriction for kernelNum=" << kernelNum << std::endl); + return; + } + std::stringstream ss; + bool isOnMultiNodes = worldSize / nRanksPerNode > 1; + if (isOnMultiNodes && !iter->compatibleWithMultiNodes) { + ss << "kernel is not compatible with multi nodes, kernelNum=" << kernelNum << ", name=" << iter->kernelName + << ", worldSize=" << worldSize << ", nRanksPerNode=" << nRanksPerNode; + throw std::invalid_argument(ss.str()); + } + if (isOnMultiNodes && paramCount % iter->countDivisorForMultiNodes != 0) { + ss << "kernel is not compatible with input size, kernelNum=" << kernelNum << ", name=" << iter->kernelName + << ", paramCount=" << paramCount << ", countDivisorForMultiNodes=" << iter->countDivisorForMultiNodes; + throw std::invalid_argument(ss.str()); + } + bool sizeAlignedForMultiNode = (paramCount * typeSize / iter->countDivisorForMultiNodes) % iter->alignedBytes == 0; + if (((paramCount * typeSize) % iter->alignedBytes != 0) || (isOnMultiNodes && !sizeAlignedForMultiNode)) { + ss << "kernel is not compatible with alignment restriction, kernelNum=" << kernelNum + << ", name=" << iter->kernelName << ", paramCount=" << paramCount << ", alignedBytes=" << iter->alignedBytes + << ", countDivisorForMultiNodes=" << iter->countDivisorForMultiNodes; + throw std::invalid_argument(ss.str()); + } +} } // namespace int getDeviceNumaNode(int cudaDev) { @@ -201,6 +231,8 @@ void BaseTestEngine::runTest() { // warm-up for large size this->coll_->setupCollTest(args_, args_.maxBytes); this->barrier(); + validateArgsForDeviceKernel(coll_->getKernelRestrictions(), args_.kernelNum, coll_->getParamBytes() / sizeof(int), + args_.totalRanks, args_.nRanksPerNode); for (int iter = 0; iter < warmup_iters; iter++) { this->coll_->runColl(args_, stream_); } @@ -209,6 +241,8 @@ void BaseTestEngine::runTest() { // warm-up for small size this->coll_->setupCollTest(args_, args_.minBytes); this->barrier(); + validateArgsForDeviceKernel(coll_->getKernelRestrictions(), args_.kernelNum, coll_->getParamBytes() / sizeof(int), + args_.totalRanks, args_.nRanksPerNode); for (int iter = 0; iter < warmup_iters; iter++) { this->coll_->runColl(args_, stream_); } @@ -232,6 +266,8 @@ void BaseTestEngine::runTest() { ss << std::setw(12) << std::max(coll_->getSendBytes(), coll_->getExpectedBytes()) << " " << std::setw(12) << coll_->getParamBytes() / sizeof(int); + validateArgsForDeviceKernel(coll_->getKernelRestrictions(), args_.kernelNum, coll_->getParamBytes() / sizeof(int), + args_.totalRanks, args_.nRanksPerNode); double deltaSec = benchTime(); size_t nErrors = 0; @@ -388,6 +424,40 @@ void BaseTestEngine::setupMeshConnections(std::vectorsetup(); } +void BaseTestEngine::setupMeshConnections(std::vector& smChannels, void* inputBuff, + size_t inputBuffBytes, void* outputBuff, size_t outputBuffBytes, + ChannelSemantic semantic) { + const mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc | IBs[args_.gpuNum]; + mscclpp::RegisteredMemory inputBufRegMem = comm_->registerMemory(inputBuff, inputBuffBytes, allTransports); + 
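+  // With the PUT semantic each rank puts from its own input buffer into the peers' output buffers; with the GET
+  // semantic each rank reads directly from the peers' input buffers. One SmDevice2DeviceSemaphore and one
+  // SmChannel are created per CudaIpc connection below.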
mscclpp::RegisteredMemory getPacketBufRegMem; + mscclpp::RegisteredMemory outputBufRegMem; + if (outputBuff) { + outputBufRegMem = comm_->registerMemory(outputBuff, outputBuffBytes, allTransports); + } + + std::vector> connections; + std::vector> remoteRegMemories; + mscclpp::RegisteredMemory& localRegMemory = + (outputBuff && semantic == ChannelSemantic::PUT) ? outputBufRegMem : inputBufRegMem; + setupMeshConnectionsInternal(connections, localRegMemory, remoteRegMemories); + + std::unordered_map> smSemaphores; + for (size_t cid = 0; cid < connections.size(); ++cid) { + if (connections[cid]->transport() == mscclpp::Transport::CudaIpc) { + smSemaphores.emplace(cid, std::make_shared(*comm_, connections[cid])); + } + } + comm_->setup(); + + for (size_t cid = 0; cid < connections.size(); ++cid) { + if (connections[cid]->transport() == mscclpp::Transport::CudaIpc) { + smChannels.emplace_back(smSemaphores[cid]->deviceHandle(), remoteRegMemories[cid].get(), + (outputBuff && semantic == ChannelSemantic::GET) ? outputBuff : inputBufRegMem.data(), + nullptr); + } + } +} + void BaseTestEngine::setupMeshConnections(std::vector& smChannels, std::vector& devChannels, void* inputBuff, size_t inputBuffBytes, void* putPacketBuff, size_t putPacketBuffBytes, @@ -539,7 +609,6 @@ int main(int argc, char* argv[]) { "[-c,--check <0/1>] \n\t" "[-T,--timeout