[mscclpp-test] Add AllReduce and AllToAll tests (#83)
@@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.26)
project(mscclpp LANGUAGES CUDA CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CUDA_STANDARD 17)
set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -gencode arch=compute_80,code=sm_80 -gencode arch=compute_90,code=sm_90")

option(ENABLE_TRACE "Enable tracing" OFF)
option(USE_MPI_FOR_TESTS "Use MPI for tests" ON)
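# Illustrative usage (not part of the change): either option can be toggled at
# configure time, e.g. cmake -DUSE_MPI_FOR_TESTS=ON -DENABLE_TRACE=OFF ..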

@@ -228,8 +228,12 @@ struct SimpleDeviceChannel {

  SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src) : devChan_(devChan), dst_(dst), src_(src) {}

  SimpleDeviceChannel(DeviceChannel devChan, void* dstPtr, void* srcPtr, void* tmpPtr = nullptr)
      : devChan_(devChan), dstPtr_(dstPtr), srcPtr_(srcPtr), tmpPtr_(tmpPtr) {}

  SimpleDeviceChannel(DeviceChannel devChan, MemoryId dst, MemoryId src, void* dstPtr, void* srcPtr,
                      void* tmpPtr = nullptr)
      : devChan_(devChan), dst_(dst), src_(src), dstPtr_(dstPtr), srcPtr_(srcPtr), tmpPtr_(tmpPtr) {}
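  // Note: tmpPtr is optional; channels used for in-place collectives leave it
  // nullptr (see BaseTestEngine::setupMeshConnections later in this diff).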

  SimpleDeviceChannel(const SimpleDeviceChannel& other) = default;

@@ -278,8 +282,11 @@ struct SimpleDeviceChannel {
  MemoryId src_;

  // these are used for direct copy
  void* dstPtr_;
  void* srcPtr_;

  // extra local buffer for out-of-place copy
  void* tmpPtr_;
};

}  // namespace channel

@@ -12,6 +12,9 @@ struct DeviceSyncer {
  // previous work of all threads in cooperating blocks is finished.
  __forceinline__ __device__ void sync(int blockNum) {
    int maxOldCnt = blockNum - 1;
    __threadfence();
    // Make sure that all threads in this block have done `__threadfence()`
    // before flipping `flag`.
    __syncthreads();
    if (threadIdx.x == 0) {
      int tmpIsAdd = isAdd_ ^ 1;
@@ -1,246 +0,0 @@
#include <cuda/barrier>
#include <tuple>
#include <vector>

#include "comm.h"
#include "common.h"

#define ALIGN 4

const int phase2Tag = 2;
mscclppDevConn_t* conns;
void* scratch = nullptr;
void* sendRecvData = nullptr;
cuda::barrier<cuda::thread_scope_device>* barrier = nullptr;

struct Chunk {
  size_t offset;
  size_t size;
};

inline int getSendTag(int rank, int peer) { return rank < peer ? 0 : 1; }

inline int getRecvTag(int rank, int peer) { return rank < peer ? 1 : 0; }
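// e.g., for the pair (2, 5): rank 2 gets sendTag 0 / recvTag 1 and rank 5 gets
// sendTag 1 / recvTag 0, so each side's send tag matches the other side's recv
// tag and the two directions of the connection never collide.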

__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) {
  size_t remainder = dataCount % numChunks;
  size_t smallChunkSize = dataCount / numChunks;
  size_t largeChunkSize = smallChunkSize + 1;
  size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
  size_t numSmallChunks = chunkCount - numLargeChunks;
  size_t offset = (remainder - numLargeChunks) * largeChunkSize +
                  (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize;
  return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize};
}

__host__ __device__ int peerIdx(int peerRank, int rank) { return peerRank < rank ? peerRank : peerRank - 1; }

__host__ __device__ int peerRank(int peerIdx, int rank) { return peerIdx < rank ? peerIdx : peerIdx + 1; }

__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3; }

__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 1; }

__host__ __device__ int phase2ConnIdx(int peerRank, int rank) { return peerIdx(peerRank, rank) * 3 + 2; }

__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) {
  if (threadIdx.x == 0) {
    conn.putWithSignalAndFlush(dstOffset, srcOffset, size);
  }
  __syncthreads();
}

__device__ void recv(mscclppDevConn_t& conn) {
  if (threadIdx.x == 0) {
    conn.wait();
  }
  __syncthreads();
}

__device__ void reduceSum(int* dst, int* src, size_t size) {
  for (int i = threadIdx.x; i < size; i += blockDim.x) {
    dst[i] += src[i];
  }
}

__global__ void initData(int* data, size_t size, int rank) {
  for (int i = threadIdx.x; i < size; i += blockDim.x) {
    data[i] = rank;
  }
}

__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount,
                                 mscclppDevConn_t* conns, void* scratch, void* sendRecvData,
                                 cuda::barrier<cuda::thread_scope_device>* barrier) {
  int idx = blockIdx.x;
  int peer = peerRank(idx, rank);
  mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)];
  mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(peer, rank)];
  mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(peer, rank)];

  // 1st communication phase: send data to the scratch buffer of the peer associated with this block
  Chunk toPeerChunk = getChunk(dataCount, nRanks, peer, 1);
  // Now we need to figure out the offset of this chunk in the scratch buffer of the destination.
  // The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and
  // inside that each of the destination's peers sends to the nth chunk, where n is the index of the
  // source peer from the destination's perspective.
  size_t dstOffset = peerIdx(rank, peer) * toPeerChunk.size;
  send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int));
  recv(phase1RecvConn);

  if (threadIdx.x == 0) barrier->arrive_and_wait();
  __syncthreads();

  // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
  Chunk rankChunk = getChunk(dataCount, nRanks, rank, 1);
  int* chunk = (int*)sendRecvData + rankChunk.offset;
  int numPeers = nRanks - 1, numBlocks = nRanks - 1;
  Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, idx, 1);
  for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) {
    assert(scratchDataCount % numPeers == 0);
    assert(scratchDataCount / numPeers == rankChunk.size);
    size_t scratchDataCountPerPeer = scratchDataCount / numPeers;
    int* scratchChunk = (int*)scratch + peerIdx * scratchDataCountPerPeer;
    Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, idx, 1);
    assert(blockScratchChunk.size == blockUserChunk.size);
    reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
  }

  if (threadIdx.x == 0) barrier->arrive_and_wait();
  __syncthreads();

  // 2nd communication phase: send the now reduced data between the user buffers
  Chunk collectionChunk = getChunk(dataCount, nRanks, rank, 1);
  send(phase2Conn, collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int),
       collectionChunk.size * sizeof(int));
  recv(phase2Conn);
}

void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
                               size_t* recvInplaceOffset, size_t count, int nranks) {
  size_t base = (count / ALIGN) * ALIGN;
  *sendcount = base;
  *recvcount = base;
  *sendInplaceOffset = 0;
  *recvInplaceOffset = 0;
  *paramcount = base;
}

void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) {
  size_t paramcount, sendInplaceOffset, recvInplaceOffset;
  AllReduceGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}

testResult_t AllReduceInitData(struct testArgs* args, int in_place) {
  size_t recvcount = args->expectedBytes / sizeof(int);

  CUDACHECK(cudaSetDevice(args->gpuNum));
  CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes));
  initData<<<1, 256>>>((int*)args->recvbuff, recvcount, args->proc);

  int* dataHost = new int[recvcount];
  for (size_t i = 0; i < recvcount; i++) {
    dataHost[i] = args->totalProcs * (args->totalProcs - 1) / 2;
  }
  CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
  delete[] dataHost;
  CUDACHECK(cudaDeviceSynchronize());
  MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
  return testSuccess;
}

void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
  double baseBw = (double)(count * typesize) / 1.0E9 / sec;

  *algBw = baseBw;
  double factor = (2 * (double)(nranks - 1)) / ((double)nranks);
  *busBw = baseBw * factor;
}

testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm,
                              cudaStream_t stream, int kernelNum) {
  int worldSize = comm->nRanks;
  int nPeers = worldSize - 1;
  int dataCount = nBytes / sizeof(int);
  Chunk chunk = getChunk(dataCount, worldSize, comm->rank, 1);
  size_t scratchDataCount = chunk.size * nPeers;
  allReduceKernel0<<<worldSize - 1, 256, 0, stream>>>(comm->rank, worldSize, dataCount, scratchDataCount, conns,
                                                      scratch, sendRecvData, barrier);
  return testSuccess;
}

struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, defaultInitColl, AllReduceInitData,
                                 AllReduceGetBw, AllReduceRunColl};

testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) {
  int rank = args->proc, worldSize = args->totalProcs;
  size_t bufferSize = args->maxbytes;
  Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1);
  int nPeers = args->totalProcs - 1;
  size_t scratchBytes = chunk.size * nPeers * sizeof(int);

  CUDACHECK(cudaMalloc(&scratch, scratchBytes));

  for (int peer = 0; peer < worldSize; ++peer) {
    if (peer != args->proc) {
      int sendTag = getSendTag(args->proc, peer);
      int recvTag = getRecvTag(args->proc, peer);
      MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
      MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr));
      MSCCLPPCHECK(
          mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
    }
  }
  MSCCLPPCHECK(mscclppConnectionSetup(args->comm));

  return testSuccess;
}

testResult_t AllReduceTeardownMscclppConnections() {
  if (scratch != nullptr) {
    CUDACHECK(cudaFree(scratch));
    scratch = nullptr;
  }
  return testSuccess;
}

testResult_t AllReduceRunTest(struct testArgs* args) {
  args->collTest = &allReduceTest;

  sendRecvData = args->recvbuff;
  CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier<cuda::thread_scope_device>)));
  cuda::barrier<cuda::thread_scope_device> initBarrier(args->totalProcs - 1);
  CUDACHECK(
      cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
  int nPeers = args->totalProcs - 1;
  int rank = args->proc;
  std::vector<mscclppDevConn_t> hostConns(nPeers * 3, mscclppDevConn_t());

  for (int peer = 0; peer < args->totalProcs; ++peer) {
    mscclppDevConn_t* devConn;
    if (peer != rank) {
      int sendTag = getSendTag(args->proc, peer);
      int recvTag = getRecvTag(args->proc, peer);
      MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, sendTag, &devConn));
      hostConns[phase1SendConnIdx(peer, rank)] = *devConn;
      MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, recvTag, &devConn));
      hostConns[phase1RecvConnIdx(peer, rank)] = *devConn;
      MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, phase2Tag, &devConn));
      hostConns[phase2ConnIdx(peer, rank)] = *devConn;
    }
  }
  CUDACHECK(cudaMalloc(&conns, nPeers * 3 * sizeof(mscclppDevConn_t)));
  CUDACHECK(cudaMemcpy(conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice));

  TESTCHECK(TimeTest(args));

  CUDACHECK(cudaFree(barrier));
  CUDACHECK(cudaFree(conns));

  return testSuccess;
}

struct testEngine allReduceEngine = {AllReduceGetBuffSize, AllReduceRunTest, AllReduceSetupMscclppConnections,
                                     AllReduceTeardownMscclppConnections};

#pragma weak mscclppTestEngine = allReduceEngine

@@ -5,3 +5,5 @@ endfunction()

add_mscclpp_test_executable(sendrecv_test_perf sendrecv_test.cu)
add_mscclpp_test_executable(allgather_test_perf allgather_test.cu)
add_mscclpp_test_executable(allreduce_test_perf allreduce_test.cu)
add_mscclpp_test_executable(alltoall_test_perf alltoall_test.cu)
@@ -133,7 +133,7 @@ class AllGatherTestColl : public BaseTestColl {

  void runColl(const TestArgs& args, cudaStream_t stream) override;
  void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
  void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override;
  void setupCollTest(size_t size) override;
};
@@ -205,42 +205,8 @@ void AllGatherTestEngine::allocateBuffer() {
}

void AllGatherTestEngine::setupConnections() {
  const int worldSize = args_.totalRanks;
  const int rank = args_.rank;
  const int nRanksPerNode = args_.nRanksPerNode;
  const int thisNode = rank / nRanksPerNode;
  const mscclpp::Transport ibTransport = IBs[args_.gpuNum];

  std::vector<mscclpp::channel::ChannelId> channelIds;
  std::vector<mscclpp::RegisteredMemory> localMemories;
  std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemories;

  auto rankToNode = [&](int rank) { return rank / nRanksPerNode; };
  for (int r = 0; r < worldSize; r++) {
    if (r == rank) {
      continue;
    }
    mscclpp::Transport transport;
    if (rankToNode(r) == thisNode) {
      transport = mscclpp::Transport::CudaIpc;
    } else {
      transport = ibTransport;
    }
    // Connect with all other ranks
    channelIds.push_back(chanService_->addChannel(comm_->connectOnSetup(r, 0, transport)));
    auto memory = comm_->registerMemory(sendBuff_.get(), args_.maxBytes, mscclpp::Transport::CudaIpc | ibTransport);
    localMemories.push_back(memory);
    comm_->sendMemoryOnSetup(memory, r, 0);
    remoteMemories.push_back(comm_->recvMemoryOnSetup(r, 0));
  }
  comm_->setup();

  std::vector<mscclpp::channel::SimpleDeviceChannel> devChannels;
  for (size_t i = 0; i < channelIds.size(); ++i) {
    devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(chanService_->deviceChannel(channelIds[i]),
                                                                chanService_->addMemory(remoteMemories[i].get()),
                                                                chanService_->addMemory(localMemories[i])));
  }
  setupMeshConnections(devChannels, sendBuff_.get(), args_.maxBytes);

  assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel));
  CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(),
test/mscclpp-test/allreduce_test.cu (new file, 328 lines)
@@ -0,0 +1,328 @@
#include <cassert>
#include <mscclpp/concurrency.hpp>
#include <vector>

#include "common.hpp"

#define ALIGN 4
#define BLOCKS_PER_PEER 15

__constant__ mscclpp::channel::SimpleDeviceChannel constDevFstRoundChans[16];
__constant__ mscclpp::channel::SimpleDeviceChannel constDevSndRoundChans[16];

struct Chunk {
  size_t offset;
  size_t size;
};

__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx) {
  size_t remainder = dataCount % numChunks;
  size_t smallChunkSize = dataCount / numChunks;
  size_t largeChunkSize = smallChunkSize + 1;
  size_t numRemainedLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
  size_t offset = (remainder - numRemainedLargeChunks) * largeChunkSize +
                  (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize;
  return Chunk{offset, chunkIdx < remainder ? largeChunkSize : smallChunkSize};
}
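// Worked example (reading aid): getChunk(10, 4, i) yields chunks of sizes
// {3, 3, 2, 2} at offsets {0, 3, 6, 8}; the first (10 % 4) = 2 chunks each
// carry one extra element.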

__device__ void reduceSum(int* dst, int* src, size_t size) {
  for (int i = threadIdx.x; i < size; i += blockDim.x) {
    dst[i] += src[i];
  }
}

__device__ mscclpp::DeviceSyncer deviceSyncer;
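// Reading aid (assumption based on DeviceSyncer::sync shown earlier in this
// diff): deviceSyncer.sync(gridDim.x) is a grid-wide barrier; it returns only
// after every thread block of the running kernel has entered it, so writes
// made before the call are visible to all blocks afterwards.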

__device__ void allreduce0(int rank, int worldSize, size_t nelems, size_t scratchDataCount) {
  int peerId = blockIdx.x / BLOCKS_PER_PEER;
  int isComm = (threadIdx.x == 0) && (blockIdx.x % BLOCKS_PER_PEER == 0);
  int remoteRank = (peerId < rank) ? peerId : peerId + 1;

  // 1st communication phase: send data to the scratch buffer of the peer associated with this block
  mscclpp::channel::SimpleDeviceChannel& devFstRoundChan = constDevFstRoundChans[peerId];
  Chunk toPeerChunk = getChunk(nelems, worldSize, remoteRank);
  // Now we need to figure out the offset of this chunk in the scratch buffer of the destination.
  // The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and
  // inside that each of the destination's peers sends to the nth chunk, where n is the index of the
  // source peer from the destination's perspective.
  size_t dstOffset = (rank < remoteRank ? rank : rank - 1) * toPeerChunk.size;
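  // e.g., with worldSize = 4, rank 2 writing to remoteRank 0: rank 2 is peer
  // index 1 among rank 0's peers {1, 2, 3}, so the data lands in the second
  // chunk of rank 0's scratch buffer.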
  if (isComm) {
    // Write data to the peer
    devFstRoundChan.putWithSignalAndFlush(dstOffset * sizeof(int), toPeerChunk.offset * sizeof(int),
                                          toPeerChunk.size * sizeof(int));
    // Wait for data from the peer
    devFstRoundChan.wait();
  }

  deviceSyncer.sync(gridDim.x);

  // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
  mscclpp::channel::SimpleDeviceChannel& devSndRoundChan = constDevSndRoundChans[peerId];
  Chunk rankChunk = getChunk(nelems, worldSize, rank);
  int* chunk = (int*)devSndRoundChan.srcPtr_ + rankChunk.offset;
  int numPeers = gridDim.x / BLOCKS_PER_PEER;
  int numBlocks = gridDim.x;
  Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, blockIdx.x);
  size_t scratchDataCountPerPeer = scratchDataCount / numPeers;
  Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, blockIdx.x);
  for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) {
    int* scratchChunk = (int*)devFstRoundChan.tmpPtr_ + peerIdx * scratchDataCountPerPeer;
    reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
  }

  deviceSyncer.sync(gridDim.x);

  // 2nd communication phase: send the now reduced data between the user buffers
  Chunk collectionChunk = getChunk(nelems, worldSize, rank);
  if (isComm) {
    // Write data to the peer
    devSndRoundChan.putWithSignalAndFlush(collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int),
                                          collectionChunk.size * sizeof(int));
    // Wait for data from the peer
    devSndRoundChan.wait();
  }
}

__forceinline__ __device__ void vectorSum(int* dst, int* src, size_t nElem) {
  size_t nInt4 = nElem / 4;
  size_t nLastInts = nElem % 4;
  int4* dst4 = (int4*)dst;
  int4* src4 = (int4*)src;
  for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < nInt4; i += blockDim.x * gridDim.x) {
    dst4[i].w += src4[i].w;
    dst4[i].x += src4[i].x;
    dst4[i].y += src4[i].y;
    dst4[i].z += src4[i].z;
  }
  if (nLastInts > 0) {
    int* dstLast = dst + nInt4 * 4;
    int* srcLast = src + nInt4 * 4;
    for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < nLastInts; i += blockDim.x * gridDim.x) {
      dstLast[i] += srcLast[i];
    }
  }
}
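// e.g., nElem = 10 gives nInt4 = 2 (8 ints added via two int4 loads) while the
// scalar tail loop handles the remaining nLastInts = 2 elements.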

__device__ void allreduce1(int rank, int worldSize, size_t nelems, size_t scratchDataCount) {
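  // Reading aid (not in the original source): this is a ring allreduce. Steps
  // 1..worldSize-1 reduce-scatter over the first-round channels, the remaining
  // steps allgather the reduced chunks over the second-round channels, and
  // only thread 0 of block 0 drives communication (isComm).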
  int isComm = (threadIdx.x == 0) && (blockIdx.x == 0);
  int remoteSendRank = (rank + 1) % worldSize;
  int remoteRecvRank = (rank + worldSize - 1) % worldSize;
  int peerSendId = (remoteSendRank < rank) ? remoteSendRank : remoteSendRank - 1;
  int peerRecvId = (remoteRecvRank < rank) ? remoteRecvRank : remoteRecvRank - 1;

  mscclpp::channel::SimpleDeviceChannel& devFstSendChan = constDevFstRoundChans[peerSendId];
  mscclpp::channel::SimpleDeviceChannel& devFstRecvChan = constDevFstRoundChans[peerRecvId];
  mscclpp::channel::SimpleDeviceChannel& devSndSendChan = constDevSndRoundChans[peerSendId];
  mscclpp::channel::SimpleDeviceChannel& devSndRecvChan = constDevSndRoundChans[peerRecvId];

  // Step 1
  size_t chunkIndex = (rank + worldSize - 1) % worldSize;
  size_t chunkNelem = nelems / worldSize;
  size_t offset = chunkIndex * chunkNelem * sizeof(int);
  if (isComm) {
    if (chunkNelem > 1) {
      devFstSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int));
    }
  }

  // Step 2 ~ Step n-1
  for (int step = 2; step < worldSize; ++step) {
    if (isComm) {
      if (chunkNelem > 1) {
        devFstRecvChan.wait();
        devFstSendChan.flush();
      }
      devFstSendChan.putWithSignal(offset + chunkNelem / 2 * sizeof(int), (chunkNelem - chunkNelem / 2) * sizeof(int));
    }
    deviceSyncer.sync(gridDim.x);

    // Reduce
    chunkIndex = (rank + worldSize - step) % worldSize;
    offset = chunkIndex * chunkNelem * sizeof(int);
    int* dst = (int*)((char*)devFstSendChan.srcPtr_ + offset);
    int* src = (int*)((char*)devFstRecvChan.tmpPtr_ + offset);
    vectorSum(dst, src, chunkNelem / 2);

    if (isComm) {
      devFstRecvChan.wait();
      devFstSendChan.flush();
      if (chunkNelem > 1) {
        devFstSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int));
      }
    }
    deviceSyncer.sync(gridDim.x);

    dst += chunkNelem / 2;
    src += chunkNelem / 2;
    vectorSum(dst, src, chunkNelem - chunkNelem / 2);
  }

  // Step n
  if (isComm) {
    if (chunkNelem > 1) {
      devFstRecvChan.wait();
      devFstSendChan.flush();
    }
    devFstSendChan.putWithSignal(offset + chunkNelem / 2 * sizeof(int), (chunkNelem - chunkNelem / 2) * sizeof(int));
  }
  deviceSyncer.sync(gridDim.x);

  offset = rank * chunkNelem * sizeof(int);
  int* dst = (int*)((char*)devFstSendChan.srcPtr_ + offset);
  int* src = (int*)((char*)devFstRecvChan.tmpPtr_ + offset);
  vectorSum(dst, src, chunkNelem / 2);

  if (isComm) {
    devFstRecvChan.wait();
    devFstSendChan.flush();
    if (chunkNelem > 1) {
      devSndSendChan.putWithSignal(offset, chunkNelem / 2 * sizeof(int));
    }
  }
  deviceSyncer.sync(gridDim.x);

  dst += chunkNelem / 2;
  src += chunkNelem / 2;
  vectorSum(dst, src, chunkNelem - chunkNelem / 2);

  if (isComm) {
    if (chunkNelem > 1) {
      devSndSendChan.flush();
    }
    devSndSendChan.putWithSignalAndFlush(offset + chunkNelem / 2 * sizeof(int),
                                         (chunkNelem - chunkNelem / 2) * sizeof(int));
  }

  // Step n+1 ~ Step 2n-2
  for (int i = 1; i < worldSize - 1; ++i) {
    if (isComm) {
      devSndRecvChan.wait();
    }
    deviceSyncer.sync(gridDim.x);

    // Copy
    chunkIndex = (rank + worldSize - i) % worldSize;
    if (isComm) {
      devSndSendChan.putWithSignalAndFlush(chunkIndex * chunkNelem * sizeof(int), chunkNelem * sizeof(int));
    }
  }

  // Final receive
  if (isComm) {
    devSndRecvChan.wait();
  }
}

__global__ void kernel(int rank, int worldSize, size_t nelems, size_t scratchDataCount, int kernel) {
  if (kernel == 0)
    allreduce0(rank, worldSize, nelems, scratchDataCount);
  else if (kernel == 1)
    allreduce1(rank, worldSize, nelems, scratchDataCount);
}

class AllReduceTestColl : public BaseTestColl {
 public:
  AllReduceTestColl() = default;
  ~AllReduceTestColl() = default;

  void runColl(const TestArgs& args, cudaStream_t stream) override;
  void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
  void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override;
  void setupCollTest(size_t size) override;
};

void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
  const int worldSize = args.totalRanks;
  const int rank = args.rank;
  const int kernelNum = args.kernelNum;
  const int nPeers = worldSize - 1;
  const Chunk chunk = getChunk(paramCount_, worldSize, rank);
  const size_t scratchDataCount = chunk.size * nPeers;
  const int nBlocks = (kernelNum == 0) ? nPeers * BLOCKS_PER_PEER : 24;
  kernel<<<nBlocks, 1024, 0, stream>>>(rank, worldSize, paramCount_, scratchDataCount, kernelNum);
}

void AllReduceTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
  assert(sendBuff.size() == 1);
  const int rank = args.rank;
  const int worldSize = args.totalRanks;
  std::vector<int> dataHost(std::max(sendCount_, recvCount_), rank);
  CUDATHROW(cudaMemcpy(sendBuff[0], dataHost.data(), sendCount_ * typeSize_, cudaMemcpyHostToDevice));

  for (size_t i = 0; i < recvCount_; i++) {
    dataHost[i] = worldSize * (worldSize - 1) / 2;
  }
  std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_);
}

void AllReduceTestColl::getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) {
  double baseBw = (double)(paramCount_ * typeSize_) / 1.0E9 / deltaSec;
  algBw = baseBw;
  double factor = (2 * (double)(worldSize_ - 1)) / ((double)worldSize_);
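  // e.g., worldSize_ = 8 gives factor = 2 * 7 / 8 = 1.75, the standard
  // allreduce bus-bandwidth correction.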
  busBw = baseBw * factor;
}

void AllReduceTestColl::setupCollTest(size_t size) {
  size_t count = size / typeSize_;
  size_t base = (count / ALIGN) * ALIGN;
  sendCount_ = base;
  recvCount_ = base;
  paramCount_ = base;
  expectedCount_ = base;

  mscclpp::DeviceSyncer syncer = {};
  CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer)));
}

class AllReduceTestEngine : public BaseTestEngine {
 public:
  AllReduceTestEngine() = default;
  ~AllReduceTestEngine() = default;

  void allocateBuffer() override;
  void setupConnections() override;

 private:
  std::vector<void*> getSendBuff() override;
  void* getExpectedBuff() override;
  void* getRecvBuff() override;

  std::shared_ptr<int> sendBuff_;
  std::shared_ptr<int> scratchBuff_;
  std::shared_ptr<int[]> expectedBuff_;
};

void AllReduceTestEngine::allocateBuffer() {
  sendBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
  scratchBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
  expectedBuff_ = std::shared_ptr<int[]>(new int[args_.maxBytes / sizeof(int)]);
}

void AllReduceTestEngine::setupConnections() {
  std::vector<mscclpp::channel::SimpleDeviceChannel> fstRoundChannels;
  std::vector<mscclpp::channel::SimpleDeviceChannel> sndRoundChannels;

  // Send data from local sendBuff to remote scratchBuff (out-of-place)
  setupMeshConnections(fstRoundChannels, sendBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes);
  assert(fstRoundChannels.size() < sizeof(constDevFstRoundChans) / sizeof(mscclpp::channel::SimpleDeviceChannel));
  CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, fstRoundChannels.data(),
                               sizeof(mscclpp::channel::SimpleDeviceChannel) * fstRoundChannels.size()));

  // Send data from local sendBuff to remote sendBuff (in-place)
  setupMeshConnections(sndRoundChannels, sendBuff_.get(), args_.maxBytes);
  assert(sndRoundChannels.size() < sizeof(constDevSndRoundChans) / sizeof(mscclpp::channel::SimpleDeviceChannel));
  CUDATHROW(cudaMemcpyToSymbol(constDevSndRoundChans, sndRoundChannels.data(),
                               sizeof(mscclpp::channel::SimpleDeviceChannel) * sndRoundChannels.size()));
}

std::vector<void*> AllReduceTestEngine::getSendBuff() { return {sendBuff_.get()}; }

void* AllReduceTestEngine::getExpectedBuff() { return expectedBuff_.get(); }

void* AllReduceTestEngine::getRecvBuff() {
  // the in-place operation reuses the send buffer
  return sendBuff_.get();
}

std::shared_ptr<BaseTestEngine> getTestEngine() { return std::make_shared<AllReduceTestEngine>(); }
std::shared_ptr<BaseTestColl> getTestColl() { return std::make_shared<AllReduceTestColl>(); }
test/mscclpp-test/alltoall_test.cu (new file, 153 lines)
@@ -0,0 +1,153 @@
#include <cassert>
#include <mscclpp/concurrency.hpp>

#include "common.hpp"

#define ALIGN 4
__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16];
__device__ mscclpp::DeviceSyncer deviceSyncer;
void* localRecvBuff;
void* localSendBuff;

__device__ void localAlltoall(int rank, int nRanksPerNode, size_t nElements) {
  int remoteRank = (blockIdx.x < rank) ? blockIdx.x : blockIdx.x + 1;
  for (int i = 1; i < nRanksPerNode; i++) {
    mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[blockIdx.x];
    if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank + i) % nRanksPerNode) {
      devChan.putWithSignalAndFlush(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int),
                                    nElements * sizeof(int));
    }
    // wait for the data from GPU (rank - i) % nRanksPerNode to arrive
    if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank - i + nRanksPerNode) % nRanksPerNode) {
      devChan.wait();
    }
    deviceSyncer.sync(nRanksPerNode - 1);
  }
}
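// Schedule (reading aid): in round i every rank sends to (rank + i) % n and
// waits on (rank - i + n) % n; e.g., with n = 4, rank 0 sends to 1, 2, 3 and
// receives from 3, 2, 1 across the rounds, so no destination is contended in
// any round.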

__device__ void alltoall0(int rank, int worldSize, size_t nElements) {
  int remoteRank = (blockIdx.x < rank) ? blockIdx.x : blockIdx.x + 1;
  mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[blockIdx.x];
  if (threadIdx.x == 0) {
    devChan.putWithSignal(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int),
                          nElements * sizeof(int));
  }

  deviceSyncer.sync(gridDim.x);
  if (threadIdx.x == 0) {
    devChan.flush();
    devChan.wait();
  }
}

__device__ void alltoall1(int rank, int nRanksPerNode, size_t nElements) {
  localAlltoall(rank, nRanksPerNode, nElements);
}

__global__ void kernel(int rank, int worldSize, size_t nElements, int nRanksPerNode, int kernelNum) {
  if (kernelNum == 0) {
    alltoall0(rank, worldSize, nElements);
  } else if (kernelNum == 1) {
    alltoall1(rank, nRanksPerNode, nElements);
  }
}

class AllToAllTestColl : public BaseTestColl {
 public:
  AllToAllTestColl() = default;
  ~AllToAllTestColl() override = default;

  void runColl(const TestArgs& args, cudaStream_t stream) override;
  void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
  void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override;
  void setupCollTest(size_t size) override;
};

void AllToAllTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
  const int worldSize = args.totalRanks;
  const int rank = args.rank;
  const int kernelNum = args.kernelNum;
  const int nRanksPerNode = args.nRanksPerNode;
  CUDATHROW(cudaMemcpyAsync((int*)localRecvBuff + paramCount_ * rank, (int*)localSendBuff + paramCount_ * rank,
                            paramCount_ * sizeof(int), cudaMemcpyDeviceToDevice, stream));
  kernel<<<worldSize - 1, 32, 0, stream>>>(rank, worldSize, paramCount_, nRanksPerNode, kernelNum);
}

void AllToAllTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
  assert(sendBuff.size() == 1);
  const int rank = args.rank;
  std::vector<int> dataHost(recvCount_, 0);
  // For rank 0 the data is 0, 1, 2 ... recvCount_ - 1; for rank 1 it is recvCount_, recvCount_ + 1, ...
  for (size_t i = 0; i < recvCount_; i++) {
    dataHost[i] = rank * recvCount_ + i;
  }
  CUDATHROW(cudaMemcpy(sendBuff[0], dataHost.data(), sendCount_ * typeSize_, cudaMemcpyHostToDevice));
  for (size_t i = 0; i < recvCount_ / paramCount_; i++) {
    for (int j = 0; j < paramCount_; j++) {
      dataHost[i * paramCount_ + j] = i * recvCount_ + rank * paramCount_ + j;
    }
  }
  std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_);
}

void AllToAllTestColl::getBw(const double deltaSec, double& algBw, double& busBw) {
  double baseBw = (double)(paramCount_ * typeSize_ * worldSize_) / 1.0E9 / deltaSec;
  algBw = baseBw;
  double factor = ((double)(worldSize_ - 1)) / ((double)worldSize_);
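  // e.g., worldSize_ = 8 gives factor = 7 / 8: each rank keeps its own chunk
  // local, so only (n - 1) / n of the moved bytes cross a link.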
  busBw = baseBw * factor;
}

void AllToAllTestColl::setupCollTest(size_t size) {
  size_t count = size / typeSize_;
  size_t base = (count / worldSize_ / (ALIGN)) * ALIGN * worldSize_;
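  // e.g., count = 1000, worldSize_ = 8: 1000 / 8 / 4 = 31 (integer division),
  // so base = 31 * 4 * 8 = 992 and each rank's share (992 / 8 = 124 ints)
  // stays ALIGN-divisible.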
  sendCount_ = base;
  recvCount_ = base;
  paramCount_ = base / worldSize_;
  expectedCount_ = base;

  mscclpp::DeviceSyncer syncer = {};
  CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer)));
}

class AllToAllTestEngine : public BaseTestEngine {
 public:
  AllToAllTestEngine() : BaseTestEngine(false) {}
  ~AllToAllTestEngine() override = default;

  void allocateBuffer() override;
  void setupConnections() override;

 private:
  std::vector<void*> getSendBuff() override;
  void* getExpectedBuff() override;
  void* getRecvBuff() override;

  std::shared_ptr<int> sendBuff_;
  std::shared_ptr<int> recvBuff_;
  std::shared_ptr<int[]> expectedBuff_;
};

void AllToAllTestEngine::allocateBuffer() {
  sendBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
  recvBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
  expectedBuff_ = std::shared_ptr<int[]>(new int[args_.maxBytes / sizeof(int)]);

  localSendBuff = sendBuff_.get();
  localRecvBuff = recvBuff_.get();
}

void AllToAllTestEngine::setupConnections() {
  std::vector<mscclpp::channel::SimpleDeviceChannel> devChannels;
  setupMeshConnections(devChannels, sendBuff_.get(), args_.maxBytes, recvBuff_.get(), args_.maxBytes);

  assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel));
  CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(),
                               sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size()));
}

std::vector<void*> AllToAllTestEngine::getSendBuff() { return {sendBuff_.get()}; }
void* AllToAllTestEngine::getExpectedBuff() { return expectedBuff_.get(); }
void* AllToAllTestEngine::getRecvBuff() { return recvBuff_.get(); }

std::shared_ptr<BaseTestEngine> getTestEngine() { return std::make_shared<AllToAllTestEngine>(); }
std::shared_ptr<BaseTestColl> getTestColl() { return std::make_shared<AllToAllTestColl>(); }
@@ -227,6 +227,55 @@ size_t BaseTestEngine::checkData() {
  return nErrors;
}

// Create mesh connections between all ranks. If recvBuff is nullptr, assume in-place.
void BaseTestEngine::setupMeshConnections(std::vector<mscclpp::channel::SimpleDeviceChannel>& devChannels,
                                          void* sendBuff, size_t sendBuffBytes, void* recvBuff,
                                          size_t recvBuffBytes) {
  const int worldSize = args_.totalRanks;
  const int rank = args_.rank;
  const int nRanksPerNode = args_.nRanksPerNode;
  const int thisNode = rank / nRanksPerNode;
  const mscclpp::Transport ibTransport = IBs[args_.gpuNum];
  const bool isOutPlace = (recvBuff != nullptr);

  std::vector<mscclpp::channel::ChannelId> channelIds;
  std::vector<mscclpp::RegisteredMemory> localMemories;
  std::vector<mscclpp::RegisteredMemory> localTmpMemories;
  std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemories;

  auto rankToNode = [&](int rank) { return rank / nRanksPerNode; };
  for (int r = 0; r < worldSize; r++) {
    if (r == rank) {
      continue;
    }
    mscclpp::Transport transport;
    if (rankToNode(r) == thisNode) {
      transport = mscclpp::Transport::CudaIpc;
    } else {
      transport = ibTransport;
    }
    // Connect with all other ranks
    channelIds.push_back(chanService_->addChannel(comm_->connectOnSetup(r, 0, transport)));
    auto sendMemory = comm_->registerMemory(sendBuff, sendBuffBytes, mscclpp::Transport::CudaIpc | ibTransport);
    localMemories.push_back(sendMemory);
    if (isOutPlace) {
      auto recvMemory = comm_->registerMemory(recvBuff, recvBuffBytes, mscclpp::Transport::CudaIpc | ibTransport);
      comm_->sendMemoryOnSetup(recvMemory, r, 0);
      localTmpMemories.push_back(recvMemory);
    } else {
      comm_->sendMemoryOnSetup(sendMemory, r, 0);
    }
    remoteMemories.push_back(comm_->recvMemoryOnSetup(r, 0));
  }
  comm_->setup();

  for (size_t i = 0; i < channelIds.size(); ++i) {
    devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(
        chanService_->deviceChannel(channelIds[i]), chanService_->addMemory(remoteMemories[i].get()),
        chanService_->addMemory(localMemories[i]), remoteMemories[i].get().data(), localMemories[i].data(),
        (isOutPlace ? localTmpMemories[i].data() : nullptr)));
  }
}
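// Usage sketch (illustrative): AllReduceTestEngine builds its first-round
// channels out-of-place via
//   setupMeshConnections(chans, sendBuff, bytes, scratchBuff, bytes);
// so each channel's tmpPtr_ names the local scratch buffer that remote peers
// write into, while the in-place form omits the trailing arguments and leaves
// tmpPtr_ == nullptr.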

void run(int argc, char* argv[]);
int main(int argc, char* argv[]) {
  // Make sure every line is flushed so that we see the progress of the test
@@ -41,7 +41,7 @@ class BaseTestColl {
  virtual ~BaseTestColl() {}
  virtual void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) = 0;
  virtual void runColl(const TestArgs& args, cudaStream_t stream) = 0;
  virtual void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) = 0;

  void setupCollTest(const TestArgs& args, size_t size);
  size_t getSendBytes() { return sendCount_ * typeSize_; }
@@ -83,6 +83,9 @@ class BaseTestEngine {
  double benchTime();

 protected:
  void setupMeshConnections(std::vector<mscclpp::channel::SimpleDeviceChannel>& devChannels, void* sendBuff,
                            size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0);

  TestArgs args_;
  std::shared_ptr<BaseTestColl> coll_;
  std::shared_ptr<mscclpp::Communicator> comm_;
@@ -54,7 +54,7 @@ class SendRecvTestColl : public BaseTestColl {

  void runColl(const TestArgs& args, cudaStream_t stream) override;
  void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
  void getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) override;
  void setupCollTest(size_t size) override;
};
@@ -65,9 +65,9 @@ void SendRecvTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
  kernel<<<blockNum, BLOCK_THREADS_NUM, 0, stream>>>(args.rank, sendBytes, bytesPerBlock);
}

void SendRecvTestColl::getBw(const double deltaSec, double& algBw /*OUT*/, double& busBw /*OUT*/) {
  double baseBw = (double)(paramCount_ * typeSize_) / 1.0E9 / deltaSec;
  algBw = baseBw;
  double factor = 1;
  busBw = baseBw * factor;
}