mscclpp-test in Python (#204)

Co-authored-by: Binyang Li <binyli@microsoft.com>
Co-authored-by: Saeed Maleki <saemal@microsoft.com>
Co-authored-by: Esha Choukse <eschouks@microsoft.com>
This commit is contained in:
Changho Hwang
2023-11-16 12:45:25 +08:00
committed by GitHub
parent e710701728
commit 060fda12e6
24 changed files with 1589 additions and 155 deletions

View File

@@ -112,3 +112,20 @@ jobs:
set -e
python3 test/mscclpp-test/check_perf_result.py --perf-file output.jsonl --baseline-file test/deploy/perf_ndmv4.jsonl
workingDirectory: '$(System.DefaultWorkingDirectory)'
- task: Bash@3
name: PythonAllReduceBenchmark
displayName: Python Allreduce Benchmark
inputs:
targetType: 'inline'
script: |
set -e
export PATH=/usr/local/mpi/bin:$PATH
python3 -m pip install .
if [[ '$(containerImage)' == *'cuda11'* ]]; then
pip3 install -r ./python/requirements_cu11.txt
else
pip3 install -r ./python/requirements_cu12.txt
fi
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 python3 ./python/benchmark/allreduce_bench.py
workingDirectory: '$(System.DefaultWorkingDirectory)'

View File

@@ -83,7 +83,7 @@ jobs:
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/run_tests.sh mscclpp-test'
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/test/deploy/run_tests.sh mscclpp-test'
kill $CHILD_PID
- task: Bash@3
@@ -102,7 +102,7 @@ jobs:
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/run_tests.sh mp-ut'
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/test/deploy/run_tests.sh mp-ut'
kill $CHILD_PID
- task: Bash@3
@@ -121,7 +121,26 @@ jobs:
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/run_tests.sh pytests'
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/test/deploy/run_tests.sh pytests'
kill $CHILD_PID
- task: Bash@3
name: RunMultiNodePythonBenchmark
displayName: Run multi-nodes python benchmark
inputs:
targetType: 'inline'
script: |
set -e
HOSTFILE=$(System.DefaultWorkingDirectory)/test/mscclpp-test/deploy/hostfile
SSH_OPTION="StrictHostKeyChecking=no"
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
rm -rf output/*
mkdir -p output
touch output/mscclit-000000
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/test/deploy/run_tests.sh py-benchmark'
kill $CHILD_PID
- task: AzureCLI@2

View File

@@ -81,9 +81,9 @@ jobs:
export PATH=/usr/local/mpi/bin:$PATH
cd build && make pylib-copy
if [[ '$(containerImage)' == *'cuda11'* ]]; then
pip3 install -r ../python/test/requirements_cu11.txt
pip3 install -r ../python/requirements_cu11.txt
else
pip3 install -r ../python/test/requirements_cu12.txt
pip3 install -r ../python/requirements_cu12.txt
fi
mpirun -tag-output -np 8 ~/.local/bin/pytest ../python/test/test_mscclpp.py -x
mpirun -tag-output -x MSCCLPP_HOME=$(System.DefaultWorkingDirectory) -np 8 ~/.local/bin/pytest ../python/test/test_mscclpp.py -x
workingDirectory: '$(System.DefaultWorkingDirectory)'

View File

@@ -33,11 +33,13 @@ struct DeviceSyncer {
if (tmpIsAdd) {
if (atomicAdd(&count_, 1) == maxOldCnt) {
flag_ = 1;
count_ = 0;
}
POLL_MAYBE_JAILBREAK(!flag_, maxSpinCount);
} else {
if (atomicSub(&count_, 1) == 1) {
if (atomicAdd(&count_, 1) == maxOldCnt) {
flag_ = 0;
count_ = 0;
}
POLL_MAYBE_JAILBREAK(flag_, maxSpinCount);
}

View File

@@ -23,7 +23,7 @@ struct Host2DeviceSemaphoreDeviceHandle {
}
/// Wait for the host to signal.
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) {
__forceinline__ __device__ void wait(int64_t maxSpinCount = 100000000) {
(*expectedInboundSemaphoreId) += 1;
POLL_MAYBE_JAILBREAK((cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*inboundSemaphoreId}.load(
cuda::memory_order_acquire) < (*expectedInboundSemaphoreId)),
@@ -48,7 +48,7 @@ struct SmDevice2DeviceSemaphoreDeviceHandle {
}
/// Wait for the remote device to signal.
__forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) {
__forceinline__ __device__ void wait(int64_t maxSpinCount = 100000000) {
(*expectedInboundSemaphoreId) += 1;
POLL_MAYBE_JAILBREAK((cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*inboundSemaphoreId}.load(
cuda::memory_order_acquire) < (*expectedInboundSemaphoreId)),
@@ -68,6 +68,19 @@ struct SmDevice2DeviceSemaphoreDeviceHandle {
cuda::memory_order_seq_cst);
}
/// Signal the remote device.
///
/// This function is a relaxed version of signal() and provides no guarantee on the completion of memory operations.
/// User requires to call proper fencing before using this function.
///
__forceinline__ __device__ void relaxedSignal() {
// This fence ensures that preceding writes are visible on the peer GPU before the incremented
// `outboundSemaphoreId` is visible.
semaphoreIncrement();
cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*remoteInboundSemaphoreId}.store(semaphoreGetLocal(),
cuda::memory_order_relaxed);
}
/// Signal the remote device for copied packets.
///
/// Unlike @ref signal(), this function provides no guarantee on the completion of memory operations. This is

View File

@@ -16,30 +16,22 @@ namespace Element {
/// Load an element from DRAM.
///
This is a wrapper of ld.volatile.global.* PTX instruction. Address alignment is not this function's
/// responsibility.
///
/// @param v The value to be loaded.
/// @param p The address of the value to be loaded.
///
template <typename T>
__forceinline__ __device__ void load(T& v, const T* p) {
// We should only use the specialized functions.
__assert_fail("Unsupported type", __FILE__, __LINE__, __PRETTY_FUNCTION__);
v = *p;
}
/// Write an element on DRAM.
///
/// This is a wrapper of st.volatile.global.* PTX instruction. Address alignment is not this function's
/// responsibility.
///
/// @param p The address of the value to be written.
/// @param v The value to be written.
///
template <typename T>
__forceinline__ __device__ void store(T* p, const T& v) {
// We should only use the specialized functions.
__assert_fail("Unsupported type", __FILE__, __LINE__, __PRETTY_FUNCTION__);
*p = v;
}
/// Copy aligned elements from the source memory to the destination memory.
@@ -64,52 +56,6 @@ __forceinline__ __device__ void copy(T* dst, T* src, uint64_t numElems, uint32_t
}
}
template <>
__forceinline__ __device__ void load<long long>(long long& v, const long long* p) {
asm volatile("ld.volatile.global.u64 %0, [%1];" : "=l"(v) : "l"(p) : "memory");
}
template <>
__forceinline__ __device__ void store<long long>(long long* p, const long long& v) {
asm volatile("st.volatile.global.u64 [%0], %1;" : : "l"(p), "l"(v) : "memory");
}
template <>
__forceinline__ __device__ void load<int>(int& v, const int* p) {
asm volatile("ld.volatile.global.u32 %0, [%1];" : "=r"(v) : "l"(p) : "memory");
}
template <>
__forceinline__ __device__ void store<int>(int* p, const int& v) {
asm volatile("st.volatile.global.u32 [%0], %1;" : : "l"(p), "r"(v) : "memory");
}
template <>
__forceinline__ __device__ void load<longlong2>(longlong2& v, const longlong2* p) {
asm volatile("ld.volatile.global.v2.u64 {%0,%1}, [%2];" : "=l"(v.x), "=l"(v.y) : "l"(p) : "memory");
}
template <>
__forceinline__ __device__ void store<longlong2>(longlong2* p, const longlong2& v) {
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" : : "l"(p), "l"(v.x), "l"(v.y) : "memory");
}
template <>
__forceinline__ __device__ void load<int4>(int4& v, const int4* p) {
asm volatile("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];"
: "=r"(v.x), "=r"(v.y), "=r"(v.z), "=r"(v.w)
: "l"(p)
: "memory");
}
template <>
__forceinline__ __device__ void store<int4>(int4* p, const int4& v) {
asm volatile("st.volatile.global.v4.u32 [%0], {%1,%2,%3,%4};"
:
: "l"(p), "r"(v.x), "r"(v.y), "r"(v.z), "r"(v.w)
: "memory");
}
} // namespace Element
#endif // __CUDACC__
@@ -315,6 +261,13 @@ struct SmChannelDeviceHandle {
///
__forceinline__ __device__ void signal() { semaphore_.signal(); }
/// Signal the remote semaphore.
///
/// This function is a relaxed version of signal() and provides no guarantee on the completion of memory operations.
/// User requires to call proper fencing before using this function.
///
__forceinline__ __device__ void relaxedSignal() { semaphore_.relaxedSignal(); }
/// Signal the remote semaphore for copied packets.
///
/// Unlike @ref signal(), this function provides no guarantee on the completion of memory operations. This is

View File

View File

@@ -0,0 +1,769 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include <cuda_fp16.h>
#include <mscclpp/concurrency.hpp>
#include <mscclpp/proxy_channel_device.hpp>
#include <mscclpp/sm_channel_device.hpp>
// Grid-wide barriers used by the kernels below. Each phase (generic, allgather,
// reduce-scatter, IB-side) gets its own DeviceSyncer so counters of concurrent
// phases cannot interfere with one another.
__device__ mscclpp::DeviceSyncer deviceSyncer;
__device__ mscclpp::DeviceSyncer allGatherDeviceSyncer;
__device__ mscclpp::DeviceSyncer reduceScatterDeviceSyncer;
__device__ mscclpp::DeviceSyncer ibDeviceSyncer;
// Element type of the user buffers; overridable at compile time (e.g. -DTYPE=__half).
#ifndef TYPE
#define TYPE float
#endif
// Number of TYPE elements that fit into one 16-byte int4 vector.
#define VECTOR_SIZE (sizeof(int4) / sizeof(TYPE))
/// Reinterpret the bits of @p src as a value of type @p To.
/// Both types must have identical size; this is a type-pun, not a conversion.
template <typename To, typename From>
__forceinline__ __device__ To bit_cast(const From& src) {
  static_assert(sizeof(To) == sizeof(From), "Size mismatch for bit_cast");
  union Caster {
    From from;
    To to;
  };
  Caster caster;
  caster.from = src;
  return caster.to;
}
/// Generic element-wise addition; relies on the type's built-in operator+.
template <typename T>
__forceinline__ __device__ T add_elements(T a, T b) {
  T sum = a + b;
  return sum;
}
/// Specialization for packed halves: adds both 16-bit lanes in one instruction.
template <>
__forceinline__ __device__ __half2 add_elements(__half2 a, __half2 b) {
  return __hadd2(a, b);
}
/// Add two int4 vectors whose 32-bit components actually hold values of type T.
/// Each component is punned to T, added, and punned back. Component order is
/// irrelevant: the four additions are independent.
template <typename T>
__forceinline__ __device__ int4 add_vectors_helper(int4 a, int4 b) {
  int4 result;
  result.x = bit_cast<int, T>(add_elements(bit_cast<T, int>(a.x), bit_cast<T, int>(b.x)));
  result.y = bit_cast<int, T>(add_elements(bit_cast<T, int>(a.y), bit_cast<T, int>(b.y)));
  result.z = bit_cast<int, T>(add_elements(bit_cast<T, int>(a.z), bit_cast<T, int>(b.z)));
  result.w = bit_cast<int, T>(add_elements(bit_cast<T, int>(a.w), bit_cast<T, int>(b.w)));
  return result;
}
/// Add two int4 vectors reinterpreted as vectors of T (dispatches to the helper).
template <typename T>
__forceinline__ __device__ int4 add_vectors(int4 a, int4 b) {
  return add_vectors_helper<T>(a, b);
}
/// __half data is processed pairwise as __half2 so each 32-bit word adds two halves at once.
template <>
__forceinline__ __device__ int4 add_vectors<__half>(int4 a, int4 b) {
  return add_vectors_helper<__half2>(a, b);
}
/// Add two uint2 vectors whose 32-bit components actually hold values of type T.
template <typename T>
__forceinline__ __device__ uint2 add_vectors_helper(uint2 a, uint2 b) {
  uint2 result;
  result.x = bit_cast<int, T>(add_elements(bit_cast<T, int>(a.x), bit_cast<T, int>(b.x)));
  result.y = bit_cast<int, T>(add_elements(bit_cast<T, int>(a.y), bit_cast<T, int>(b.y)));
  return result;
}
/// Add two uint2 vectors reinterpreted as vectors of T (dispatches to the helper).
template <typename T>
__forceinline__ __device__ uint2 add_vectors(uint2 a, uint2 b) {
  return add_vectors_helper<T>(a, b);
}
/// __half data is processed pairwise as __half2 for efficiency.
template <>
__forceinline__ __device__ uint2 add_vectors<__half>(uint2 a, uint2 b) {
  return add_vectors_helper<__half2>(a, b);
}
/// Add two 32-bit words that actually hold values of type T.
template <typename T>
__forceinline__ __device__ int add_vectors_helper(int a, int b) {
  T lhs = bit_cast<T, int>(a);
  T rhs = bit_cast<T, int>(b);
  return bit_cast<int, T>(add_elements(lhs, rhs));
}
/// Add two 32-bit words reinterpreted as T (dispatches to the helper).
template <typename T>
__forceinline__ __device__ int add_vectors(int a, int b) {
  return add_vectors_helper<T>(a, b);
}
/// __half data is processed pairwise as __half2 for efficiency.
template <>
__forceinline__ __device__ int add_vectors<__half>(int a, int b) {
  return add_vectors_helper<__half2>(a, b);
}
/// Element-wise accumulate: dst[i] += src[i], cooperatively across nBlocks blocks.
///
/// The bulk is processed as int4 (16-byte) vectors; the remainder (nElem % 4) is
/// processed as individual 32-bit words. NOTE(review): given the remainder loop
/// operates on `int*`, nElem appears to be a count of 32-bit words, not of TYPE
/// elements — callers in this file pre-divide counts by sizeof(int)/sizeof(TYPE);
/// confirm before reusing elsewhere.
///
/// Fix vs. original: loop indices were `int` while the bounds are `size_t`; for
/// buffers with more than 2^31 words the index would overflow (UB). Use size_t.
///
/// @param dst Destination buffer (accumulated in place).
/// @param src Source buffer.
/// @param nElem Number of 32-bit words to reduce (see note above).
/// @param blockId Logical index of this block within the cooperating group.
/// @param nBlocks Number of blocks cooperating on this reduction.
__forceinline__ __device__ void vectorSum(TYPE* dst, TYPE* src, size_t nElem, int blockId, int nBlocks) {
  size_t nInt4 = nElem / 4;
  size_t nLastInts = nElem % 4;
  int4* dst4 = (int4*)dst;
  int4* src4 = (int4*)src;
  // Vectorized main loop: grid-stride over int4 chunks.
  for (size_t i = threadIdx.x + (size_t)blockId * blockDim.x; i < nInt4; i += (size_t)blockDim.x * nBlocks) {
    dst4[i] = add_vectors<TYPE>(dst4[i], src4[i]);
  }
  // Tail: up to three remaining 32-bit words, handled scalar-wise.
  if (nLastInts > 0) {
    int* dstLast = ((int*)dst) + nInt4 * 4;
    int* srcLast = ((int*)src) + nInt4 * 4;
    for (size_t i = threadIdx.x + (size_t)blockId * blockDim.x; i < nLastInts; i += (size_t)blockDim.x * nBlocks) {
      dstLast[i] = add_vectors<TYPE>(dstLast[i], srcLast[i]);
    }
  }
}
/// Convenience overload: accumulate using every block in the grid.
__forceinline__ __device__ void vectorSum(TYPE* dst, TYPE* src, size_t nElem) {
  vectorSum(dst, src, nElem, blockIdx.x, gridDim.x);
}
// -------------------------------------------
// AllReduce1
// -------------------------------------------
// When READ_ONLY != 0, peers only read each other's buffers (no remote writes);
// the final result is then fetched with get() at the end of the kernel.
#ifndef READ_ONLY
#define READ_ONLY 0
#endif
/// Single-node allreduce over SM channels. Each rank owns the chunk
/// [rank*chunkSize, (rank+1)*chunkSize) and reduces it by reading the same
/// region from every peer, then (unless READ_ONLY) writes the reduced chunk
/// back to all peers.
///
/// @param smChans One SM channel handle per peer (nranks - 1 entries).
/// @param buff In/out data buffer, nelems elements of TYPE.
/// @param rank This rank's index.
/// @param nranks Total number of ranks (single node).
/// @param nelems Number of TYPE elements in buff.
extern "C" __global__ void __launch_bounds__(1024, 1)
    allreduce1(mscclpp::SmChannelDeviceHandle* smChans, TYPE* buff, int rank, int nranks, size_t nelems) {
  const size_t chunkSize = nelems / nranks;
  if (nranks == 1) return;
  const int nPeer = nranks - 1;
  const size_t indexOffset = rank * chunkSize;
  const size_t indexOffset4 = indexOffset / VECTOR_SIZE;
  int4* buff4 = (int4*)buff;
  const int tid = threadIdx.x + blockIdx.x * blockDim.x;
  // synchronize everyone: a system fence makes prior writes visible, so the
  // relaxed (fence-free) signal below is sufficient. Threads [0, nPeer) signal,
  // threads [nPeer, 2*nPeer) wait for the matching peer signal.
  if (tid == 0) {
    __threadfence_system();
  }
  __syncthreads();
  if (tid < nPeer) {
    smChans[tid].relaxedSignal();
  }
  if (tid >= nPeer && tid < nPeer * 2) {
    smChans[tid - nPeer].wait();
  }
  deviceSyncer.sync(gridDim.x);
  // use int4 as much as possible
  const size_t nInt4 = chunkSize / VECTOR_SIZE;
  for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * gridDim.x) {
    int4 tmp = buff4[indexOffset4 + idx];
    // Rotate the peer order by rank so peers are not all hammered in the same order.
    for (int index = 0; index < nPeer; ++index) {
      int4 val;
      int peerIdx = (index + rank);
      if (peerIdx >= nPeer) peerIdx -= nPeer;
      val = smChans[peerIdx].read<int4>(indexOffset4 + idx);
      tmp = add_vectors<TYPE>(tmp, val);
    }
    // Push the reduced vector to every peer (write-based variant only).
    if (READ_ONLY == 0) {
      for (int index = 0; index < nPeer; ++index) {
        int peerIdx = (index + rank);
        if (peerIdx >= nPeer) peerIdx -= nPeer;
        smChans[peerIdx].write<int4>(indexOffset4 + idx, tmp);
      }
    }
    buff4[indexOffset4 + idx] = tmp;
  }
  // use the given TYPE for the rest: elements that did not fill a whole int4
  // are split evenly across ranks and reduced element-by-element.
  size_t processed = nInt4 * VECTOR_SIZE * nranks;
  const size_t nRemElems = nelems - processed;
  const size_t startIdx = processed + (nRemElems * rank) / nranks;
  const size_t endIdx = processed + (nRemElems * (rank + 1)) / nranks;
  for (int idx = threadIdx.x + blockIdx.x * blockDim.x + startIdx; idx < endIdx; idx += blockDim.x * gridDim.x) {
    TYPE tmp = buff[idx];
    for (int index = 0; index < nPeer; ++index) {
      int peerIdx = (index + rank);
      if (peerIdx >= nPeer) peerIdx -= nPeer;
      TYPE val = smChans[peerIdx].read<TYPE>(idx);
      tmp += val;
    }
    for (int index = 0; index < nPeer; ++index) {
      int peerIdx = (index + rank);
      if (peerIdx >= nPeer) peerIdx -= nPeer;
      smChans[peerIdx].write<TYPE>(idx, tmp);
    }
    buff[idx] = tmp;
  }
  // synchronize everyone again before reading peer results.
  deviceSyncer.sync(gridDim.x);
  if (tid == 0) {
    __threadfence_system();
  }
  __syncthreads();
  if (tid < nPeer) {
    smChans[tid].relaxedSignal();
  }
  if (tid >= nPeer && tid < nPeer * 2) {
    smChans[tid - nPeer].wait();
  }
  // Read-only variant: nobody wrote into our buffer, so fetch every peer's
  // reduced chunk into the corresponding region of the local buffer.
  if (READ_ONLY) {
    for (int i = 0; i < nPeer; ++i) {
      int peerIdx = (i + rank);
      if (peerIdx >= nPeer) peerIdx -= nPeer;
      const int remoteRank = (peerIdx < rank ? peerIdx : peerIdx + 1);
      size_t offset = chunkSize * remoteRank * sizeof(TYPE);
      smChans[peerIdx].get(offset, chunkSize * sizeof(TYPE), tid, blockDim.x * gridDim.x);
    }
  }
}
// -------------------------------------------
// AllReduce2
// -------------------------------------------
// Monotonically increasing flag embedded in LL packets; lets receivers tell
// fresh packets from stale ones across kernel invocations. Incremented once
// per launch by thread 0 of block 0 at the end of allreduce2.
__device__ uint64_t globalFlag = 1;
/// Low-latency packet-based allreduce (single node). Data travels inside
/// flag-tagged LLPackets through a scratch buffer, so no separate
/// signal/wait synchronization is needed; double buffering (selected by the
/// flag's parity) avoids reuse hazards between consecutive invocations.
extern "C" __global__ void __launch_bounds__(512, 1)
    allreduce2(mscclpp::SmChannelDeviceHandle* smChans, TYPE* buff, TYPE* scratch, void* resultBuff, int rank,
               int worldSize, size_t nelems) {
  // Work in units of 32-bit words from here on.
  nelems = nelems / (sizeof(int) / sizeof(TYPE));
  // This version of allreduce only works for single nodes
  const int nPeers = worldSize - 1;
  // Each LL packet carries two 32-bit words (uint2 payload).
  const int nPkts = nelems / 2;
  const int nelemsPerRank = nelems / worldSize;
  const int nPktsPerRank = nelemsPerRank / 2;
  // flag for packets. Initially 1
  const uint32_t flag = (uint32_t)globalFlag;
  // thread block & channel info: blocks are partitioned evenly across peers.
  const int nBlocksPerPeer = gridDim.x / nPeers;
  const int localBlockIdx = blockIdx.x % nBlocksPerPeer;
  const int peerIdx = blockIdx.x / nBlocksPerPeer;
  const int remoteRank = peerIdx < rank ? peerIdx : peerIdx + 1;
  mscclpp::SmChannelDeviceHandle smChan = smChans[peerIdx];
  const int tid = threadIdx.x + localBlockIdx * blockDim.x;
  // double buffering: even/odd flag selects which half of scratch is in use.
  size_t scratchBaseOffset = (flag & 1) ? 0 : nPkts * sizeof(mscclpp::LLPacket);
  void* scratchBuff = (void*)((char*)scratch + scratchBaseOffset);
  size_t scratchOffset = scratchBaseOffset + rank * nPktsPerRank * sizeof(mscclpp::LLPacket);
  size_t scratchResultOffset =
      (flag & 1) ? 2 * nPkts * sizeof(mscclpp::LLPacket) : 3 * nPkts * sizeof(mscclpp::LLPacket);
  size_t srcOffset = remoteRank * nelemsPerRank * sizeof(int);
  uint2* src = (uint2*)((char*)buff + rank * nelemsPerRank * sizeof(int));
  uint2* dst = (uint2*)((char*)resultBuff + rank * nelemsPerRank * sizeof(int));
  // step 1: write to scratch buffer
  smChan.putPackets(scratchOffset, srcOffset, nelemsPerRank * sizeof(int), tid, blockDim.x * nBlocksPerPeer, flag);
  // step 2: get data from scratch buffer, reduce data and write result to remote scratch buffer
  for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * gridDim.x) {
    uint2 data = make_uint2(0, 0);
    for (int index = 0; index < nPeers; index++) {
      const int remoteRank = index < rank ? index : index + 1;
      mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)scratchBuff + remoteRank * nPktsPerRank;
      // read() spins until the packet's flags match, i.e. the peer's data arrived.
      uint2 val = dstPkt[idx].read(flag);
      data = add_vectors<TYPE>(val, data);
    }
    data = add_vectors<TYPE>(data, src[idx]);
    dst[idx] = data;
    // Broadcast the reduced packet into every peer's result region of scratch.
    mscclpp::LLPacket packet;
    packet.data1 = data.x;
    packet.flag1 = flag;
    packet.data2 = data.y;
    packet.flag2 = flag;
    size_t offset = scratchResultOffset / sizeof(mscclpp::LLPacket) + (idx + rank * nPktsPerRank);
    for (int index = 0; index < nPeers; index++) {
      smChans[index].write(offset, packet);
    }
  }
  // step 3: get data result from scratch buffer
  mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)scratch + scratchResultOffset);
  const int dstOffset = remoteRank * nPktsPerRank;
  uint2* result = (uint2*)((char*)resultBuff + remoteRank * nelemsPerRank * sizeof(int));
  for (int idx = threadIdx.x + localBlockIdx * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * nBlocksPerPeer) {
    uint2 data = dstPkt[idx + dstOffset].read(flag);
    result[idx].x = data.x;
    result[idx].y = data.y;
  }
  // Advance the flag for the next invocation (one thread only).
  if (threadIdx.x == 0 && blockIdx.x == 0) {
    globalFlag += 1;
  }
}
// -------------------------------------------
// AllReduce3
// -------------------------------------------
/// Ring allreduce over proxy channels: a pipelined reduce-scatter phase
/// (steps 1..n using fstRoundChans) followed by an allgather phase
/// (steps n+1..2n-2 using sndRoundChans). Each chunk is split in half so
/// communication of one half overlaps reduction of the other. Only thread 0 of
/// block 0 (`isComm`) issues channel operations; all blocks participate in the
/// vectorSum reductions between grid-wide barriers.
extern "C" __global__ void __launch_bounds__(1024, 1)
    allreduce3(mscclpp::SimpleProxyChannelDeviceHandle* fstRoundChans,
               mscclpp::SimpleProxyChannelDeviceHandle* sndRoundChans, TYPE* buff, TYPE* scratch, int rank,
               int worldSize, size_t nelems) {
  // Work in units of 32-bit words from here on.
  nelems = nelems / (sizeof(int) / sizeof(TYPE));
  int isComm = (threadIdx.x == 0) && (blockIdx.x == 0);
  // Ring neighbors: send to rank+1, receive from rank-1 (mod worldSize).
  int remoteSendRank = (rank + 1) % worldSize;
  int remoteRecvRank = (rank + worldSize - 1) % worldSize;
  int peerSendId = (remoteSendRank < rank) ? remoteSendRank : remoteSendRank - 1;
  int peerRecvId = (remoteRecvRank < rank) ? remoteRecvRank : remoteRecvRank - 1;
  mscclpp::SimpleProxyChannelDeviceHandle& devFstSendChan = fstRoundChans[peerSendId];
  mscclpp::SimpleProxyChannelDeviceHandle& devFstRecvChan = fstRoundChans[peerRecvId];
  mscclpp::SimpleProxyChannelDeviceHandle& devSndSendChan = sndRoundChans[peerSendId];
  mscclpp::SimpleProxyChannelDeviceHandle& devSndRecvChan = sndRoundChans[peerRecvId];
  // Step 1: send the first half of the chunk owned by rank-1's position.
  size_t chunkIndex = (rank + worldSize - 1) % worldSize;
  size_t chunkNelem = nelems / worldSize;
  size_t offset = chunkIndex * chunkNelem * sizeof(int);
  if (isComm) {
    if (chunkNelem > 1) {
      devFstSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int));
    }
  }
  // Step 2 ~ Step n-1: for each step, send the second half of the previous
  // chunk, reduce the received first half, then send the reduced first half on.
  for (int step = 2; step < worldSize; ++step) {
    if (isComm) {
      if (chunkNelem > 1) {
        devFstRecvChan.wait();
        devFstSendChan.flush();
      }
      devFstSendChan.putWithSignal(offset + chunkNelem / 2 * sizeof(int), (chunkNelem - chunkNelem / 2) * sizeof(int));
    }
    deviceSyncer.sync(gridDim.x);
    // Reduce the first half of the chunk received into scratch.
    chunkIndex = (rank + worldSize - step) % worldSize;
    offset = chunkIndex * chunkNelem * sizeof(int);
    int* dst = (int*)((char*)buff + offset);
    int* src = (int*)((char*)scratch + offset);
    vectorSum((TYPE*)dst, (TYPE*)src, chunkNelem / 2);
    if (isComm) {
      devFstRecvChan.wait();
      devFstSendChan.flush();
      if (chunkNelem > 1) {
        devFstSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int));
      }
    }
    deviceSyncer.sync(gridDim.x);
    // Reduce the second half of the chunk.
    dst += chunkNelem / 2;
    src += chunkNelem / 2;
    vectorSum((TYPE*)dst, (TYPE*)src, chunkNelem - chunkNelem / 2);
  }
  // Step n: finish reducing this rank's own chunk; it is now fully reduced.
  if (isComm) {
    if (chunkNelem > 1) {
      devFstRecvChan.wait();
      devFstSendChan.flush();
    }
    devFstSendChan.putWithSignal(offset + chunkNelem / 2 * sizeof(int), (chunkNelem - chunkNelem / 2) * sizeof(int));
  }
  deviceSyncer.sync(gridDim.x);
  offset = rank * chunkNelem * sizeof(int);
  int* dst = (int*)((char*)buff + offset);
  int* src = (int*)((char*)scratch + offset);
  vectorSum((TYPE*)dst, (TYPE*)src, chunkNelem / 2);
  if (isComm) {
    devFstRecvChan.wait();
    devFstSendChan.flush();
    if (chunkNelem > 1) {
      // Switch to the second-round channels for the allgather phase.
      devSndSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int));
    }
  }
  deviceSyncer.sync(gridDim.x);
  dst += chunkNelem / 2;
  src += chunkNelem / 2;
  vectorSum((TYPE*)dst, (TYPE*)src, chunkNelem - chunkNelem / 2);
  if (isComm) {
    if (chunkNelem > 1) {
      devSndRecvChan.wait();
      devSndSendChan.flush();
    }
    devSndSendChan.putWithSignalAndFlush(offset + chunkNelem / 2 * sizeof(int),
                                         (chunkNelem - chunkNelem / 2) * sizeof(int));
  }
  // Step n+1 ~ Step 2n-2: ring allgather — forward each received chunk.
  for (int i = 1; i < worldSize - 1; ++i) {
    if (isComm) {
      devSndRecvChan.wait();
    }
    deviceSyncer.sync(gridDim.x);
    // Copy
    chunkIndex = (rank + worldSize - i) % worldSize;
    if (isComm) {
      devSndSendChan.putWithSignalAndFlush(chunkIndex * chunkNelem * sizeof(int), chunkNelem * sizeof(int));
    }
  }
  // Final receive
  if (isComm) {
    devSndRecvChan.wait();
  }
}
// -------------------------------------------
// AllReduce4
// 2-node
// -------------------------------------------
/// Intra-node reduce-scatter over SM channels: each local rank reduces its own
/// slice (selected by startChunkIndex/offsetInChunk within chunkSize-sized
/// chunks) by reading the same region from every local peer. Only the first
/// nBlocks blocks participate; the rest return immediately.
/// NOTE(review): offsets are divided by 4 and accessed as int4, matching the
/// 32-bit-word units used by the callers in this file — confirm before reuse.
__device__ void localReduceScatterSm(mscclpp::SmChannelDeviceHandle* smChans, TYPE* buff, int rank, int nRanksPerNode,
                                     int startChunkIndex, size_t offsetInChunk, size_t chunkSize, size_t nelems,
                                     int nBlocks) {
  if (nRanksPerNode == 1) return;
  if (blockIdx.x >= nBlocks) return;
  const int nPeer = nRanksPerNode - 1;
  const size_t localRankIndexInNode = rank % nRanksPerNode;
  const size_t indexOffset = ((localRankIndexInNode + startChunkIndex) * chunkSize + offsetInChunk);
  const size_t indexOffset4 = indexOffset / 4;
  int4* buff4 = (int4*)buff;
  // Handshake with every peer: one thread per peer signals, then waits.
  for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) {
    smChans[peerIdx].relaxedSignal();
  }
  for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) {
    smChans[peerIdx].wait();
  }
  reduceScatterDeviceSyncer.sync(nBlocks);
  const size_t nInt4 = nelems / 4;
  for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * nBlocks) {
    int4 tmp = buff4[indexOffset4 + idx];
    // Rotate the peer order by local rank to spread load across peers.
    for (int index = 0; index < nPeer; ++index) {
      int4 val;
      int peerIdx = index + localRankIndexInNode;
      if (peerIdx >= nPeer) peerIdx -= nPeer;
      val = smChans[peerIdx].read<int4>(indexOffset4 + idx);
      tmp = add_vectors<TYPE>(tmp, val);
    }
    buff4[indexOffset4 + idx] = tmp;
  }
  // TODO: deal with rest elements
}
// This kernel is the most performant when the number of blocks is a multiple of (nRanksPerNode - 1).
/// Intra-node allgather over SM channels: blocks are partitioned round-robin
/// across peers (blockIdx.x % nPeer), and each group of blocks get()s the
/// peer's rank-chunk into the local buffer. Data is split into 128-byte
/// chunks for aligned access; the last block of each peer group also takes
/// the sub-chunk remainder.
__device__ void localAllGatherSm(mscclpp::SmChannelDeviceHandle* smChans, int rank, int nRanksPerNode,
                                 int startRankChunkIndex, uint64_t offsetInRankChunk, uint64_t rankChunkSize,
                                 uint64_t size, size_t nBlocks) {
  if (nRanksPerNode == 1) return;
  if (blockIdx.x >= nBlocks) return;
  const size_t nPeer = nRanksPerNode - 1;
  const size_t peerIdx = blockIdx.x % nPeer;
  const size_t nBlockForThisPeer = nBlocks / nPeer + (nBlocks % nPeer > peerIdx ? 1 : 0);
  const size_t peerLocalBlockIdx = blockIdx.x / nPeer;
  const size_t rankLocalIndex = rank % nRanksPerNode;
  const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1);
  // Split the data into chunks for aligned data access. Ignore the remainder here and let the last block handle it.
  constexpr size_t chunkBytes = 128;  // heuristic value
  const size_t nChunk = size / chunkBytes;
  const size_t nMinChunkPerBlock = nChunk / nBlockForThisPeer;
  const size_t nRemainderChunk = nChunk % nBlockForThisPeer;
  // Distribute chunks to blocks: the first nRemainderChunk blocks get one extra chunk.
  size_t nChunkForThisBlock;
  size_t offsetForThisBlock;
  if (peerLocalBlockIdx < nRemainderChunk) {
    nChunkForThisBlock = nMinChunkPerBlock + 1;
    offsetForThisBlock = (nMinChunkPerBlock + 1) * peerLocalBlockIdx;
  } else {
    nChunkForThisBlock = nMinChunkPerBlock;
    offsetForThisBlock =
        (nMinChunkPerBlock + 1) * nRemainderChunk + (peerLocalBlockIdx - nRemainderChunk) * nMinChunkPerBlock;
  }
  offsetForThisBlock *= chunkBytes;
  // Calculate the size of the data for this block; the last block also copies
  // the tail that did not fill a whole 128-byte chunk.
  size_t sizeForThisBlock = nChunkForThisBlock * chunkBytes;
  const size_t lastChunkSize = size - nChunk * chunkBytes;
  if (lastChunkSize > 0 && peerLocalBlockIdx == nBlockForThisPeer - 1) {
    sizeForThisBlock += lastChunkSize;
  }
  // One thread per peer performs the signal/wait handshake before any get().
  if (threadIdx.x == 0 && peerLocalBlockIdx == 0) {
    smChans[peerIdx].relaxedSignal();
    smChans[peerIdx].wait();
  }
  allGatherDeviceSyncer.sync(nBlocks);
  size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk;
  smChans[peerIdx].get(offset + offsetForThisBlock, sizeForThisBlock, threadIdx.x, blockDim.x);
}
/// Intra-node allgather, all-pairs variant: after a full signal/wait handshake,
/// every thread cooperates to get() each peer's size-byte slice into the local
/// buffer at that peer's rank offset.
__device__ void localAllGatherAllPairsSm(mscclpp::SmChannelDeviceHandle* smChans, int rank, int nRanksPerNode,
                                         uint64_t size, size_t nBlocks) {
  if (nRanksPerNode == 1) return;
  if (blockIdx.x >= nBlocks) return;
  int tid = threadIdx.x + blockIdx.x * blockDim.x;
  const int nPeer = nRanksPerNode - 1;
  // First nPeer threads signal; the last nPeer threads wait for peer signals.
  if (tid < nPeer) {
    smChans[tid].signal();
  }
  int waitStart = nBlocks * blockDim.x - nPeer;
  if (tid >= waitStart && tid < nBlocks * blockDim.x) {
    smChans[tid - waitStart].wait();
  }
  allGatherDeviceSyncer.sync(nBlocks);
  for (int i = 0; i < nPeer; ++i) {
    // Rotate peer order by rank to spread load.
    int peerIdx = (i + rank) % nPeer;
    const int remoteRankLocalIndex = (peerIdx < rank ? peerIdx : peerIdx + 1);
    size_t offset = size * remoteRankLocalIndex;
    smChans[peerIdx].get(offset, size, tid, blockDim.x * nBlocks);
  }
}
// This is an allgather4 equivalent
/// Hierarchical, pipelined two-node allgather: local SM-channel allgathers are
/// overlapped with IB (proxy channel) exchange of this rank's data with its
/// same-index neighbor on the other node.
///
/// @param smChans SM channels to local peers.
/// @param proxyChans Proxy (IB) channels to the remote node.
/// @param nelemsPerGPU Per-rank element count in 32-bit-word units (callers
///        pre-divide; see allreduce4).
/// @param pipelineDepth Fraction split between step 1 and step 2 transfers.
__device__ void allGatherSm(mscclpp::SmChannelDeviceHandle* smChans,
                            mscclpp::SimpleProxyChannelDeviceHandle* proxyChans, int rank, int worldSize,
                            int nRanksPerNode, size_t nelemsPerGPU, int pipelineDepth) {
  // this allgather is a pipelined and hierarchical one and only works for two nodes
  // it is implemented as follows:
  // Step 1: each node does a local allgather and concurrently,
  // local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with
  // its cross-node neighbor (local GPU i on the other node) via IB
  // Step 2: each node does a local allgather again with the data just received from its
  // cross-node neighbor in step 1, and concurrently, exchange the rest of the data with
  // its cross-node neighbor
  // Step 3: each node does a local allgather for the last time with the rest of the data
  int pipelineSize = pipelineDepth;
  int peerRank = (rank + nRanksPerNode) % worldSize;
  int peerNodeId = peerRank / nRanksPerNode;
  int peer = (peerRank < rank) ? peerRank : peerRank - 1;
  mscclpp::SimpleProxyChannelDeviceHandle proxyChan = proxyChans[peer];
  // Round the block count down to a multiple of (nRanksPerNode - 1).
  const size_t nBlocksForLocalAllGather = gridDim.x / (nRanksPerNode - 1) * (nRanksPerNode - 1);
  const size_t rankChunkSize = nelemsPerGPU * sizeof(int);
  const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode;
  const int startRankIndexInPeerNode = (peerRank / nRanksPerNode) * nRanksPerNode;
  // Single-node case: no IB exchange needed, just the local allgather.
  if (peerNodeId == rank / nRanksPerNode) {
    localAllGatherSm(smChans, rank, nRanksPerNode, 0, 0, rankChunkSize, rankChunkSize, gridDim.x);
    return;
  }
  // Align the step-1 portion down to 128 bytes; step 2 carries the rest.
  constexpr size_t alignment = 128;
  size_t step1Bytes = (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int);
  step1Bytes = step1Bytes / alignment * alignment;
  const size_t step2Bytes = nelemsPerGPU * sizeof(int) - step1Bytes;
  // Step 1
  if (threadIdx.x == 0 && blockIdx.x == 0 && step1Bytes > 0) {
    proxyChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), step1Bytes);
  }
  localAllGatherSm(smChans, rank, nRanksPerNode, startRankIndexInLocalNode, 0, rankChunkSize, rankChunkSize,
                   nBlocksForLocalAllGather);
  if (threadIdx.x == 0 && blockIdx.x == 0 && step1Bytes > 0) {
    proxyChan.wait();
    proxyChan.flush();
  }
  deviceSyncer.sync(gridDim.x);
  // Step 2
  if (threadIdx.x == 0 && blockIdx.x == 0) {
    proxyChan.putWithSignal(rank * nelemsPerGPU * sizeof(int) + step1Bytes, step2Bytes);
  }
  if (step1Bytes > 0)
    localAllGatherSm(smChans, rank, nRanksPerNode, startRankIndexInPeerNode, 0, rankChunkSize, step1Bytes,
                     nBlocksForLocalAllGather);
  if (threadIdx.x == 0 && blockIdx.x == 0) {
    proxyChan.wait();
    proxyChan.flush();
  }
  deviceSyncer.sync(gridDim.x);
  // Step 3
  localAllGatherSm(smChans, rank, nRanksPerNode, startRankIndexInPeerNode, step1Bytes, rankChunkSize, step2Bytes,
                   nBlocksForLocalAllGather);
}
/// Hierarchical, pipelined two-node reduce-scatter: most blocks perform
/// intra-node reduce-scatter over SM channels while the remaining blocks
/// reduce data arriving from the cross-node neighbor over the proxy (IB)
/// channel. Only one thread (`isComm`) issues proxy operations.
__device__ void reduceScatterSm(mscclpp::SmChannelDeviceHandle* smChans,
                                mscclpp::SimpleProxyChannelDeviceHandle* proxyChans, TYPE* buff, TYPE* scratch,
                                int rank, int nRanksPerNode, int worldSize,
                                size_t nelems,  // must be divisible by 3
                                int pipelineDepth) {
  // this reduce-scatter algorithm works as follows:
  // Step 1: each node does a local reduce-scatter on peer node data chunks with 1/pipeline portion of chunk data. For
  // example, 2 nodes and each node has 2 ranks. rank 0 and rank 1 perform reduce-scatter on chunk 2 and chunk 3, with
  // 1/pipeline portion of the data.
  // Step 2: each node does a local reduce-scatter on peers data chunks with (pipeline-1)/pipeline portion of chunk
  // data. Meanwhile, exchange the reduced data of the previous step with its cross-node neighbor (same local rank
  // number on the other node) via IB. Then performs a reduce operation.
  // Step 3: each node does a local reduce-scatter on local ranks, meanwhile exchange the reduced data of the previous
  // step with its cross-node neighbor (same local rank number on the other node) via IB. Then performs a reduce
  // operation.
  int pipelineSize = pipelineDepth;
  // ~80% of blocks do the SM reduce-scatter, rounded to a multiple of (nRanksPerNode - 1);
  // the remainder handle the IB-side reduction.
  float nBlocksForReduceScatterRatio = 0.8;
  const size_t chunkSize = nelems / worldSize;
  const int peerRank = (rank + nRanksPerNode) % worldSize;
  int peerNodeId = peerRank / nRanksPerNode;
  int nBlocksForReduceScatter =
      (int)(nBlocksForReduceScatterRatio * gridDim.x) / (nRanksPerNode - 1) * (nRanksPerNode - 1);
  int isComm = (threadIdx.x == 0) && (blockIdx.x == nBlocksForReduceScatter);
  int peer = (peerRank < rank) ? peerRank : peerRank - 1;
  int nBlocksRemain = gridDim.x - nBlocksForReduceScatter;
  mscclpp::SimpleProxyChannelDeviceHandle proxyChan = proxyChans[peer];
  // Single-node case: plain local reduce-scatter, no IB traffic.
  if (peerNodeId == rank / nRanksPerNode) {
    localReduceScatterSm(smChans, buff, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize, gridDim.x);
    return;
  }
  // step 1: local reduce
  int startChunkIndex = peerNodeId * nRanksPerNode;
  localReduceScatterSm(smChans, buff, rank, nRanksPerNode, startChunkIndex, 0, chunkSize, chunkSize / pipelineSize,
                       nBlocksForReduceScatter);
  deviceSyncer.sync(gridDim.x);
  // step 2: local reduce and exchange data with neighbor
  if (isComm) {
    size_t offset = (peerRank * chunkSize) * sizeof(int);
    // opposite side
    proxyChan.putWithSignal(offset, (chunkSize / pipelineSize * sizeof(int)));
  }
  if (pipelineSize > 1)
    localReduceScatterSm(smChans, buff, rank, nRanksPerNode, startChunkIndex, chunkSize / pipelineSize, chunkSize,
                         (pipelineSize - 1) * chunkSize / pipelineSize, nBlocksForReduceScatter);
  if (isComm) {
    proxyChan.wait();
  }
  // The non-reduce-scatter blocks fold the data received over IB into buff.
  if (blockIdx.x >= nBlocksForReduceScatter) {
    ibDeviceSyncer.sync(nBlocksRemain);
    // reduce data received from peer to related rank
    size_t offset = rank * chunkSize * sizeof(int);
    int* dst = (int*)((char*)buff + offset);
    int* src = (int*)((char*)scratch + offset);
    vectorSum((TYPE*)dst, (TYPE*)src, chunkSize / pipelineSize, blockIdx.x - nBlocksForReduceScatter, nBlocksRemain);
  }
  if (isComm) {
    proxyChan.flush();
  }
  deviceSyncer.sync(gridDim.x);
  // step 3: local reduce and exchange data with neighbor
  startChunkIndex = (rank / nRanksPerNode) * nRanksPerNode;
  if (isComm && pipelineSize > 1) {
    size_t offset = (peerRank * chunkSize + chunkSize / pipelineSize) * sizeof(int);
    proxyChan.putWithSignal(offset, (pipelineSize - 1) * chunkSize / pipelineSize * sizeof(int));
  }
  localReduceScatterSm(smChans, buff, rank, nRanksPerNode, startChunkIndex, 0, chunkSize, chunkSize,
                       nBlocksForReduceScatter);
  if (isComm && pipelineSize > 1) {
    proxyChan.wait();
  }
  deviceSyncer.sync(gridDim.x);
  // reduce to related rank, can not overlap since localReduceScatter also calculate the sum
  size_t offset = (rank * chunkSize + chunkSize / pipelineSize) * sizeof(int);
  int* dst = (int*)((char*)buff + offset);
  int* src = (int*)((char*)scratch + offset);
  if (pipelineSize > 1) vectorSum((TYPE*)dst, (TYPE*)src, (pipelineSize - 1) * chunkSize / pipelineSize);
  if (isComm) {
    proxyChan.flush();
  }
}
extern "C" __global__ void __launch_bounds__(1024, 1) __global__
allreduce4(mscclpp::SmChannelDeviceHandle* smChans,
mscclpp::SimpleProxyChannelDeviceHandle* reduceScatterProxyChans,
mscclpp::SimpleProxyChannelDeviceHandle* allGatherProxyChans, TYPE* buff, TYPE* scratch, int rank,
int nRanksPerNode, int worldSize, size_t nelems, int pipelineDepth) {
nelems = nelems / (sizeof(int) / sizeof(TYPE));
reduceScatterSm(smChans, reduceScatterProxyChans, buff, scratch, rank, nRanksPerNode, worldSize, nelems,
pipelineDepth);
deviceSyncer.sync(gridDim.x);
allGatherSm(smChans, allGatherProxyChans, rank, worldSize, nRanksPerNode, nelems / worldSize, pipelineDepth);
}
// allreduce 5 for 2-nodes: LL-packet (low-latency, flag-tagged) algorithm.
// Intra-node traffic goes over SM channels; each rank does exactly one
// cross-node exchange with the same local rank on the other node via a
// proxy channel. Assumes gridDim.x is a multiple of (nRanksPerNode - 1).
extern "C" __global__ void __launch_bounds__(1024, 1)
    allreduce5(mscclpp::SmChannelDeviceHandle* smChans, mscclpp::SimpleProxyChannelDeviceHandle* proxyChans, TYPE* buff,
               TYPE* scratch, TYPE* putBuff, TYPE* resultBuff, int rank, int nRanksPerNode, int worldSize,
               size_t nelems) {
  // Convert element count to int units (TYPE may be narrower than int).
  nelems = nelems / (sizeof(int) / sizeof(TYPE));
  // NOTE(review): an earlier comment here said "only works for single nodes",
  // which contradicts the cross-node proxy put in step 3; this kernel is the
  // two-node path selected by the benchmark driver.
  const int nPeersInNode = nRanksPerNode - 1;
  // One LLPacket carries a uint2 payload, i.e. two int elements.
  const int nPkts = nelems / 2;
  const int nelemsPerLocalRank = nelems / nRanksPerNode;
  const int nPktsPerLocalRank = nelemsPerLocalRank / 2;
  const int localRankId = rank % nRanksPerNode;
  // flag for packets. Initially 1; incremented once per call so stale packets
  // from the previous invocation are never mistaken for fresh data.
  const uint32_t flag = (uint32_t)globalFlag;
  // thread block & channel info: blocks are partitioned evenly across local
  // peers; each partition streams one peer's chunk.
  const int nBlocksPerPeer = gridDim.x / nPeersInNode;
  const int localBlockIdx = blockIdx.x % nBlocksPerPeer;
  const int peerIdx = blockIdx.x / nBlocksPerPeer;
  // Map peer slot to its rank index, skipping this rank's own slot.
  const int remoteRankIdx = peerIdx < localRankId ? peerIdx : peerIdx + 1;
  mscclpp::SmChannelDeviceHandle smChan = smChans[peerIdx];
  mscclpp::SimpleProxyChannelDeviceHandle proxyChan = proxyChans[localRankId];
  const int tid = threadIdx.x + localBlockIdx * blockDim.x;
  // double buffering: even/odd flag values alternate between the two halves
  // of the scratch and put buffers, so back-to-back calls cannot collide.
  size_t scratchBaseOffset = (flag & 1) ? 0 : nPkts * sizeof(mscclpp::LLPacket);
  size_t putBaseOffset = (flag & 1) ? 0 : nPktsPerLocalRank * sizeof(mscclpp::LLPacket);
  void* scratchBuff = (void*)((char*)scratch + scratchBaseOffset);
  size_t scratchOffset = scratchBaseOffset + localRankId * nPktsPerLocalRank * sizeof(mscclpp::LLPacket);
  // Result packets live in the upper half of the (4x-sized) scratch region.
  size_t scratchResultOffset =
      (flag & 1) ? 2 * nPkts * sizeof(mscclpp::LLPacket) : 3 * nPkts * sizeof(mscclpp::LLPacket);
  size_t srcOffset = remoteRankIdx * nelemsPerLocalRank * sizeof(int);
  uint2* src = (uint2*)((char*)buff + localRankId * nelemsPerLocalRank * sizeof(int));
  uint2* dst = (uint2*)((char*)resultBuff + localRankId * nelemsPerLocalRank * sizeof(int));

  // step 1: write to scratch buffer — push this rank's share of the peer's
  // chunk into the peer's scratch as flag-tagged packets.
  if (nRanksPerNode > 1) {
    smChan.putPackets(scratchOffset, srcOffset, nelemsPerLocalRank * sizeof(int), tid, blockDim.x * nBlocksPerPeer,
                      flag);
  }
  // step 2: get data from scratch buffer, do local reduce-scatter in each node.
  // Spin-reads each peer's packets (read(flag) blocks until the tag matches),
  // accumulates, stages the node-local sum in putBuff for the IB transfer,
  // and stores the partial result.
  mscclpp::LLPacket* putPkt = (mscclpp::LLPacket*)((char*)putBuff + putBaseOffset);
  for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerLocalRank; idx += blockDim.x * gridDim.x) {
    uint2 data = make_uint2(0, 0);
    for (int index = 0; index < nPeersInNode; index++) {
      const int remoteRank = index < localRankId ? index : index + 1;
      mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)scratchBuff + remoteRank * nPktsPerLocalRank;
      uint2 val = dstPkt[idx].read(flag);
      data = add_vectors<TYPE>(val, data);
    }
    data = add_vectors<TYPE>(data, src[idx]);
    putPkt[idx].write(data.x, data.y, flag);
    dst[idx] = data;
  }
  deviceSyncer.sync(gridDim.x);
  // step 3. send local reduced data to remote node. Single-thread put: packet
  // flags provide the ordering, so no signal/wait handshake is needed.
  if (threadIdx.x == 0 && blockIdx.x == 0) {
    proxyChan.put(scratchOffset, putBaseOffset, nPktsPerLocalRank * sizeof(mscclpp::LLPacket));
    // Flush only every 64th call to amortize proxy FIFO overhead.
    if ((flag & 63) == 0) {
      proxyChan.flush();
    }
  }
  // step 4. try to read the data from scratch buffer and write to local peers:
  // combine the remote node's partial sum with ours and broadcast the final
  // packets into every local peer's scratch result region.
  mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)scratchBuff + localRankId * nPktsPerLocalRank;
  for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerLocalRank; idx += blockDim.x * gridDim.x) {
    uint2 res = dst[idx];
    uint2 val = dstPkt[idx].read(flag);
    res = add_vectors<TYPE>(res, val);

    mscclpp::LLPacket packet;
    packet.data1 = res.x;
    packet.flag1 = flag;
    packet.data2 = res.y;
    packet.flag2 = flag;
    size_t offset = scratchResultOffset / sizeof(mscclpp::LLPacket) + (idx + localRankId * nPktsPerLocalRank);
    for (int index = 0; index < nPeersInNode; index++) {
      smChans[index].write(offset, packet);
    }
    dst[idx] = res;
  }

  // step 5: get data result from scratch buffer — each block partition reads
  // back the final chunk produced by its assigned peer.
  dstPkt = (mscclpp::LLPacket*)((char*)scratch + scratchResultOffset);
  const int dstOffset = remoteRankIdx * nPktsPerLocalRank;
  uint2* result = (uint2*)((char*)resultBuff + remoteRankIdx * nelemsPerLocalRank * sizeof(int));
  if (nRanksPerNode > 1) {
    for (int idx = threadIdx.x + localBlockIdx * blockDim.x; idx < nPktsPerLocalRank;
         idx += blockDim.x * nBlocksPerPeer) {
      uint2 data = dstPkt[idx + dstOffset].read(flag);
      result[idx] = data;
    }
  }
  // Advance the flag for the next invocation (flips the double buffers).
  if (threadIdx.x == 0 && blockIdx.x == 0) {
    globalFlag += 1;
  }
}

View File

@@ -0,0 +1,215 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import cupy as cp
from mscclpp_op import MscclppAllReduce1, MscclppAllReduce2, MscclppAllReduce3, MscclppAllReduce4, MscclppAllReduce5
from nccl_op import NcclAllReduce
from mpi4py import MPI
import cupy.cuda.nccl as nccl
import mscclpp.comm as mscclpp_comm
from mscclpp import ProxyService
from prettytable import PrettyTable
import netifaces as ni
# Element type used for every benchmark buffer; dtype_str labels the report table.
data_type = cp.float16

_DTYPE_LABELS = {cp.float16: "fp16", cp.float32: "fp32", cp.int32: "int32"}
if data_type not in _DTYPE_LABELS:
    raise RuntimeError("Unknown data type")
dtype_str = _DTYPE_LABELS[data_type]
def human_readable_size(size, decimal_places=1):
    """Format a byte count with binary prefixes, e.g. 1536 -> '1.5 KiB'.

    Values of one pebibyte or more stay in PiB (no larger unit is used).
    """
    units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
    idx = 0
    while size >= 1024.0 and idx < len(units) - 1:
        size /= 1024.0
        idx += 1
    return f"{size:.{decimal_places}f} {units[idx]}"
def check_correctness(memory, func):
    """Run one collective call and validate its output against a local reference.

    Each rank fills `memory` with a rank-seeded random pattern, runs `func`,
    then recomputes the expected all-reduced sum by regenerating every rank's
    input locally. Returns True only if ALL ranks passed.
    """
    rand_gen = cp.random.default_rng(seed=MPI.COMM_WORLD.rank)
    memory[:] = rand_gen.random(memory.shape).astype(data_type)
    cp.cuda.runtime.deviceSynchronize()
    output_memory = func(0)
    cp.cuda.runtime.deviceSynchronize()
    # Reference: sum of every rank's (reproducible) random input.
    expected = cp.zeros_like(memory)
    for i in range(MPI.COMM_WORLD.size):
        rand_gen = cp.random.default_rng(seed=i)
        expected += rand_gen.random(memory.shape).astype(data_type)

    # The original code had identical fp16/else branches with the same
    # tolerances, so a single comparison suffices.
    ac = bool(cp.allclose(output_memory, expected, rtol=1.0e-2, atol=1.0e-4))
    # Take the minimum across ranks: the check passes only if every rank
    # passed. (The previous op=MPI.SUM reported PASS if any single rank
    # passed, masking per-rank failures.)
    ac = MPI.COMM_WORLD.allreduce(ac, op=MPI.MIN)
    if not ac:
        print(output_memory, expected)
    return ac
def bench_time(niter: int, func):
    """Measure the average per-iteration time of `func` in microseconds.

    All `niter` launches are captured into a single CUDA graph so that launch
    overhead is excluded; the graph is replayed once to warm up, then timed
    with CUDA events over a single replay.
    """
    capture_stream = cp.cuda.Stream(non_blocking=True)
    with capture_stream:
        capture_stream.begin_capture()
        for _ in range(niter):
            func(capture_stream.ptr)
        graph = capture_stream.end_capture()

    # Warm-up replay (not timed).
    graph.launch(capture_stream)

    begin_evt = cp.cuda.Event()
    end_evt = cp.cuda.Event()
    begin_evt.record(capture_stream)
    graph.launch(capture_stream)
    end_evt.record(capture_stream)
    end_evt.synchronize()

    elapsed_ms = cp.cuda.get_elapsed_time(begin_evt, end_evt)
    return elapsed_ms / niter * 1000.0  # ms -> us, averaged per iteration
def find_best_config(mscclpp_call, niter):
    """Sweep the kernel's tunable launch configs and return the fastest one.

    Each rank times every config yielded by `auto_tune()` (which also applies
    the config to the kernel); rank 0's winner is broadcast so all ranks end
    up with the same configuration. Returns None if `auto_tune()` yields no
    configs (previously this raised NameError on an unbound local).
    """
    best_time = float("inf")
    best_config = None
    for config in mscclpp_call.auto_tune():
        cur_time = bench_time(niter, mscclpp_call)
        if cur_time < best_time:
            best_time = cur_time
            best_config = config
        if MPI.COMM_WORLD.rank == 0:
            # Progress marker: one 't' per tuning trial.
            print("t", end="", flush=True)
    # All ranks must run the same config; adopt rank 0's choice.
    best_config = MPI.COMM_WORLD.bcast(best_config, root=0)
    if MPI.COMM_WORLD.rank == 0:
        print(best_config, end="", flush=True)
    return best_config
def run_benchmark(
    mscclpp_group: mscclpp_comm.CommGroup, nccl_op: nccl.NcclCommunicator, table: PrettyTable, niter: int, nelem: int
):
    """Benchmark one buffer size with both MSCCL++ and NCCL, append a table row.

    The MSCCL++ algorithm is chosen by node count and message size; `nelem`
    is in elements of the module-level `data_type`. Rank 0 records results.
    """
    memory = cp.zeros(nelem, dtype=data_type)
    memory_out = cp.zeros(nelem, dtype=data_type)
    cp.cuda.runtime.deviceSynchronize()

    if MPI.COMM_WORLD.size // N_GPUS_PER_NODE == 1:
        # Single node: packet kernel for small, SM kernel for medium,
        # proxy-based kernel for large messages.
        if memory.nbytes < 2**20:
            mscclpp_call = MscclppAllReduce2(mscclpp_group, memory, memory_out)
        elif memory.nbytes < 2**29:
            # allreduce1's read-write path is used only in the 1 MiB - 4 MiB
            # range; read-only elsewhere (presumably tuned empirically — the
            # crossover points are not derived here).
            if memory.nbytes >= 2**20 and memory.nbytes <= 2**22:
                read_only = 0
            else:
                read_only = 1
            mscclpp_call = MscclppAllReduce1(mscclpp_group, memory, read_only=read_only)
        else:
            proxy_service = ProxyService()
            mscclpp_call = MscclppAllReduce3(mscclpp_group, memory, proxy_service)
            proxy_service.start_proxy()
    else:
        # Multi-node: packet-based allreduce5 for small messages, pipelined
        # allreduce4 otherwise; both are auto-tuned before timing.
        if memory.nbytes < 2**22:
            proxy_service = ProxyService()
            mscclpp_call = MscclppAllReduce5(mscclpp_group, memory, memory_out, N_GPUS_PER_NODE, proxy_service)
            proxy_service.start_proxy()
            best_config = find_best_config(mscclpp_call, 100)
            mscclpp_call.set_params(*best_config)
        else:
            proxy_service = ProxyService()
            mscclpp_call = MscclppAllReduce4(mscclpp_group, memory, N_GPUS_PER_NODE, proxy_service)
            proxy_service.start_proxy()
            best_config = find_best_config(mscclpp_call, 20)
            mscclpp_call.set_params(*best_config)

    nccl_call = NcclAllReduce(nccl_op, memory)

    memory_nbytes = memory.nbytes
    mscclpp_time = bench_time(niter, mscclpp_call)
    # bench_time returns microseconds, so bytes / us / 1e3 == GB/s.
    mscclpp_algBw = memory_nbytes / mscclpp_time / 1e3
    mscclpp_check = "PASS" if check_correctness(memory, mscclpp_call) else "FAIL"

    nccl_time = bench_time(niter, nccl_call)
    nccl_algBw = memory_nbytes / nccl_time / 1e3
    nccl_check = "PASS" if check_correctness(memory, nccl_call) else "FAIL"

    # Proxy-based algorithms (3, 4, 5) own a running proxy; stop it only
    # after all ranks finished this size (barrier prevents early teardown).
    if (
        isinstance(mscclpp_call, MscclppAllReduce3)
        or isinstance(mscclpp_call, MscclppAllReduce5)
        or isinstance(mscclpp_call, MscclppAllReduce4)
    ):
        MPI.COMM_WORLD.barrier()
        proxy_service.stop_proxy()

    if MPI.COMM_WORLD.rank == 0:
        table.add_row(
            [
                human_readable_size(memory_nbytes),
                "{:.2f}".format(mscclpp_time),
                "{:.2f}".format(mscclpp_algBw),
                mscclpp_check,
                "{:.2f}".format(nccl_time),
                "{:.2f}".format(nccl_algBw),
                nccl_check,
                "{:.2f}".format(nccl_time / mscclpp_time),
            ]
        )
    if MPI.COMM_WORLD.rank == 0:
        # Progress marker: one dot per completed size.
        print(".", end="", flush=True)
if __name__ == "__main__":
shm_comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED, 0, MPI.INFO_NULL)
N_GPUS_PER_NODE = shm_comm.size
shm_comm.Free()
cp.cuda.Device(MPI.COMM_WORLD.rank % N_GPUS_PER_NODE).use()
# create a MscclppGroup
network_interface = "eth0"
my_ip = ni.ifaddresses(network_interface)[ni.AF_INET][0]["addr"]
root_ip = MPI.COMM_WORLD.bcast(my_ip, root=0)
ifIpPortTrio = network_interface + ":" + root_ip + ":50000" # some random port
mscclpp_group = mscclpp_comm.CommGroup(
interfaceIpPortTrio=ifIpPortTrio, rank=MPI.COMM_WORLD.rank, size=MPI.COMM_WORLD.size
)
# create a NcclComm
if MPI.COMM_WORLD.rank == 0:
uid = nccl.get_unique_id()
else:
uid = None
uid = MPI.COMM_WORLD.bcast(uid, root=0)
nccl_comm = nccl.NcclCommunicator(MPI.COMM_WORLD.size, uid, MPI.COMM_WORLD.rank)
table = None
if MPI.COMM_WORLD.rank == 0:
# Set table headers
table = PrettyTable()
table.field_names = [
f"Size ({dtype_str})",
"Time (us)",
"AlgBW (GB/s)",
"Correctness",
"NCCL Time (us)",
"NCCL AlgBW (GB/s)",
"NCCL Correctness",
"Speed Up",
]
for i in range(10, 28):
if MPI.COMM_WORLD.size // N_GPUS_PER_NODE == 1:
run_benchmark(mscclpp_group, nccl_comm, table, 100, 2**i)
elif MPI.COMM_WORLD.size // N_GPUS_PER_NODE == 2:
run_benchmark(mscclpp_group, nccl_comm, table, 100, 3 * 2**i)
else:
raise RuntimeError("Only support one node/two nodes communication")
if MPI.COMM_WORLD.rank == 0:
print()
print(table)
mscclpp_group = None
nccl_comm = None

View File

@@ -0,0 +1,344 @@
import os
import cupy as cp
import ctypes
from mscclpp import Transport, ProxyService
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, pack
# InfiniBand transports, one per HCA. Cross-node connections index this as
# IB_TRANSPORTS[rank % nranks_per_node] so each local rank uses its own NIC.
IB_TRANSPORTS = [
    Transport.IB0,
    Transport.IB1,
    Transport.IB2,
    Transport.IB3,
    Transport.IB4,
    Transport.IB5,
    Transport.IB6,
    Transport.IB7,
]
def type_to_str(dtype):
    """Map a cupy/numpy dtype to the C type name used for the kernel's TYPE macro."""
    # Equality (not identity/hashing) is used on purpose: np.dtype('float16')
    # compares equal to cp.float16.
    if dtype == cp.float16:
        return "__half"
    if dtype == cp.float32:
        return "float"
    if dtype == cp.int32:
        return "int"
    raise RuntimeError("Unknown data type")
class MscclppAllReduce1:
    """In-place allreduce over SM (CudaIpc) channels using kernel "allreduce1".

    Used by the benchmark for single-node, medium-size messages. The
    READ_ONLY macro selects between the kernel's two data paths.
    """

    def __init__(
        self,
        group: mscclpp_comm.CommGroup,
        memory: cp.ndarray,
        read_only: int = 1,
        nthreads: int = 1024,
        nblocks: int = 24,
    ):
        self.group = group
        self.memory = memory
        remote_nghrs = list(range(self.group.nranks))
        remote_nghrs.remove(self.group.my_rank)

        self.group.barrier()
        # create a connection for each remote neighbor
        self.connections = self.group.make_connection(remote_nghrs, Transport.CudaIpc)
        type_str = type_to_str(memory.dtype)

        # create a sm_channel for each remote neighbor
        self.sm_channels = self.group.make_sm_channels(self.memory, self.connections)
        file_dir = os.path.dirname(os.path.abspath(__file__))
        # Compile the kernel with TYPE/READ_ONLY baked in as macros.
        self.kernel = KernelBuilder(
            file="allreduce.cu",
            kernel_name="allreduce1",
            file_dir=file_dir,
            macro_dict={"TYPE": type_str, "READ_ONLY": str(read_only)},
        ).get_compiled_kernel()
        self.params = b""
        self.device_handles = []
        for rank in range(self.group.nranks):
            if rank != self.group.my_rank:
                self.device_handles.append(self.sm_channels[rank].device_handle().raw)
        # Argument buffer layout must match the kernel signature:
        # (handles ptr, data ptr, rank, world size, element count).
        self.params += pack(
            cp.asarray(memoryview(b"".join(self.device_handles)), dtype=cp.uint8),
            self.memory,
            self.group.my_rank,
            self.group.nranks,
            ctypes.c_size_t(self.memory.size),
        )

        self.nthreads = nthreads
        self.nblocks = nblocks

    def __call__(self, stream_ptr):
        # Launch on the given CUDA stream; the reduction is in place.
        self.kernel.launch_kernel(self.params, self.nblocks, self.nthreads, 0, stream_ptr)
        return self.memory
class MscclppAllReduce2:
    """Out-of-place allreduce using kernel "allreduce2" with a scratch buffer.

    Used by the benchmark for single-node, small messages. The result lands
    in `memory_out`; `memory` is left as input.
    """

    def __init__(self, group: mscclpp_comm.CommGroup, memory: cp.ndarray, memory_out: cp.ndarray):
        self.group = group
        self.memory = memory
        self.memory_out = memory_out
        remote_nghrs = list(range(self.group.nranks))
        remote_nghrs.remove(self.group.my_rank)

        self.group.barrier()
        # create a connection for each remote neighbor
        self.connections = self.group.make_connection(remote_nghrs, Transport.CudaIpc)
        type_str = type_to_str(memory.dtype)
        # 8x scratch — presumably sized for the kernel's packet layout
        # (payload + flags, double-buffered); confirm against allreduce2.
        self.scratch = cp.zeros(self.memory.size * 8, dtype=self.memory.dtype)
        # create a sm_channel for each remote neighbor
        self.sm_channels = self.group.make_sm_channels_with_scratch(self.memory, self.scratch, self.connections)
        file_dir = os.path.dirname(os.path.abspath(__file__))
        self.kernel = KernelBuilder(
            file="allreduce.cu", kernel_name="allreduce2", file_dir=file_dir, macro_dict={"TYPE": type_str}
        ).get_compiled_kernel()
        self.params = b""
        self.device_handles = []
        for rank in range(self.group.nranks):
            if rank != self.group.my_rank:
                self.device_handles.append(self.sm_channels[rank].device_handle().raw)
        # Argument order must match the kernel signature.
        self.params += pack(
            cp.asarray(memoryview(b"".join(self.device_handles)), dtype=cp.uint8),
            self.memory,
            self.scratch,
            self.memory_out,
            self.group.my_rank,
            self.group.nranks,
            ctypes.c_size_t(self.memory.size),
        )

    def __call__(self, stream_ptr):
        # Fixed launch configuration: 21 blocks x 512 threads.
        self.kernel.launch_kernel(self.params, 21, 512, 0, stream_ptr)
        return self.memory_out
class MscclppAllReduce3:
    """In-place allreduce using kernel "allreduce3" over proxy channels.

    Used by the benchmark for single-node, large messages. Two channel sets
    are created: a first round writing into `scratch`, and a second round
    operating on `memory` directly. The caller owns `proxy_service` and must
    start/stop it around use.
    """

    def __init__(self, group: mscclpp_comm.CommGroup, memory: cp.ndarray, proxy_service: ProxyService):
        self.group = group
        self.memory = memory
        remote_nghrs = list(range(self.group.nranks))
        remote_nghrs.remove(self.group.my_rank)

        self.group.barrier()
        # create a connection for each remote neighbor
        self.connections = self.group.make_connection(remote_nghrs, Transport.CudaIpc)
        type_str = type_to_str(memory.dtype)
        self.proxy_service = proxy_service
        self.scratch = cp.zeros(self.memory.size, dtype=self.memory.dtype)

        # create a sm_channel for each remote neighbor
        self.fst_round_proxy_chans = self.group.make_proxy_channels_with_scratch(
            self.proxy_service, self.memory, self.scratch, self.connections
        )
        self.snd_round_proxy_chans = self.group.make_proxy_channels(self.proxy_service, self.memory, self.connections)
        file_dir = os.path.dirname(os.path.abspath(__file__))
        self.kernel = KernelBuilder(
            file="allreduce.cu", kernel_name="allreduce3", file_dir=file_dir, macro_dict={"TYPE": type_str}
        ).get_compiled_kernel()
        self.params = b""
        self.fst_device_handles = []
        self.snd_device_handles = []
        for rank in range(self.group.nranks):
            if rank != self.group.my_rank:
                self.fst_device_handles.append(self.fst_round_proxy_chans[rank].device_handle().raw)
                self.snd_device_handles.append(self.snd_round_proxy_chans[rank].device_handle().raw)
        # Argument order must match the kernel signature.
        self.params += pack(
            cp.asarray(memoryview(b"".join(self.fst_device_handles)), dtype=cp.uint8),
            cp.asarray(memoryview(b"".join(self.snd_device_handles)), dtype=cp.uint8),
            self.memory,
            self.scratch,
            self.group.my_rank,
            self.group.nranks,
            ctypes.c_size_t(self.memory.size),
        )

    def __call__(self, stream_ptr):
        # Fixed launch configuration: 24 blocks x 1024 threads.
        self.kernel.launch_kernel(self.params, 24, 1024, 0, stream_ptr)
        return self.memory
class MscclppAllReduce4:
    """Multi-node, in-place allreduce using kernel "allreduce4".

    Intra-node peers use CudaIpc/SM channels; cross-node peers use IB proxy
    channels (one NIC per local rank). Launch geometry and pipeline depth are
    tunable via set_params()/auto_tune(). The caller owns `proxy_service`.
    """

    def __init__(
        self,
        group: mscclpp_comm.CommGroup,
        memory: cp.ndarray,
        nranks_per_node: int,
        proxy_service: ProxyService,
        nblocks: int = 45,
        block_size: int = 512,
        pipeline_depth: int = 3,
    ):
        self.group = group
        self.memory = memory
        self.nranks_per_node = nranks_per_node
        # True when `rank` lives on this process's node.
        in_same_node = lambda rank: rank // nranks_per_node == self.group.my_rank // nranks_per_node
        remote_nghrs = list(range(self.group.nranks))
        remote_nghrs.remove(self.group.my_rank)
        # CudaIpc within the node, a per-local-rank IB NIC across nodes.
        transports = {}
        for rank in remote_nghrs:
            if in_same_node(rank):
                transports[rank] = Transport.CudaIpc
            else:
                transports[rank] = IB_TRANSPORTS[rank % nranks_per_node]

        self.group.barrier()
        # create a connection for each remote neighbor
        self.connections = self.group.make_connection(remote_nghrs, transports)
        type_str = type_to_str(memory.dtype)
        self.proxy_service = proxy_service
        self.scratch = cp.zeros(self.memory.size, dtype=self.memory.dtype)
        same_node_connections = {rank: conn for rank, conn in self.connections.items() if in_same_node(rank)}

        # create a sm_channel for each remote neighbor
        self.sm_channels = self.group.make_sm_channels(self.memory, same_node_connections)
        # Reduce-scatter round receives into scratch; all-gather works on memory.
        self.reduce_scatter_proxy_channels = self.group.make_proxy_channels_with_scratch(
            self.proxy_service, self.memory, self.scratch, self.connections
        )
        self.all_gather_proxy_channels = self.group.make_proxy_channels(
            self.proxy_service, self.memory, self.connections
        )
        file_dir = os.path.dirname(os.path.abspath(__file__))
        self.kernel = KernelBuilder(
            file="allreduce.cu", kernel_name="allreduce4", file_dir=file_dir, macro_dict={"TYPE": type_str}
        ).get_compiled_kernel()
        self.sm_device_handles = []
        # NOTE(review): "reduce_sactter" is a typo for "reduce_scatter"; kept
        # as-is since the attribute name is part of the object's surface.
        self.reduce_sactter_proxy_device_handles = []
        self.all_gather_proxy_device_handles = []
        for rank in range(self.group.nranks):
            if rank != self.group.my_rank and in_same_node(rank):
                self.sm_device_handles.append(self.sm_channels[rank].device_handle().raw)
            if rank != self.group.my_rank:
                self.reduce_sactter_proxy_device_handles.append(
                    self.reduce_scatter_proxy_channels[rank].device_handle().raw
                )
                self.all_gather_proxy_device_handles.append(self.all_gather_proxy_channels[rank].device_handle().raw)

        self.set_params(nblocks, block_size, pipeline_depth)

    def __call__(self, stream_ptr):
        self.kernel.launch_kernel(self.params, self.nblocks, self.block_size, 0, stream_ptr)
        return self.memory

    def set_params(self, nblocks, block_size, pipeline_depth):
        """Set launch geometry and pipeline depth, and rebuild the kernel
        argument buffer (layout must match the allreduce4 signature)."""
        self.nblocks = nblocks
        self.block_size = block_size
        self.pipeline_depth = pipeline_depth
        self.params = b""
        self.params += pack(
            cp.asarray(memoryview(b"".join(self.sm_device_handles)), dtype=cp.uint8),
            cp.asarray(memoryview(b"".join(self.reduce_sactter_proxy_device_handles)), dtype=cp.uint8),
            cp.asarray(memoryview(b"".join(self.all_gather_proxy_device_handles)), dtype=cp.uint8),
            self.memory,
            self.scratch,
            self.group.my_rank,
            self.nranks_per_node,
            self.group.nranks,
            bytes(4),  # padding for memory alignment
            ctypes.c_size_t(self.memory.size),
            self.pipeline_depth,
        )

    def auto_tune(self):
        """Yield every tunable config, applying each via set_params() so the
        caller can time the kernel as configured."""
        nblocks_to_try = [24, 32, 40, 45, 48, 64, 72, 90, 96, 108]
        block_size_to_try = [256, 512, 1024]
        pipeline_depth_to_try = [1, 2, 3, 4]
        for nblocks in nblocks_to_try:
            for block_size in block_size_to_try:
                for pipeline_depth in pipeline_depth_to_try:
                    self.set_params(nblocks, block_size, pipeline_depth)
                    yield nblocks, block_size, pipeline_depth
class MscclppAllReduce5:
    """Multi-node, out-of-place allreduce using the LL-packet kernel "allreduce5".

    Intra-node peers use SM channels backed by an 8x packet scratch buffer;
    the cross-node leg goes through proxy channels from `put_buff` into the
    remote scratch. The result lands in `memory_out`. The caller owns
    `proxy_service`.
    """

    def __init__(
        self,
        group: mscclpp_comm.CommGroup,
        memory: cp.ndarray,
        memory_out: cp.ndarray,
        nranks_per_node: int,
        proxy_service: ProxyService,
        nblocks: int = 21,
        block_size: int = 512,
    ):
        self.group = group
        self.memory = memory
        self.memory_out = memory_out
        self.nranks_per_node = nranks_per_node
        # True when `rank` lives on this process's node.
        in_same_node = lambda rank: rank // nranks_per_node == self.group.my_rank // nranks_per_node
        remote_nghrs = list(range(self.group.nranks))
        remote_nghrs.remove(self.group.my_rank)
        # CudaIpc within the node, a per-local-rank IB NIC across nodes.
        transports = {}
        for rank in remote_nghrs:
            if in_same_node(rank):
                transports[rank] = Transport.CudaIpc
            else:
                transports[rank] = IB_TRANSPORTS[rank % nranks_per_node]

        self.group.barrier()
        # create a connection for each remote neighbor
        self.connections = self.group.make_connection(remote_nghrs, transports)
        type_str = type_to_str(memory.dtype)
        self.proxy_service = proxy_service
        # 8x sizing — presumably matches the kernel's packet (payload + flag,
        # double-buffered) scratch layout; confirm against allreduce5.
        self.scratch = cp.zeros(self.memory.size * 8, dtype=self.memory.dtype)
        self.put_buff = cp.zeros(self.memory.size * 8 // nranks_per_node, dtype=self.memory.dtype)
        same_node_connections = {rank: conn for rank, conn in self.connections.items() if in_same_node(rank)}
        across_node_connections = {rank: conn for rank, conn in self.connections.items() if not in_same_node(rank)}

        # create a sm_channel for each remote neighbor
        self.sm_channels = self.group.make_sm_channels_with_scratch(self.memory, self.scratch, same_node_connections)
        self.proxy_channels = self.group.make_proxy_channels_with_scratch(
            self.proxy_service, self.put_buff, self.scratch, across_node_connections
        )
        file_dir = os.path.dirname(os.path.abspath(__file__))
        self.kernel = KernelBuilder(
            file="allreduce.cu", kernel_name="allreduce5", file_dir=file_dir, macro_dict={"TYPE": type_str}
        ).get_compiled_kernel()
        self.sm_device_handles = []
        self.proxy_device_handles = []
        for rank in range(self.group.nranks):
            if rank != self.group.my_rank and in_same_node(rank):
                self.sm_device_handles.append(self.sm_channels[rank].device_handle().raw)
            if rank != self.group.my_rank and not in_same_node(rank):
                self.proxy_device_handles.append(self.proxy_channels[rank].device_handle().raw)

        self.set_params(nblocks, block_size)

    def __call__(self, stream_ptr):
        self.kernel.launch_kernel(self.params, self.nblocks, self.block_size, 0, stream_ptr)
        return self.memory_out

    def set_params(self, nblocks, block_size):
        """Set launch geometry and rebuild the kernel argument buffer (layout
        must match the allreduce5 signature)."""
        self.nblocks = nblocks
        self.block_size = block_size
        self.params = b""
        self.params += pack(
            cp.asarray(memoryview(b"".join(self.sm_device_handles)), dtype=cp.uint8),
            cp.asarray(memoryview(b"".join(self.proxy_device_handles)), dtype=cp.uint8),
            self.memory,
            self.scratch,
            self.put_buff,
            self.memory_out,
            self.group.my_rank,
            self.nranks_per_node,
            self.group.nranks,
            bytes(4),  # padding for memory alignment
            ctypes.c_size_t(self.memory.size),
        )

    def auto_tune(self):
        """Yield every tunable config, applying each via set_params() so the
        caller can time the kernel as configured."""
        nblocks_to_try = [21, 42, 84]
        block_size_to_try = [256, 512, 1024]
        for nblocks in nblocks_to_try:
            for block_size in block_size_to_try:
                self.set_params(nblocks, block_size)
                yield nblocks, block_size

View File

@@ -0,0 +1,23 @@
import cupy.cuda.nccl as nccl
from mpi4py import MPI
import cupy as cp
class NcclAllReduce:
    """Callable wrapper performing an in-place NCCL sum all-reduce on `memory`."""

    def __init__(self, nccl_comm: nccl.NcclCommunicator, memory: cp.ndarray):
        self.nccl_comm = nccl_comm
        self.memory = memory
        # Resolve the NCCL dtype constant once at construction time.
        if memory.dtype == cp.float32:
            self.nccl_dtype = nccl.NCCL_FLOAT32
        elif memory.dtype == cp.float16:
            self.nccl_dtype = nccl.NCCL_FLOAT16
        elif memory.dtype == cp.int32:
            self.nccl_dtype = nccl.NCCL_INT32
        else:
            raise RuntimeError("Make sure that the data type is mapped to the correct NCCL data type")

    def __call__(self, stream_ptr):
        """Launch the all-reduce on the given stream; result overwrites `memory`."""
        buf_ptr = self.memory.data.ptr
        self.nccl_comm.allReduce(buf_ptr, buf_ptr, self.memory.size, self.nccl_dtype, nccl.NCCL_SUM, stream_ptr)
        return self.memory

View File

@@ -23,6 +23,9 @@ from ._mscclpp import (
__version__ = version()
if _os.environ.get("MSCCLPP_HOME", None) is None:
_os.environ["MSCCLPP_HOME"] = _os.path.abspath(_os.path.dirname(__file__))
def get_include():
"""Return the directory that contains the MSCCL++ headers."""

View File

@@ -2,11 +2,10 @@
# Licensed under the MIT license.
from __future__ import annotations
import logging
from typing import Type
import cupy as cp
from mscclpp import (
from ._mscclpp import (
Communicator,
Connection,
Host2DeviceSemaphore,
@@ -20,26 +19,32 @@ from mscclpp import (
Transport,
TransportFlags,
)
import mpi4py
import numpy as np
from .mscclpp_mpi import MpiGroup
logger = logging.getLogger(__name__)
class MscclppGroup:
def __init__(self, mpi_group: MpiGroup, interfaceIpPortTrio=""):
self.bootstrap = TcpBootstrap.create(mpi_group.comm.rank, mpi_group.comm.size)
class CommGroup:
def __init__(
self, mpi_comm: mpi4py.MPI.Comm = None, interfaceIpPortTrio: str = "", rank: int = None, size: int = None
):
if interfaceIpPortTrio == "":
self.bootstrap = TcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
uniq_id = None
if mpi_group.comm.rank == 0:
if mpi_comm.rank == 0:
# similar to NCCL's unique id
uniq_id = self.bootstrap.create_unique_id()
uniq_id_global = mpi_group.comm.bcast(uniq_id, 0)
uniq_id_global = mpi_comm.bcast(uniq_id, 0)
self.bootstrap.initialize(uniq_id_global)
else:
elif mpi_comm:
# use this instead
self.bootstrap = TcpBootstrap.create(mpi_comm.rank, mpi_comm.size)
self.bootstrap.initialize(interfaceIpPortTrio)
elif not interfaceIpPortTrio == "":
assert rank >= 0 and size >= 1
self.bootstrap = TcpBootstrap.create(rank, size)
self.bootstrap.initialize(interfaceIpPortTrio)
else:
raise RuntimeError("Either the interface or mpi_group need to be specified")
self.communicator = Communicator(self.bootstrap)
self.my_rank = self.bootstrap.get_rank()
self.nranks = self.bootstrap.get_n_ranks()
@@ -73,9 +78,15 @@ class MscclppGroup:
else:
assert False # only 8 IBs are supported
def make_connection(self, remote_ranks: list[int], transport: Transport) -> dict[int, Connection]:
def make_connection(
self, remote_ranks: list[int], transports: Transport | dict[int, Transport]
) -> dict[int, Connection]:
connections = {}
for rank in remote_ranks:
if type(transports) is dict:
transport = transports[rank]
else:
transport = transports
connections[rank] = self.communicator.connect_on_setup(rank, 0, transport)
self.communicator.setup()
connections = {rank: connections[rank].get() for rank in connections}
@@ -119,19 +130,19 @@ class MscclppGroup:
channels[rank] = SmChannel(semaphores[rank], registered_memories[rank], tensor.data.ptr)
return channels
def make_sm_channels_with_packet(
self, tensor: cp.ndarray, packetTensor: cp.ndarray, connections: dict[int, Connection]
def make_sm_channels_with_scratch(
self, tensor: cp.ndarray, scratchTensor: cp.ndarray, connections: dict[int, Connection]
) -> dict[int, SmChannel]:
semaphores = self.make_semaphore(connections, SmDevice2DeviceSemaphore)
registered_memories = self.register_tensor_with_connections(packetTensor, connections)
registered_memories = self.register_tensor_with_connections(scratchTensor, connections)
channels = {}
for rank in connections:
channels[rank] = SmChannel(
semaphores[rank], registered_memories[rank], tensor.data.ptr, packetTensor.data.ptr
semaphores[rank], registered_memories[rank], tensor.data.ptr, scratchTensor.data.ptr
)
return channels
def make_proxy_channels_with_packet(
def make_proxy_channels(
self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection]
) -> dict[int, SmChannel]:
semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
@@ -148,3 +159,34 @@ class MscclppGroup:
proxy_service.proxy_channel(semaphore_ids[rank]), memory_ids[rank], memory_ids[self.my_rank]
)
return channels
def make_proxy_channels_with_scratch(
self,
proxy_service: ProxyService,
tensor: cp.ndarray,
scratchTensor: cp.ndarray,
connections: dict[int, Connection],
) -> dict[int, SmChannel]:
transport_flags = TransportFlags()
for rank in connections:
transport_flags |= connections[rank].transport()
data_ptr = tensor.data.ptr if isinstance(tensor, cp.ndarray) else tensor.ctypes.data
local_reg_memory = self.communicator.register_memory(data_ptr, tensor.size * tensor.itemsize, transport_flags)
semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
registered_memories = self.register_tensor_with_connections(scratchTensor, connections)
memory_ids = {}
semaphore_ids = {}
for rank in registered_memories:
if rank == self.my_rank:
memory_ids[self.my_rank] = proxy_service.add_memory(local_reg_memory)
else:
memory_ids[rank] = proxy_service.add_memory(registered_memories[rank])
for rank in semaphores:
semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank])
channels = {}
for rank in semaphores:
channels[rank] = SimpleProxyChannel(
proxy_service.proxy_channel(semaphore_ids[rank]), memory_ids[rank], memory_ids[self.my_rank]
)
return channels

View File

@@ -74,19 +74,27 @@ class Kernel:
class KernelBuilder:
kernel_map: dict = {}
def __init__(self, file: str, kernel_name: str):
if kernel_name in self.kernel_map:
self._kernel = self.kernel_map[kernel_name]
def get_key(self, kernel_name, macro_dict):
return kernel_name + "-".join(f"{key}={macro_dict[key]}" for key in sorted(macro_dict))
def __init__(self, file: str, kernel_name: str, file_dir: str = None, macro_dict: dict = {}):
kernel_key = self.get_key(kernel_name, macro_dict)
if kernel_key in self.kernel_map:
self._kernel = self.kernel_map[kernel_key]
return
self._tempdir = tempfile.TemporaryDirectory(suffix=f"{os.getpid()}")
self._current_file_dir = os.path.dirname(os.path.abspath(__file__))
self._current_file_dir = file_dir if file_dir else os.path.dirname(os.path.abspath(__file__))
self.macros = None
if file_dir:
self.macros = ["-D{}={}".format(macro, value) for macro, value in macro_dict.items()]
device_id = cp.cuda.Device().id
ptx = self._compile_cuda(os.path.join(self._current_file_dir, file), f"{kernel_name}.ptx", device_id)
self._kernel = Kernel(ptx, kernel_name, device_id)
self.kernel_map[kernel_name] = self._kernel
self.kernel_map[kernel_key] = self._kernel
def _compile_cuda(self, source_file, output_file, device_id, std_version="c++17"):
include_dir = os.path.join(self._current_file_dir, "../../include")
mscclpp_home = os.environ.get("MSCCLPP_HOME", "/usr/local/mscclpp")
include_dir = os.path.join(mscclpp_home, "include")
major = _check_cuda_errors(
cudart.cudaDeviceGetAttribute(cudart.cudaDeviceAttr.cudaDevAttrComputeCapabilityMajor, device_id)
)
@@ -108,12 +116,15 @@ class KernelBuilder:
"-o",
f"{self._tempdir.name}/{output_file}",
]
if self.macros:
command += self.macros
try:
subprocess.run(command, capture_output=True, text=True, check=True, bufsize=1)
with open(f"{self._tempdir.name}/{output_file}", "rb") as f:
return f.read()
except subprocess.CalledProcessError as e:
raise RuntimeError("Compilation failed:", e.stderr, " ".join(command))
print(e.stderr, end="")
raise RuntimeError("Compilation failed: ", " ".join(command))
def get_compiled_kernel(self):
return self._kernel
@@ -128,6 +139,8 @@ def pack(*args):
for arg in list(args):
if isinstance(arg, int):
res += struct.pack("i", arg)
elif isinstance(arg, ctypes.c_size_t):
res += struct.pack("N", arg.value)
elif isinstance(arg, np.ndarray):
res += struct.pack("P", arg.ctypes.data)
elif isinstance(arg, cp.ndarray):
@@ -135,6 +148,8 @@ def pack(*args):
# use int to represent bool, which can avoid CUDA_ERROR_LAUNCH_OUT_OF_RESOURCES error
elif isinstance(arg, bool):
res += struct.pack("i", arg)
elif isinstance(arg, bytes):
res += struct.pack(f"{len(arg)}s", arg)
else:
raise RuntimeError(f"Unsupported type: {type(arg)}")
return res

View File

@@ -0,0 +1,7 @@
mpi4py
cupy-cuda11x
prettytable
cuda-python
netifaces
pytest
numpy

View File

@@ -0,0 +1,7 @@
mpi4py
cupy-cuda12x
prettytable
cuda-python
netifaces
pytest
numpy

View File

@@ -38,10 +38,13 @@ atexit.register(finalize_mpi)
class MpiGroup:
def __init__(self, ranks: list):
def __init__(self, ranks: list = []):
world_group = MPI.COMM_WORLD.group
group = world_group.Incl(ranks)
self.comm = MPI.COMM_WORLD.Create(group)
if len(ranks) == 0:
self.comm = MPI.COMM_WORLD
else:
group = world_group.Incl(ranks)
self.comm = MPI.COMM_WORLD.Create(group)
@pytest.fixture

View File

@@ -1,6 +0,0 @@
cuda-python==12.1.0
mpi4py==3.1.4
netifaces==0.11.0
numpy==1.22.2
pytest==7.2.2
cupy-cuda11x

View File

@@ -1,6 +0,0 @@
cuda-python==12.1.0
mpi4py==3.1.4
netifaces==0.11.0
numpy==1.22.2
pytest==7.2.2
cupy-cuda12x

View File

@@ -2,6 +2,7 @@
# Licensed under the MIT license.
from concurrent.futures import ThreadPoolExecutor
import os
import time
import threading
@@ -11,18 +12,18 @@ import netifaces as ni
import pytest
from mscclpp import (
TcpBootstrap,
Fifo,
Host2DeviceSemaphore,
Host2HostSemaphore,
ProxyService,
SmDevice2DeviceSemaphore,
TcpBootstrap,
Transport,
)
import mscclpp.comm as mscclpp_comm
from mscclpp.utils import KernelBuilder, pack
from ._cpp import _ext
from .mscclpp_group import MscclppGroup
from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group
from .utils import KernelBuilder, pack
ethernet_interface_name = "eth0"
@@ -50,7 +51,7 @@ def test_group_with_ip(mpi_group: MpiGroup, ifIpPortTrio: str):
# ranks are on different nodes
pytest.skip("this case is not supported as localhost will be different for different nodes")
group = MscclppGroup(mpi_group, ifIpPortTrio)
group = mscclpp_comm.CommGroup(mpi_group.comm, ifIpPortTrio)
nelem = 1024
memory = np.zeros(nelem, dtype=np.int32)
@@ -119,7 +120,7 @@ def test_bootstrap_init_gil_release(mpi_group: MpiGroup):
def create_and_connect(mpi_group: MpiGroup, transport: str):
if transport == "NVLink" and all_ranks_on_the_same_node(mpi_group) is False:
pytest.skip("cannot use nvlink for cross node")
group = MscclppGroup(mpi_group)
group = mscclpp_comm.CommGroup(mpi_group.comm)
remote_nghrs = list(range(mpi_group.comm.size))
remote_nghrs.remove(mpi_group.comm.rank)
@@ -278,33 +279,40 @@ class MscclppKernel:
scratch=None,
fifo=None,
):
file_dir = os.path.dirname(os.path.abspath(__file__))
if test_name == "h2d_semaphore":
self._kernel = KernelBuilder(
file="h2d_semaphore_test.cu", kernel_name="h2d_semaphore"
file="h2d_semaphore_test.cu", kernel_name="h2d_semaphore", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = nranks
elif test_name == "d2d_semaphore":
self._kernel = KernelBuilder(
file="d2d_semaphore_test.cu", kernel_name="d2d_semaphore"
file="d2d_semaphore_test.cu", kernel_name="d2d_semaphore", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = nranks
elif test_name == "sm_channel":
self._kernel = KernelBuilder(file="sm_channel_test.cu", kernel_name="sm_channel").get_compiled_kernel()
self._kernel = KernelBuilder(
file="sm_channel_test.cu", kernel_name="sm_channel", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = nranks
self.nthreads = 1024
elif test_name == "fifo":
self._kernel = KernelBuilder(file="fifo_test.cu", kernel_name="fifo").get_compiled_kernel()
self._kernel = KernelBuilder(
file="fifo_test.cu", kernel_name="fifo", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = 1
elif test_name == "proxy":
self._kernel = KernelBuilder(file="proxy_test.cu", kernel_name="proxy").get_compiled_kernel()
self._kernel = KernelBuilder(
file="proxy_test.cu", kernel_name="proxy", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = nranks
elif test_name == "simple_proxy_channel":
self._kernel = KernelBuilder(
file="simple_proxy_channel_test.cu", kernel_name="simple_proxy_channel"
file="simple_proxy_channel_test.cu", kernel_name="simple_proxy_channel", file_dir=file_dir
).get_compiled_kernel()
self.nblocks = 1
self.nthreads = 1024
@@ -393,7 +401,7 @@ def test_sm_channels(mpi_group: MpiGroup, nelem: int, use_packet: bool):
memory_expected[(nelemPerRank * rank) : (nelemPerRank * (rank + 1))] = rank + 1
if use_packet:
channels = group.make_sm_channels_with_packet(memory, scratch, connections)
channels = group.make_sm_channels_with_scratch(memory, scratch, connections)
else:
channels = group.make_sm_channels(memory, connections)
kernel = MscclppKernel("sm_channel", group.my_rank, group.nranks, channels, memory, use_packet, scratch)
@@ -496,7 +504,7 @@ def test_simple_proxy_channel(mpi_group: MpiGroup, nelem: int, transport: str, u
memory_to_register = scratch
else:
memory_to_register = memory
simple_channels = group.make_proxy_channels_with_packet(proxy_service, memory_to_register, connections)
simple_channels = group.make_proxy_channels(proxy_service, memory_to_register, connections)
kernel = MscclppKernel(
"simple_proxy_channel",

View File

@@ -1,14 +1,10 @@
set -e
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
SRC_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/build"
SRC_INCLUDE_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/include"
PYTHON_SRC_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/python"
ROOT_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/"
DST_DIR="/tmp/mscclpp"
HOSTFILE="${SYSTEM_DEFAULTWORKINGDIRECTORY}/test/deploy/hostfile"
DEPLOY_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/test/deploy"
SSH_OPTION="StrictHostKeyChecking=no"
MSCCLPP_TEST_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/test/mscclpp-test"
chmod 400 ${KeyFilePath}
ssh-keygen -t rsa -f sshkey -P ""
@@ -25,23 +21,15 @@ done
set -e
parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION "rm -rf ${DST_DIR}"
parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION "mkdir -p ${DST_DIR}"
parallel-scp -t 0 -r -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${SRC_DIR} ${DST_DIR}
parallel-scp -t 0 -r -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${PYTHON_SRC_DIR} ${DST_DIR}
parallel-scp -t 0 -r -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${SRC_INCLUDE_DIR} ${DST_DIR}
parallel-scp -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION sshkey ${DST_DIR}
parallel-scp -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION sshkey.pub ${DST_DIR}
parallel-scp -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${DEPLOY_DIR}/* ${DST_DIR}
parallel-scp -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${MSCCLPP_TEST_DIR}/check_perf_result.py ${DST_DIR}
parallel-scp -t 0 -r -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${ROOT_DIR} ${DST_DIR}
# force to pull the latest image
parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION \
"sudo docker pull ${CONTAINERIMAGE}"
parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION \
"sudo docker run --rm -itd --privileged --net=host --ipc=host --gpus=all \
-w /root -v ${DST_DIR}:/root/mscclpp --name=mscclpp-test \
-w /root -v ${DST_DIR}:/root/mscclpp -v /opt/microsoft:/opt/microsoft --name=mscclpp-test \
--entrypoint /bin/bash ${CONTAINERIMAGE}"
parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION \
"sudo docker exec -t --user root mscclpp-test bash '/root/mscclpp/setup.sh'"
"sudo docker exec -t --user root mscclpp-test bash '/root/mscclpp/test/deploy/setup.sh'"

View File

@@ -1,62 +1,63 @@
set -e
HOSTFILE=/root/mscclpp/test/deploy/hostfile_mpi
function run_mscclpp_test()
{
echo "=================Run allgather_test_perf on 2 nodes========================="
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 0 -o /root/mscclpp/output.jsonl
# For kernel 2, the message size must can be divided by 3
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allgather_test_perf -b 3K -e 3G -f 2 -k 2 -o /root/mscclpp/output.jsonl
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 3 -o /root/mscclpp/output.jsonl
echo "==================Run allreduce_test_perf on 2 nodes========================="
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 0 -o /root/mscclpp/output.jsonl
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 1 -o /root/mscclpp/output.jsonl
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1M -f 2 -k 2 -o /root/mscclpp/output.jsonl
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 3K -e 3G -f 2 -k 3 -o /root/mscclpp/output.jsonl
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/allreduce_test_perf -b 3K -e 3G -f 2 -k 4 -o /root/mscclpp/output.jsonl
echo "==================Run alltoall_test_perf on 2 nodes========================="
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile /root/mscclpp/hostfile_mpi \
/usr/local/mpi/bin/mpirun --allow-run-as-root -np 16 --bind-to numa -hostfile ${HOSTFILE} \
-x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mscclpp-test/alltoall_test_perf -b 1K -e 1G -f 2 -k 0 -o /root/mscclpp/output.jsonl
echo "========================Run performance check==============================="
python3 /root/mscclpp/check_perf_result.py --perf-file /root/mscclpp/output.jsonl \
--baseline-file /root/mscclpp/perf_ndmv4.jsonl
python3 /root/mscclpp/test/mscclpp-test/check_perf_result.py --perf-file /root/mscclpp/output.jsonl \
--baseline-file /root/mscclpp/test/deploy/perf_ndmv4.jsonl
}
function run_mp_ut()
{
echo "============Run multi-process unit tests on 2 nodes (np=2, npernode=1)========================="
/usr/local/mpi/bin/mpirun -allow-run-as-root -tag-output -np 2 --bind-to numa \
-hostfile /root/mscclpp/hostfile_mpi -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-hostfile ${HOSTFILE} -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 1 /root/mscclpp/build/test/mp_unit_tests -ip_port mscclit-000000:20003
echo "============Run multi-process unit tests on 2 nodes (np=16, npernode=8)========================="
/usr/local/mpi/bin/mpirun -allow-run-as-root -tag-output -np 16 --bind-to numa \
-hostfile /root/mscclpp/hostfile_mpi -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-hostfile ${HOSTFILE} -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 /root/mscclpp/build/test/mp_unit_tests -ip_port mscclit-000000:20003
}
@@ -64,12 +65,23 @@ function run_pytests()
{
echo "==================Run python tests================================"
/usr/local/mpi/bin/mpirun -allow-run-as-root -tag-output -np 16 --bind-to numa \
-hostfile /root/mscclpp/hostfile_mpi -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-npernode 8 bash /root/mscclpp/pytest.sh
-hostfile ${HOSTFILE} -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-x MSCCLPP_HOME=/root/mscclpp -npernode 8 bash /root/mscclpp/test/deploy/pytest.sh
}
function run_py_benchmark()
{
echo "==================Run python benchmark================================"
/usr/local/mpi/bin/mpirun -allow-run-as-root -np 16 --bind-to numa \
-hostfile ${HOSTFILE} -x MSCCLPP_DEBUG=WARN -x LD_LIBRARY_PATH=/root/mscclpp/build:$LD_LIBRARY_PATH \
-mca pml ob1 -mca btl ^openib -mca btl_tcp_if_include eth0 -x NCCL_IB_PCI_RELAXED_ORDERING=1 -x NCCL_SOCKET_IFNAME=eth0 \
-x CUDA_DEVICE_ORDER=PCI_BUS_ID -x NCCL_NET_GDR_LEVEL=5 -x NCCL_TOPO_FILE=/opt/microsoft/ndv4-topo.xml \
-x NCCL_NET_PLUGIN=none -x NCCL_IB_DISABLE=0 -x NCCL_MIN_NCHANNELS=32 -x NCCL_DEBUG=WARN -x NCCL_P2P_DISABLE=0 -x NCCL_SHM_DISABLE=0 \
-x MSCCLPP_HOME=/root/mscclpp -np 16 -npernode 8 python3 /root/mscclpp/python/benchmark/allreduce_bench.py
}
if [ $# -lt 1 ]; then
echo "Usage: $0 <mscclpp-test/mp-ut>"
echo "Usage: $0 <mscclpp-test/mp-ut/run_pytests/run_py_benchmark>"
exit 1
fi
test_name=$1
@@ -83,9 +95,13 @@ case $test_name in
run_mp_ut
;;
pytests)
echo "==================Run python tests================================"
echo "==================Run python tests===================================="
run_pytests
;;
py-benchmark)
echo "==================Run python benchmark================================"
run_py_benchmark
;;
*)
echo "Unknown test name: $test_name"
exit 1

View File

@@ -3,7 +3,7 @@ set -e
mkdir -p /root/.ssh
mv /root/mscclpp/sshkey.pub /root/.ssh/authorized_keys
chown root:root /root/.ssh/authorized_keys
mv /root/mscclpp/config /root/.ssh/config
mv /root/mscclpp/test/deploy/config /root/.ssh/config
chown root:root /root/.ssh/config
chmod 400 /root/mscclpp/sshkey
chown root:root /root/mscclpp/sshkey
@@ -14,10 +14,12 @@ for i in $(seq 0 $(( $(nvidia-smi -L | wc -l) - 1 ))); do
done
if [[ "${CUDA_VERSION}" == *"11."* ]]; then
pip3 install -r /root/mscclpp/python/test/requirements_cu11.txt
pip3 install -r /root/mscclpp/python/requirements_cu11.txt
else
pip3 install -r /root/mscclpp/python/test/requirements_cu12.txt
pip3 install -r /root/mscclpp/python/requirements_cu12.txt
fi
cd /root/mscclpp && pip3 install .
mkdir -p /var/run/sshd
/usr/sbin/sshd -p 22345

View File

@@ -890,7 +890,7 @@ __global__ void allreduce6(int* buff, int* scratch, void* resultBuff, int rank,
size_t scratchResultOffset =
(flag & 1) ? 2 * nPkts * sizeof(mscclpp::LLPacket) : 3 * nPkts * sizeof(mscclpp::LLPacket);
size_t srcOffset = remoteRank * nelemsPerRank * sizeof(int);
uint2* src = (uint2*)((char*)buff + srcOffset);
uint2* src = (uint2*)((char*)buff + rank * nelemsPerRank * sizeof(int));
uint2* dst = (uint2*)((char*)resultBuff + rank * nelemsPerRank * sizeof(int));
// step 1: write to scratch buffer