Integrate allReduceTest to mscclpp-test (#57)

This commit is contained in:
Binyang2014
2023-04-23 13:32:01 +08:00
committed by GitHub
parent 3d767f3367
commit d338c6e701
7 changed files with 298 additions and 332 deletions

View File

@@ -153,7 +153,7 @@ TESTSOBJTARGETS := $(TESTSOBJS:%=$(BUILDDIR)/$(OBJDIR)/%)
TESTSBINS := $(patsubst %.o,$(BUILDDIR)/$(BINDIR)/%,$(TESTSOBJS))
MSCLLPPTESTSOBJSDIR:= $(BUILDDIR)/$(OBJDIR)/$(TESTSDIR)
MSCLLPPTESTBINFILESLIST := allgather_test ring_send_recv_test
MSCLLPPTESTBINFILESLIST := allgather_test allreduce_test ring_send_recv_test
MSCLLPPTESTBINS := $(MSCLLPPTESTBINFILESLIST:%=$(BUILDDIR)/$(BINDIR)/$(TESTSDIR)/%_perf)
INCLUDE := -Isrc -Isrc/include

View File

@@ -99,6 +99,7 @@ void* mscclppProxyService(void* _args)
struct mscclppComm* comm = args->comm;
// from this point on, proxy thread will stay close to the device
PROXYCUDACHECK(cudaSetDevice(comm->cudaDev));
PROXYMSCCLPPCHECK(numaBind(comm->devNumaNode));
volatile mscclppProxyRunState_t* run = &args->proxyState->run;

View File

@@ -215,6 +215,6 @@ testResult_t AllGatherRunTest(struct testArgs* args)
return testSuccess;
}
struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest};
struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest, nullptr, nullptr};
#pragma weak mscclppTestEngine = allGatherEngine

View File

@@ -1,322 +0,0 @@
#include "mscclpp.h"
#include <cuda/barrier>
#include <tuple>
#include <vector>
#include "common.h"
#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
#define CUDACHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while (false)
struct Volume
{
size_t offset;
size_t size;
};
__host__ __device__ Volume chunkVolume(size_t totalSize, size_t totalChunks, size_t chunkIdx, size_t chunkCount)
{
size_t remainder = totalSize % totalChunks;
size_t smallChunk = totalSize / totalChunks;
size_t largeChunk = smallChunk + 1;
size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
size_t numSmallChunks = chunkCount - numLargeChunks;
size_t offset =
(remainder - numLargeChunks) * largeChunk + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunk;
return Volume{offset, numLargeChunks * largeChunk + numSmallChunks * smallChunk};
}
template <class T, void (*reduce)(T*, T*, size_t)> struct AllreduceAllpairs
{
int rank;
int nRanks;
T* userData;
size_t userSize;
T* scratch;
size_t scratchSize;
mscclppDevConn_t* conns;
uint64_t* connFlags;
cuda::barrier<cuda::thread_scope_device>* barrier;
__device__ void run(int idx)
{
int myPeer = peerRank(idx, rank);
mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(myPeer)];
mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(myPeer)];
mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(myPeer)];
// 1st communication phase: send data to the scratch buffer of the peer associated with this block
Volume toPeer = chunkVolume(userSize, nRanks, myPeer, 1);
// Now we need to figure out the offset of this chunk in the scratch buffer of the destination.
// The destination will have allocated a scratch buffer of size numPeers() * toPeer.size and
// inside that each of the destination's peers send to the nth chunk, where n is the index of the
// source peer from the destination's perspective.
size_t dstOffset = peerIdx(rank, myPeer) * toPeer.size;
send(phase1SendConn, toPeer.offset, dstOffset, toPeer.size);
recv(phase1RecvConn);
if (threadIdx.x == 0)
barrier->arrive_and_wait();
__syncthreads();
// Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
Volume rankUserChunk = chunkVolume(userSize, nRanks, rank, 1);
T* userChunk = userData + rankUserChunk.offset;
Volume blockUserChunk = chunkVolume(rankUserChunk.size, numBlocks(), idx, 1);
for (int peerIdx = 0; peerIdx < numPeers(); ++peerIdx) {
assert(scratchSize % numPeers() == 0);
assert(scratchSize / numPeers() == rankUserChunk.size);
size_t scratchChunkSize = scratchSize / numPeers();
T* scratchChunk = scratch + peerIdx * scratchChunkSize;
Volume blockScratchChunk = chunkVolume(scratchChunkSize, numBlocks(), idx, 1);
assert(blockScratchChunk.size == blockUserChunk.size);
reduce(userChunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
}
if (threadIdx.x == 0)
barrier->arrive_and_wait();
__syncthreads();
// 2nd communication phase: send the now reduced data between the user buffers
Volume srcVolume2 = chunkVolume(userSize, nRanks, rank, 1);
send(phase2Conn, srcVolume2.offset, srcVolume2.offset, srcVolume2.size);
recv(phase2Conn);
}
__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size)
{
if (threadIdx.x == 0) {
volatile uint64_t* localFlag = conn.localFlag;
*localFlag = 1; // 1 is used to signal the send
mscclppTrigger_t trigger;
auto request = conn.fifo.getTrigger(&trigger);
conn.fifo.setTrigger(trigger, mscclppData | mscclppFlag, srcOffset * sizeof(T), dstOffset * sizeof(T),
size * sizeof(T));
}
__syncthreads();
}
__device__ void recv(mscclppDevConn_t& conn)
{
if (threadIdx.x == 0) {
volatile uint64_t* proxyFlag = conn.proxyFlag;
while (*proxyFlag != 1) {
}
*proxyFlag = 0;
}
__syncthreads();
}
__host__ __device__ int numPeers()
{
return nRanks - 1;
}
__host__ __device__ int numBlocks()
{
return numPeers();
}
__host__ __device__ int peerIdx(int peerRank, int myRank)
{
return peerRank < myRank ? peerRank : peerRank - 1;
}
__host__ __device__ int peerRank(int peerIdx, int myRank)
{
return peerIdx < myRank ? peerIdx : peerIdx + 1;
}
__host__ __device__ int phase1SendConnIdx(int peerRank)
{
return peerIdx(peerRank, rank) * 3;
}
__host__ __device__ int phase1RecvConnIdx(int peerRank)
{
return peerIdx(peerRank, rank) * 3 + 1;
}
__host__ __device__ int phase2ConnIdx(int peerRank)
{
return peerIdx(peerRank, rank) * 3 + 2;
}
void freeGPUResources()
{
if (scratch)
CUDACHECK(cudaFree(scratch));
scratch = nullptr;
if (connFlags)
CUDACHECK(cudaFree(connFlags));
connFlags = nullptr;
if (conns)
CUDACHECK(cudaFree(conns));
conns = nullptr;
if (barrier)
CUDACHECK(cudaFree(barrier));
barrier = nullptr;
}
};
// The builder class encapsulates the
template <class T, void (*reduce)(T*, T*, size_t)> class AllreduceAllpairsBuilder
{
AllreduceAllpairs<T, reduce> d;
std::vector<mscclppDevConn_t> hostConns;
public:
// The constructor is called after the user has allocated the buffer to be allreduced
AllreduceAllpairsBuilder(T* data, size_t size)
{
d.userData = data;
d.userSize = size;
d.scratch = nullptr;
d.connFlags = nullptr;
d.conns = nullptr;
d.barrier = nullptr;
}
// connect is called after rank initialization but before connection setup
mscclppResult_t connect(mscclppComm_t comm)
{
MSCCLPPCHECK(mscclppCommRank(comm, &d.rank));
MSCCLPPCHECK(mscclppCommSize(comm, &d.nRanks));
Volume myChunks = chunkVolume(d.userSize, d.nRanks, d.rank, 1);
d.scratchSize = myChunks.size * d.numPeers();
CUDACHECK(cudaMalloc(&d.scratch, d.scratchSize * sizeof(T)));
CUDACHECK(cudaMalloc(&d.connFlags, 3 * sizeof(uint64_t)));
CUDACHECK(cudaMemset(d.connFlags, 0, 3 * sizeof(uint64_t)));
hostConns.resize(d.numPeers() * 3);
for (int peer = 0; peer < d.nRanks; ++peer) {
if (peer != d.rank) {
int sendTag = d.rank < peer ? 0 : 1;
int recvTag = d.rank < peer ? 1 : 0;
MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1SendConnIdx(peer), peer, d.userData,
d.userSize * sizeof(T), d.connFlags + 0, sendTag, mscclppTransportP2P, nullptr));
MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1RecvConnIdx(peer), peer, d.scratch,
d.scratchSize * sizeof(T), d.connFlags + 1, recvTag, mscclppTransportP2P, nullptr));
MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase2ConnIdx(peer), peer, d.userData,
d.userSize * sizeof(T), d.connFlags + 2, 2, mscclppTransportP2P, nullptr));
}
}
return mscclppSuccess;
}
// finishSetup is called after connection setup and returns an algorithm object that is ready to be passed to a GPU
// kernel
AllreduceAllpairs<T, reduce> finishSetup()
{
CUDACHECK(cudaMalloc(&d.conns, hostConns.size() * sizeof(mscclppDevConn_t)));
CUDACHECK(
cudaMemcpy(d.conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice));
CUDACHECK(cudaMalloc(&d.barrier, sizeof(cuda::barrier<cuda::thread_scope_device>)));
cuda::barrier<cuda::thread_scope_device> initBarrier(d.numBlocks());
CUDACHECK(
cudaMemcpy(d.barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
return d;
}
};
template <class T> __device__ void reduceSum(T* dst, T* src, size_t size)
{
for (int i = threadIdx.x; i < size; i += blockDim.x) {
dst[i] += src[i];
}
}
template <class T> __global__ void init(T* data, size_t size, int rank)
{
for (int i = threadIdx.x; i < size; i += blockDim.x) {
data[i] = rank;
}
}
// The main test kernel
template <class T> __global__ void testKernel(AllreduceAllpairs<T, reduceSum> d)
{
d.run(blockIdx.x);
}
int main(int argc, const char* argv[])
{
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
MPI_Init(NULL, NULL);
#endif
const char* ip_port;
int rank, world_size;
parse_arguments(argc, argv, &ip_port, &rank, &world_size);
CUDACHECK(cudaSetDevice(rank));
// Allocate and initialize 1 MB of data
int* data;
size_t dataSize = 1024 * 1024 / sizeof(int);
CUDACHECK(cudaMalloc(&data, dataSize * sizeof(int)));
init<<<1, 256>>>(data, dataSize, rank);
// Create the collective
AllreduceAllpairsBuilder<int, reduceSum> builder(data, dataSize);
// Create the communicator
mscclppComm_t comm;
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
// Connect the collective
builder.connect(comm);
// Finish the setup
MSCCLPPCHECK(mscclppConnectionSetup(comm));
MSCCLPPCHECK(mscclppProxyLaunch(comm));
auto allreduce = builder.finishSetup();
// Run the collective
testKernel<<<allreduce.numBlocks(), 256>>>(allreduce);
// Wait for kernel to finish
CUDACHECK(cudaDeviceSynchronize());
// Check the result
int* hostData = new int[dataSize];
CUDACHECK(cudaMemcpy(hostData, data, dataSize * sizeof(int), cudaMemcpyDeviceToHost));
int expectedValue = world_size * (world_size - 1) / 2;
for (size_t i = 0; i < dataSize; ++i) {
if (hostData[i] != expectedValue) {
printf("Error at index %lu: %d != %d\n", i, hostData[i], expectedValue);
return 1;
}
}
MSCCLPPCHECK(mscclppProxyStop(comm));
MSCCLPPCHECK(mscclppCommDestroy(comm));
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
if (argc == 2) {
MPI_Finalize();
}
#endif
printf("Succeeded! %d\n", rank);
return 0;
}

284
tests/allreduce_test.cu Normal file
View File

@@ -0,0 +1,284 @@
#include <cuda/barrier>
#include <tuple>
#include <vector>
#include "comm.h"
#include "common.h"
#define ALIGN 4
const int phase2Tag = 2;
mscclppDevConn_t* conns;
void* scratch = nullptr;
void* sendRecvData = nullptr;
cuda::barrier<cuda::thread_scope_device>* barrier = nullptr;
struct Chunk
{
size_t offset;
size_t size;
};
inline int getSendTag(int rank, int peer)
{
return rank < peer ? 0 : 1;
}
inline int getRecvTag(int rank, int peer)
{
return rank < peer ? 1 : 0;
}
__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount)
{
size_t remainder = dataCount % numChunks;
size_t smallChunkSize = dataCount / numChunks;
size_t largeChunkSize = smallChunkSize + 1;
size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
size_t numSmallChunks = chunkCount - numLargeChunks;
size_t offset =
(remainder - numLargeChunks) * largeChunkSize + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize;
return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize};
}
__host__ __device__ int peerIdx(int peerRank, int rank)
{
return peerRank < rank ? peerRank : peerRank - 1;
}
__host__ __device__ int peerRank(int peerIdx, int rank)
{
return peerIdx < rank ? peerIdx : peerIdx + 1;
}
__host__ __device__ int phase1SendConnIdx(int peerRank, int rank)
{
return peerIdx(peerRank, rank) * 3;
}
__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank)
{
return peerIdx(peerRank, rank) * 3 + 1;
}
__host__ __device__ int phase2ConnIdx(int peerRank, int rank)
{
return peerIdx(peerRank, rank) * 3 + 2;
}
__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size)
{
if (threadIdx.x == 0) {
conn.putWithSignalAndFlush(dstOffset, srcOffset, size);
}
__syncthreads();
}
__device__ void recv(mscclppDevConn_t& conn)
{
if (threadIdx.x == 0) {
conn.wait();
}
__syncthreads();
}
__device__ void reduceSum(int* dst, int* src, size_t size)
{
for (int i = threadIdx.x; i < size; i += blockDim.x) {
dst[i] += src[i];
}
}
__global__ void initData(int* data, size_t size, int rank)
{
for (int i = threadIdx.x; i < size; i += blockDim.x) {
data[i] = rank;
}
}
__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount,
mscclppDevConn_t* conns, void* scratch, void* sendRecvData,
cuda::barrier<cuda::thread_scope_device>* barrier)
{
int idx = blockIdx.x;
int peer = peerRank(idx, rank);
mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)];
mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(peer, rank)];
mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(peer, rank)];
// 1st communication phase: send data to the scratch buffer of the peer associated with this block
Chunk toPeerChunk = getChunk(dataCount, nRanks, peer, 1);
// Now we need to figure out the offset of this chunk in the scratch buffer of the destination.
// The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and
// inside that each of the destination's peers send to the nth chunk, where n is the index of the
// source peer from the destination's perspective.
size_t dstOffset = peerIdx(rank, peer) * toPeerChunk.size;
send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int));
recv(phase1RecvConn);
if (threadIdx.x == 0)
barrier->arrive_and_wait();
__syncthreads();
// Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
Chunk rankChunk = getChunk(dataCount, nRanks, rank, 1);
int* chunk = (int*)sendRecvData + rankChunk.offset;
int numPeers = nRanks - 1, numBlocks = nRanks - 1;
Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, idx, 1);
for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) {
assert(scratchDataCount % numPeers == 0);
assert(scratchDataCount / numPeers == rankChunk.size);
size_t scratchDataCountPerPeer = scratchDataCount / numPeers;
int* scratchChunk = (int*)scratch + peerIdx * scratchDataCountPerPeer;
Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, idx, 1);
assert(blockScratchChunk.size == blockUserChunk.size);
reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
}
if (threadIdx.x == 0)
barrier->arrive_and_wait();
__syncthreads();
// 2nd communication phase: send the now reduced data between the user buffers
Chunk collectionChunk = getChunk(dataCount, nRanks, rank, 1);
send(phase2Conn, collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int),
collectionChunk.size * sizeof(int));
recv(phase2Conn);
}
void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
size_t* recvInplaceOffset, size_t count, int nranks)
{
size_t base = (count / ALIGN) * ALIGN;
*sendcount = base;
*recvcount = base;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = base;
}
void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
{
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
AllReduceGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AllReduceInitData(struct testArgs* args, int in_place)
{
size_t recvcount = args->expectedBytes / sizeof(int);
CUDACHECK(cudaSetDevice(args->gpuNum));
CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes));
initData<<<1, 256>>>((int*)args->recvbuff, recvcount, args->proc);
int* dataHost = new int[recvcount];
for (size_t i = 0; i < recvcount; i++) {
dataHost[i] = args->totalProcs * (args->totalProcs - 1) / 2;
}
CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
delete dataHost;
CUDACHECK(cudaDeviceSynchronize());
MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
return testSuccess;
}
void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
{
double baseBw = (double)(count * typesize) / 1.0E9 / sec;
*algBw = baseBw;
double factor = (2 * (double)(nranks - 1)) / ((double)nranks);
*busBw = baseBw * factor;
}
testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm,
cudaStream_t stream, int kernelNum)
{
int worldSize = comm->nRanks;
int nPeers = worldSize - 1;
int dataCount = nBytes / sizeof(int);
Chunk chunk = getChunk(dataCount, worldSize, comm->rank, 1);
size_t scratchDataCount = chunk.size * nPeers;
allReduceKernel0<<<worldSize - 1, 256, 0, stream>>>(comm->rank, worldSize, dataCount, scratchDataCount, conns,
scratch, sendRecvData, barrier);
return testSuccess;
}
struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, AllReduceInitData, AllReduceGetBw,
AllReduceRunColl};
testResult_t AllReduceSetupMscclppConnections(struct testArgs* args)
{
int rank = args->proc, worldSize = args->totalProcs;
size_t bufferSize = args->maxbytes;
Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1);
int nPeers = args->totalProcs - 1;
size_t scratchBytes = chunk.size * nPeers * sizeof(int);
CUDACHECK(cudaMalloc(&scratch, scratchBytes));
for (int peer = 0; peer < worldSize; ++peer) {
if (peer != args->proc) {
int sendTag = getSendTag(args->proc, peer);
int recvTag = getRecvTag(args->proc, peer);
MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr));
MSCCLPPCHECK(
mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
}
}
MSCCLPPCHECK(mscclppConnectionSetup(args->comm));
return testSuccess;
}
testResult_t AllReduceTeardownMscclppConnections()
{
if (scratch != nullptr) {
CUDACHECK(cudaFree(scratch));
scratch = nullptr;
}
return testSuccess;
}
testResult_t AllReduceRunTest(struct testArgs* args)
{
args->collTest = &allReduceTest;
sendRecvData = args->recvbuff;
CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier<cuda::thread_scope_device>)));
cuda::barrier<cuda::thread_scope_device> initBarrier(args->totalProcs - 1);
CUDACHECK(
cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
int nPeers = args->totalProcs - 1;
int rank = args->proc;
std::vector<mscclppDevConn_t> hostConns(nPeers * 3, mscclppDevConn_t());
for (int peer = 0; peer < args->totalProcs; ++peer) {
mscclppDevConn_t* devConn;
if (peer != rank) {
int sendTag = getSendTag(args->proc, peer);
int recvTag = getRecvTag(args->proc, peer);
MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, sendTag, &devConn));
hostConns[phase1SendConnIdx(peer, rank)] = *devConn;
MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, recvTag, &devConn));
hostConns[phase1RecvConnIdx(peer, rank)] = *devConn;
MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, phase2Tag, &devConn));
hostConns[phase2ConnIdx(peer, rank)] = *devConn;
}
}
CUDACHECK(cudaMalloc(&conns, nPeers * 3 * sizeof(mscclppDevConn_t)));
CUDACHECK(cudaMemcpy(conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice));
TESTCHECK(TimeTest(args));
CUDACHECK(cudaFree(barrier));
CUDACHECK(cudaFree(conns));
return testSuccess;
}
struct testEngine allReduceEngine = {AllReduceGetBuffSize, AllReduceRunTest, AllReduceSetupMscclppConnections,
AllReduceTeardownMscclppConnections};
#pragma weak mscclppTestEngine = allReduceEngine

View File

@@ -224,12 +224,6 @@ testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts)
CUDACHECK(cudaMemcpy(dataHostRecv, args->recvbuff, args->expectedBytes, cudaMemcpyDeviceToHost));
CUDACHECK(cudaMemcpy(dataHostExpected, args->expected, args->expectedBytes, cudaMemcpyDeviceToHost));
for (size_t i = 0; i < count; i++) {
if (dataHostRecv[i] != dataHostExpected[i]) {
*wrongElts += 1;
}
}
if (args->reportErrors && *wrongElts) {
(args->error)++;
}
@@ -415,13 +409,20 @@ testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode,
testResult_t runTests(struct testArgs* args)
{
PRINT("# Setting up the connection in MSCCL++\n");
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
args->maxbytes));
if (mscclppTestEngine.setupMscclppConnections != nullptr) {
TESTCHECK(mscclppTestEngine.setupMscclppConnections(args));
} else {
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
args->maxbytes));
}
PRINT("# Launching MSCCL++ proxy threads\n");
MSCCLPPCHECK(mscclppProxyLaunch(args->comm));
TESTCHECK(mscclppTestEngine.runTest(args));
PRINT("Stopping MSCCL++ proxy threads\n");
MSCCLPPCHECK(mscclppProxyStop(args->comm));
if (mscclppTestEngine.teardownMscclppConnections != nullptr) {
TESTCHECK(mscclppTestEngine.teardownMscclppConnections());
}
return testSuccess;
}

View File

@@ -80,6 +80,8 @@ struct testEngine
void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks);
// We can add more parameters for other communication primitives
testResult_t (*runTest)(struct testArgs* args);
testResult_t (*setupMscclppConnections)(struct testArgs* args);
testResult_t (*teardownMscclppConnections)();
};
extern struct testEngine mscclppTestEngine;