mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-12 09:17:06 +00:00
Add allgather test to mscclpp-test (#78)
Finish allGather Co-authored-by: Changho Hwang <changhohwang@microsoft.com>
This commit is contained in:
@@ -1,220 +0,0 @@
|
||||
#include "comm.h"
|
||||
#include "common.h"
|
||||
|
||||
#include <cuda_runtime.h>
|
||||
#include <string>
|
||||
|
||||
#define ALIGN 4
|
||||
__constant__ mscclppDevConn_t constDevConns[16];
|
||||
|
||||
__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU)
|
||||
{
|
||||
// this allgather is really simple and implemented as an alltoall
|
||||
|
||||
// this thread's role is a sender role
|
||||
// put your data asynchronously
|
||||
if (threadIdx.x % 32 == 0)
|
||||
devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
|
||||
// make sure everyone is put their data before some thread randomly blocks everyone else in signal
|
||||
__syncthreads();
|
||||
// push with flag and sync to make sure the data is received
|
||||
if (threadIdx.x % 32 == 0)
|
||||
devConn.flush();
|
||||
|
||||
// this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
|
||||
if (threadIdx.x % 32 == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
|
||||
uint64_t offset, uint64_t size)
|
||||
{
|
||||
// this allgather algorithm works as follows:
|
||||
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
|
||||
// and waits for data from GPU rank (i-1) % nranksPerNode
|
||||
// Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode
|
||||
// ...
|
||||
// This order is much better for DMA engine for NVLinks
|
||||
for (int i = 1; i < nranksPerNode; i++) {
|
||||
if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) {
|
||||
// put your data to GPU (rank+i) % nranksPerNode and signal in one call
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignalAndFlush(offset, size);
|
||||
}
|
||||
// wait for the data from GPU (rank-i) % nranksPerNode to arrive
|
||||
if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory");
|
||||
}
|
||||
}
|
||||
|
||||
__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
|
||||
size_t nelemsPerGPU)
|
||||
{
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
|
||||
nelemsPerGPU * sizeof(int));
|
||||
}
|
||||
|
||||
__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
|
||||
size_t nelemsPerGPU)
|
||||
{
|
||||
// this allgather is a pipelined and hierarchical one and only works for two nodes
|
||||
// it is implemented as follows:
|
||||
// Step 1: each node does a local allgather and concurrently,
|
||||
// local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with
|
||||
// its cross-node neighbor (local GPU i on the other node) via IB
|
||||
// Step 2: each node does a local allgather again with the data just received from its
|
||||
// cross-node neighbor in step 1, and concurrently, exchange the rest of the data with
|
||||
// its cross-node neighbor
|
||||
// Step 3: each node does a local allgather for the last time with the rest of the data
|
||||
|
||||
int pipelineSize = 3;
|
||||
|
||||
// Step 1
|
||||
// local allgather
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
|
||||
nelemsPerGPU * sizeof(int));
|
||||
}
|
||||
// cross-node exchange
|
||||
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
|
||||
// opposite side
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int),
|
||||
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
|
||||
// Step 2
|
||||
// local allgather
|
||||
int otherNghr = (rank + nranksPerNode) % world_size;
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
|
||||
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
|
||||
}
|
||||
|
||||
// cross-node exchange
|
||||
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
|
||||
// opposite side
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) *
|
||||
sizeof(int),
|
||||
nelemsPerGPU / pipelineSize * sizeof(int));
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
|
||||
// Step 3
|
||||
// local allgather
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank,
|
||||
(otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
|
||||
nelemsPerGPU / pipelineSize * sizeof(int));
|
||||
}
|
||||
}
|
||||
|
||||
__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel)
|
||||
{
|
||||
// find the mapping between remoteRank and devConns
|
||||
int warpId = threadIdx.x / 32;
|
||||
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
|
||||
// Each warp is responsible for one of the remote ranks
|
||||
mscclppDevConn_t devConn = constDevConns[warpId];
|
||||
|
||||
if (kernel == 0)
|
||||
allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU);
|
||||
else if (kernel == 1)
|
||||
allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
else if (kernel == 2)
|
||||
allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
}
|
||||
|
||||
void AllGatherGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
|
||||
size_t* recvInplaceOffset, size_t count, int nranks)
|
||||
{
|
||||
size_t base = (count / (ALIGN * nranks)) * ALIGN;
|
||||
*sendcount = base;
|
||||
*recvcount = base * nranks;
|
||||
*sendInplaceOffset = base;
|
||||
*recvInplaceOffset = 0;
|
||||
*paramcount = base;
|
||||
}
|
||||
|
||||
testResult_t AllGatherInitData(struct testArgs* args, int in_place)
|
||||
{
|
||||
size_t sendcount = args->sendBytes / sizeof(int);
|
||||
size_t recvcount = args->expectedBytes / sizeof(int);
|
||||
// int nranks = args->totalProcs;
|
||||
|
||||
CUDACHECK(cudaSetDevice(args->gpuNum));
|
||||
int rank = args->proc;
|
||||
CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes));
|
||||
// void* data = in_place ? ((char*)args->recvbuffs[0]) + rank * args->sendBytes : args->sendbuffs[0];
|
||||
|
||||
int* dataHost = new int[recvcount];
|
||||
for (size_t i = 0; i < recvcount; i++) {
|
||||
int val = i + 1;
|
||||
if (i / sendcount == (size_t)rank) {
|
||||
dataHost[i] = val;
|
||||
} else {
|
||||
dataHost[i] = 0;
|
||||
}
|
||||
}
|
||||
CUDACHECK(cudaMemcpy(args->recvbuff, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
|
||||
for (int i = 0; i < static_cast<int>(recvcount); i++) {
|
||||
dataHost[i] = i + 1;
|
||||
}
|
||||
CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
|
||||
delete dataHost;
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
|
||||
{
|
||||
double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec;
|
||||
|
||||
*algBw = baseBw;
|
||||
double factor = ((double)(nranks - 1)) / ((double)nranks);
|
||||
*busBw = baseBw * factor;
|
||||
}
|
||||
|
||||
testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
|
||||
cudaStream_t stream, int kernel_num)
|
||||
{
|
||||
int worldSize = comm->nRanks;
|
||||
kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(comm->rank, worldSize, nranksPerNode, count / sizeof(int), kernel_num);
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, defaultInitColl, AllGatherInitData,
|
||||
AllGatherGetBw, AllGatherRunColl};
|
||||
|
||||
void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
|
||||
{
|
||||
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
|
||||
AllGatherGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
|
||||
}
|
||||
|
||||
testResult_t AllGatherRunTest(struct testArgs* args)
|
||||
{
|
||||
args->collTest = &allGatherTest;
|
||||
mscclppDevConn_t* devConns;
|
||||
int nCons;
|
||||
MSCCLPPCHECK(mscclppGetAllDeviceConnections(args->comm, &devConns, &nCons));
|
||||
CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons));
|
||||
TESTCHECK(TimeTest(args));
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest, nullptr, nullptr};
|
||||
|
||||
#pragma weak mscclppTestEngine = allGatherEngine
|
||||
@@ -1,501 +0,0 @@
|
||||
#include "mscclpp.h"
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include "mpi.h"
|
||||
#endif // MSCCLPP_USE_MPI_FOR_TESTS
|
||||
#include <iostream>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <unordered_map>
|
||||
|
||||
static int nranksPerNode = 8;
|
||||
|
||||
// Propagate errors up
|
||||
|
||||
#define MSCCLPPCHECK(call) \
|
||||
do { \
|
||||
mscclppResult_t res = call; \
|
||||
if (res != mscclppSuccess && res != mscclppInProgress) { \
|
||||
/* Print the back trace*/ \
|
||||
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
|
||||
return res; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
// Check CUDA RT calls
|
||||
#define CUDACHECK(cmd) \
|
||||
do { \
|
||||
cudaError_t err = cmd; \
|
||||
if (err != cudaSuccess) { \
|
||||
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
|
||||
exit(EXIT_FAILURE); \
|
||||
} \
|
||||
} while (false)
|
||||
|
||||
// Measure current time in second.
|
||||
static double getTime(void)
|
||||
{
|
||||
struct timespec tspec;
|
||||
if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) {
|
||||
printf("clock_gettime failed\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
|
||||
}
|
||||
|
||||
__constant__ mscclppDevConn_t constDevConns[16];
|
||||
|
||||
__device__ void allgather0(mscclppDevConn_t devConn, int rank, int world_size, int remoteRank, size_t nelemsPerGPU)
|
||||
{
|
||||
// this allgather is really simple and implemented as an alltoall
|
||||
|
||||
// this thread's role is a sender role
|
||||
// put your data asynchronously
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
|
||||
// make sure everyone is put their data before some thread randomly blocks everyone else in signal
|
||||
__syncthreads();
|
||||
// push with flag and sync to make sure the data is received
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.flush();
|
||||
|
||||
// this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
__device__ void localAllGather(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
|
||||
uint64_t offset, uint64_t size)
|
||||
{
|
||||
// this allgather algorithm works as follows:
|
||||
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
|
||||
// and waits for data from GPU rank (i-1) % nranksPerNode
|
||||
// Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode
|
||||
// ...
|
||||
// This order is much better for DMA engine for NVLinks
|
||||
for (int i = 1; i < nranksPerNode; i++) {
|
||||
if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) {
|
||||
// put your data to GPU (rank+i) % nranksPerNode and signal in one call
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignalAndFlush(offset, size);
|
||||
}
|
||||
// wait for the data from GPU (rank-i) % nranksPerNode to arrive
|
||||
if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory");
|
||||
}
|
||||
}
|
||||
|
||||
__device__ void allgather1(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
|
||||
size_t nelemsPerGPU)
|
||||
{
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
|
||||
nelemsPerGPU * sizeof(int));
|
||||
}
|
||||
|
||||
__device__ void allgather2(mscclppDevConn_t devConn, int rank, int world_size, int nranksPerNode, int remoteRank,
|
||||
size_t nelemsPerGPU)
|
||||
{
|
||||
// this allgather is a pipelined and hierarchical one and only works for two nodes
|
||||
// it is implemented as follows:
|
||||
// Step 1: each node does a local allgather and concurrently,
|
||||
// local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with
|
||||
// its cross-node neighbor (local GPU i on the other node) via IB
|
||||
// Step 2: each node does a local allgather again with the data just received from its
|
||||
// cross-node neighbor in step 1, and concurrently, exchange the rest of the data with
|
||||
// its cross-node neighbor
|
||||
// Step 3: each node does a local allgather for the last time with the rest of the data
|
||||
|
||||
int pipelineSize = 3;
|
||||
|
||||
// Step 1
|
||||
// local allgather
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
|
||||
nelemsPerGPU * sizeof(int));
|
||||
}
|
||||
// cross-node exchange
|
||||
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
|
||||
// opposite side
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int),
|
||||
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
|
||||
// Step 2
|
||||
// local allgather
|
||||
int otherNghr = (rank + nranksPerNode) % world_size;
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
|
||||
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
|
||||
}
|
||||
|
||||
// cross-node exchange
|
||||
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
|
||||
// opposite side
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.putWithSignalAndFlush((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) *
|
||||
sizeof(int),
|
||||
nelemsPerGPU / pipelineSize * sizeof(int));
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devConn.wait();
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
|
||||
// Step 3
|
||||
// local allgather
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devConn, rank, world_size, nranksPerNode, remoteRank,
|
||||
(otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
|
||||
nelemsPerGPU / pipelineSize * sizeof(int));
|
||||
}
|
||||
}
|
||||
|
||||
__global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelemsPerGPU, int kernel)
|
||||
{
|
||||
// find the mapping between remoteRank and devConns
|
||||
int warpId = threadIdx.x / 32;
|
||||
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
|
||||
// Each warp is responsible for one of the remote ranks
|
||||
mscclppDevConn_t devConn = constDevConns[warpId];
|
||||
|
||||
if (kernel == 0)
|
||||
allgather0(devConn, rank, world_size, remoteRank, nelemsPerGPU);
|
||||
else if (kernel == 1)
|
||||
allgather1(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
else if (kernel == 2)
|
||||
allgather2(devConn, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
}
|
||||
|
||||
int rankToLocalRank(int rank)
|
||||
{
|
||||
return rank % nranksPerNode;
|
||||
}
|
||||
|
||||
int rankToNode(int rank)
|
||||
{
|
||||
return rank / nranksPerNode;
|
||||
}
|
||||
|
||||
void print_usage(const char* prog)
|
||||
{
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
printf("usage: %s IP:PORT [rank nranks]\n", prog);
|
||||
#else
|
||||
printf("usage: %s IP:PORT rank nranks\n", prog);
|
||||
#endif
|
||||
}
|
||||
|
||||
void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
|
||||
int** data_d)
|
||||
{
|
||||
CUDACHECK(cudaMalloc(data_d, dataSize));
|
||||
CUDACHECK(cudaMemset(*data_d, 0, dataSize));
|
||||
|
||||
*data_h = new int[nelemsPerGPU * world_size];
|
||||
for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
|
||||
int val = i + 1;
|
||||
if (i / nelemsPerGPU == (size_t)rank) {
|
||||
(*data_h)[i] = val;
|
||||
} else {
|
||||
(*data_h)[i] = 0;
|
||||
}
|
||||
}
|
||||
CUDACHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
|
||||
}
|
||||
|
||||
mscclppResult_t setupMscclppConnections(int rank, int world_size, mscclppComm_t comm, int* data_d, size_t dataSize)
|
||||
{
|
||||
int thisNode = rankToNode(rank);
|
||||
int cudaNum = rankToLocalRank(rank);
|
||||
std::string ibDevStr = "mlx5_ib" + std::to_string(cudaNum);
|
||||
|
||||
for (int r = 0; r < world_size; ++r) {
|
||||
if (r == rank)
|
||||
continue;
|
||||
mscclppTransport_t transportType;
|
||||
const char* ibDev = ibDevStr.c_str();
|
||||
if (rankToNode(r) == thisNode) {
|
||||
ibDev = NULL;
|
||||
transportType = mscclppTransportP2P;
|
||||
} else {
|
||||
transportType = mscclppTransportIB;
|
||||
}
|
||||
// Connect with all other ranks
|
||||
MSCCLPPCHECK(mscclppConnect(comm, r, 0, data_d, dataSize, transportType, ibDev));
|
||||
}
|
||||
|
||||
MSCCLPPCHECK(mscclppConnectionSetup(comm));
|
||||
|
||||
mscclppDevConn_t* devConns;
|
||||
int nCons;
|
||||
MSCCLPPCHECK(mscclppGetAllDeviceConnections(comm, &devConns, &nCons));
|
||||
|
||||
CUDACHECK(cudaMemcpyToSymbol(constDevConns, devConns, sizeof(mscclppDevConn_t) * nCons));
|
||||
|
||||
return mscclppSuccess;
|
||||
}
|
||||
|
||||
void printUsage(const char* prog, bool isMpi)
|
||||
{
|
||||
if (isMpi) {
|
||||
std::string st = "you are using MPI for this test\n";
|
||||
st += "two possilbe usages are:\n";
|
||||
st += "> " + std::string(prog) + "\n";
|
||||
st += "or\n";
|
||||
st += "> " + std::string(prog) + " -ip_port [ip:port]\n";
|
||||
printf("%s", st.c_str());
|
||||
} else {
|
||||
std::string st = "you are NOT using MPI for this test\n";
|
||||
st += "the only possible usage:\n";
|
||||
st += "> " + std::string(prog) + " -ip_port [ip:port] -rank [rank] -nranks [nranks]\n";
|
||||
printf("%s", st.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::string> parseArgs(int argc, const char* argv[], bool isMpi)
|
||||
{
|
||||
std::unordered_map<std::string, std::string> options;
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
std::string arg = argv[i];
|
||||
if (arg == "-rankspernode") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -rankspernode should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["rankspernode"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -rankspernode option requires an argument.\n");
|
||||
;
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-kernel") {
|
||||
if (i + 1 < argc) {
|
||||
options["kernel"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -kernel option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-ip_port") {
|
||||
if (i + 1 < argc) {
|
||||
options["ip_port"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-rank") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -rank should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["rank"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-nranks") {
|
||||
if (isMpi) {
|
||||
fprintf(stderr, "Error: -nranks should not be specified with MPI.\n");
|
||||
exit(-1);
|
||||
}
|
||||
if (i + 1 < argc) {
|
||||
options["nranks"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -ip_port option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-datasize") {
|
||||
if (i + 1 < argc) {
|
||||
options["datasize"] = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "Error: -datasize option requires an argument.\n");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (arg == "-help" || arg == "-h") {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(0);
|
||||
} else {
|
||||
fprintf(stderr, "Error: Unknown option %s\n", argv[i]);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
return options;
|
||||
}
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
bool isMpi = false;
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
isMpi = true;
|
||||
#endif
|
||||
|
||||
auto parsedArgs = parseArgs(argc, argv, isMpi);
|
||||
|
||||
int rank;
|
||||
int world_size;
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Init(NULL, NULL);
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
||||
// get the local number of nodes with MPI
|
||||
MPI_Comm shmcomm;
|
||||
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
|
||||
int shmrank;
|
||||
MPI_Comm_size(shmcomm, &shmrank);
|
||||
nranksPerNode = shmrank;
|
||||
MPI_Comm_free(&shmcomm);
|
||||
#else
|
||||
if (parsedArgs.find("rank") == parsedArgs.end() || parsedArgs.find("nranks") == parsedArgs.end()) {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(-1);
|
||||
}
|
||||
rank = std::stoi(parsedArgs["rank"]);
|
||||
world_size = std::stoi(parsedArgs["nranks"]);
|
||||
if (parsedArgs.find("rankspernode") == parsedArgs.end()) {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(-1);
|
||||
}
|
||||
nranksPerNode = std::stoi(parsedArgs["rankspernode"]);
|
||||
#endif
|
||||
int kernelNum = 0;
|
||||
if (parsedArgs.find("kernel") != parsedArgs.end()) {
|
||||
kernelNum = std::stoi(parsedArgs["kernel"]);
|
||||
}
|
||||
char* ip_port = NULL;
|
||||
if (parsedArgs.find("ip_port") == parsedArgs.end()) {
|
||||
printUsage(argv[0], isMpi);
|
||||
exit(-1);
|
||||
}
|
||||
ip_port = (char*)parsedArgs["ip_port"].c_str();
|
||||
|
||||
int thisNode = rankToNode(rank);
|
||||
int cudaNum = rankToLocalRank(rank);
|
||||
CUDACHECK(cudaSetDevice(cudaNum));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Initializing MSCCL++\n");
|
||||
mscclppComm_t comm;
|
||||
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, ip_port, rank));
|
||||
|
||||
int* data_d;
|
||||
int* data_h;
|
||||
size_t dataSize = 1024 * 1024 * 1024;
|
||||
if (parsedArgs.find("datasize") != parsedArgs.end()) {
|
||||
dataSize = std::stoul(parsedArgs["datasize"]);
|
||||
}
|
||||
size_t nelemsPerGPU = dataSize / sizeof(int) / world_size;
|
||||
|
||||
if (rank == 0)
|
||||
printf("Initializing data for allgather test\n");
|
||||
initializeAndAllocateAllGatherData(rank, world_size, dataSize, nelemsPerGPU, &data_h, &data_d);
|
||||
|
||||
if (rank == 0)
|
||||
printf("Setting up the connection in MSCCL++\n");
|
||||
MSCCLPPCHECK(setupMscclppConnections(rank, world_size, comm, data_d, dataSize));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Launching MSCCL++ proxy threads\n");
|
||||
MSCCLPPCHECK(mscclppProxyLaunch(comm));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Testing the correctness of AllGather implementation\n");
|
||||
cudaStream_t stream;
|
||||
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
CUDACHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));
|
||||
|
||||
for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
|
||||
int val = i + 1;
|
||||
if (data_h[i] != val) {
|
||||
printf("oh uh! data_h[%ld] (%d) != val (%d)\n", i, data_h[i], val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
int tmp[16];
|
||||
// A simple barrier
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
if (rank == 0)
|
||||
printf("Successfully checked the correctness\n");
|
||||
|
||||
// Perf test
|
||||
int iterwithoutcudagraph = 10;
|
||||
if (rank == 0)
|
||||
printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
for (int i = 0; i < iterwithoutcudagraph; ++i) {
|
||||
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
|
||||
}
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
|
||||
// cudaGraph Capture
|
||||
int cudagraphiter = 10;
|
||||
if (rank == 0)
|
||||
printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
|
||||
cudaGraph_t graph;
|
||||
cudaGraphExec_t instance;
|
||||
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
|
||||
for (int i = 0; i < cudagraphiter; ++i) {
|
||||
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
|
||||
}
|
||||
cudaStreamEndCapture(stream, &graph);
|
||||
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
|
||||
|
||||
int cudagraphwarmup = 10;
|
||||
if (rank == 0)
|
||||
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
|
||||
cudagraphiter);
|
||||
for (int i = 0; i < cudagraphwarmup; ++i) {
|
||||
cudaGraphLaunch(instance, stream);
|
||||
}
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
|
||||
// measure runtime
|
||||
int cudagraphlaunch = 10;
|
||||
if (rank == 0)
|
||||
printf("Running %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphlaunch,
|
||||
cudagraphiter);
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
double t0, t1, ms, time_in_us;
|
||||
t0 = getTime();
|
||||
for (int i = 0; i < cudagraphlaunch; ++i) {
|
||||
cudaGraphLaunch(instance, stream);
|
||||
}
|
||||
CUDACHECK(cudaStreamSynchronize(stream));
|
||||
|
||||
t1 = getTime();
|
||||
ms = (t1 - t0) * 1000.0;
|
||||
time_in_us = ms * 1000. / (float)cudagraphlaunch / (float)cudagraphiter;
|
||||
printf("Rank %d report: size %lu time: %f us/iter algBW %f GBps\n", rank, dataSize, time_in_us,
|
||||
(double)(dataSize) / 1e9 / (time_in_us / 1e6));
|
||||
MSCCLPPCHECK(mscclppBootstrapAllGather(comm, tmp, sizeof(int)));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Stopping MSCCL++ proxy threads\n");
|
||||
MSCCLPPCHECK(mscclppProxyStop(comm));
|
||||
|
||||
if (rank == 0)
|
||||
printf("Destroying MSCCL++ communicator\n");
|
||||
MSCCLPPCHECK(mscclppCommDestroy(comm));
|
||||
printf("Rank %d succeeded!\n", rank);
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Finalize();
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
@@ -1,2 +1,7 @@
|
||||
add_executable(sendrecv_test_perf sendrecv_test.cu common.cu)
|
||||
target_link_libraries(sendrecv_test_perf mscclpp MPI::MPI_CXX)
|
||||
function(add_mscclpp_test_executable name sources)
|
||||
add_executable(${name} ${sources} common.cu)
|
||||
target_link_libraries(${name} mscclpp MPI::MPI_CXX CUDA::cudart CUDA::cuda_driver)
|
||||
endfunction()
|
||||
|
||||
add_mscclpp_test_executable(sendrecv_test_perf sendrecv_test.cu)
|
||||
add_mscclpp_test_executable(allgather_test_perf allgather_test.cu)
|
||||
|
||||
260
test/mscclpp-test/allgather_test.cu
Normal file
260
test/mscclpp-test/allgather_test.cu
Normal file
@@ -0,0 +1,260 @@
|
||||
#include <cuda_runtime.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#define ALIGN 4
|
||||
__constant__ mscclpp::channel::SimpleDeviceChannel constDevChans[16];
|
||||
|
||||
__device__ void allgather0(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, int remoteRank,
|
||||
size_t nelemsPerGPU) {
|
||||
// this allgather is really simple and implemented as an alltoall
|
||||
|
||||
// this thread's role is a sender role
|
||||
// put your data asynchronously
|
||||
if (threadIdx.x % 32 == 0) devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
|
||||
// make sure everyone is put their data before some thread randomly blocks everyone else in signal
|
||||
__syncthreads();
|
||||
// push with flag and sync to make sure the data is received
|
||||
if (threadIdx.x % 32 == 0) devChan.flush();
|
||||
|
||||
// this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
|
||||
if (threadIdx.x % 32 == 0) devChan.wait();
|
||||
}
|
||||
|
||||
__device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize,
|
||||
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) {
|
||||
// this allgather algorithm works as follows:
|
||||
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
|
||||
// and waits for data from GPU rank (i-1) % nranksPerNode
|
||||
// Step 2: GPU rank i sends data to GPU rank (i+2) % nranksPerNode
|
||||
// ...
|
||||
// This order is much better for DMA engine for NVLinks
|
||||
for (int i = 1; i < nranksPerNode; i++) {
|
||||
if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) {
|
||||
// put your data to GPU (rank+i) % nranksPerNode and signal in one call
|
||||
if ((threadIdx.x % 32) == 0) devChan.putWithSignalAndFlush(offset, size);
|
||||
}
|
||||
// wait for the data from GPU (rank-i) % nranksPerNode to arrive
|
||||
if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
|
||||
if ((threadIdx.x % 32) == 0) devChan.wait();
|
||||
}
|
||||
asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory");
|
||||
}
|
||||
}
|
||||
|
||||
__device__ void allgather1(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, int nranksPerNode,
|
||||
int remoteRank, size_t nelemsPerGPU) {
|
||||
localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
|
||||
nelemsPerGPU * sizeof(int));
|
||||
}
|
||||
|
||||
__device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize, int nranksPerNode,
|
||||
int remoteRank, size_t nelemsPerGPU) {
|
||||
// this allgather is a pipelined and hierarchical one and only works for two nodes
|
||||
// it is implemented as follows:
|
||||
// Step 1: each node does a local allgather and concurrently,
|
||||
// local GPU i exchange (piplineSize-1)/pipelineSize portion of their data with
|
||||
// its cross-node neighbor (local GPU i on the other node) via IB
|
||||
// Step 2: each node does a local allgather again with the data just received from its
|
||||
// cross-node neighbor in step 1, and concurrently, exchange the rest of the data with
|
||||
// its cross-node neighbor
|
||||
// Step 3: each node does a local allgather for the last time with the rest of the data
|
||||
|
||||
int pipelineSize = 3;
|
||||
|
||||
// Step 1
|
||||
// local allgather
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
|
||||
nelemsPerGPU * sizeof(int));
|
||||
}
|
||||
// cross-node exchange
|
||||
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
|
||||
// opposite side
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devChan.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int),
|
||||
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
|
||||
if ((threadIdx.x % 32) == 0) devChan.wait();
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
|
||||
// Step 2
|
||||
// local allgather
|
||||
int otherNghr = (rank + nranksPerNode) % worldSize;
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
|
||||
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
|
||||
}
|
||||
|
||||
// cross-node exchange
|
||||
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
|
||||
// opposite side
|
||||
if ((threadIdx.x % 32) == 0)
|
||||
devChan.putWithSignalAndFlush(
|
||||
(rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
|
||||
nelemsPerGPU / pipelineSize * sizeof(int));
|
||||
if ((threadIdx.x % 32) == 0) devChan.wait();
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
|
||||
// Step 3
|
||||
// local allgather
|
||||
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
|
||||
localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank,
|
||||
(otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
|
||||
nelemsPerGPU / pipelineSize * sizeof(int));
|
||||
}
|
||||
}
|
||||
|
||||
__global__ void kernel(int rank, int worldSize, int nranksPerNode, size_t nelemsPerGPU, int kernel) {
|
||||
// find the mapping between remoteRank and devConns
|
||||
int warpId = threadIdx.x / 32;
|
||||
int remoteRank = (warpId < rank) ? warpId : warpId + 1;
|
||||
// Each warp is responsible for one of the remote ranks
|
||||
mscclpp::channel::SimpleDeviceChannel devChan = constDevChans[warpId];
|
||||
|
||||
if (kernel == 0)
|
||||
allgather0(devChan, rank, worldSize, remoteRank, nelemsPerGPU);
|
||||
else if (kernel == 1)
|
||||
allgather1(devChan, rank, worldSize, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
else if (kernel == 2)
|
||||
allgather2(devChan, rank, worldSize, nranksPerNode, remoteRank, nelemsPerGPU);
|
||||
}
|
||||
|
||||
class AllGatherTestColl : public BaseTestColl {
|
||||
public:
|
||||
AllGatherTestColl() = default;
|
||||
~AllGatherTestColl() override = default;
|
||||
|
||||
void runColl(const TestArgs& args, cudaStream_t stream) override;
|
||||
void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
|
||||
void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override;
|
||||
void setupCollTest(size_t size) override;
|
||||
};
|
||||
|
||||
void AllGatherTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
|
||||
const int worldSize = args.totalRanks;
|
||||
const int rank = args.rank;
|
||||
const int nRanksPerNode = args.nRanksPerNode;
|
||||
const int kernelNum = args.kernelNum;
|
||||
kernel<<<1, 32 * (worldSize - 1), 0, stream>>>(rank, worldSize, nRanksPerNode, paramCount_, kernelNum);
|
||||
}
|
||||
|
||||
void AllGatherTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
|
||||
assert(sendBuff.size() == 1);
|
||||
int rank = args.rank;
|
||||
std::vector<int> dataHost(std::max(sendCount_, recvCount_), 0);
|
||||
for (size_t i = 0; i < recvCount_; i++) {
|
||||
int val = i + 1;
|
||||
if (i / sendCount_ == (size_t)rank) {
|
||||
dataHost[i] = val;
|
||||
} else {
|
||||
dataHost[i] = 0;
|
||||
}
|
||||
}
|
||||
CUDATHROW(cudaMemcpy(sendBuff[0], dataHost.data(), recvCount_ * typeSize_, cudaMemcpyHostToDevice));
|
||||
|
||||
for (size_t i = 0; i < recvCount_; i++) {
|
||||
dataHost[i] = static_cast<int>(i) + 1;
|
||||
}
|
||||
std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_);
|
||||
}
|
||||
|
||||
void AllGatherTestColl::getBw(const double deltaSec, double& algBw, double& busBw) {
|
||||
double baseBw = (double)(paramCount_ * typeSize_ * worldSize_) / 1.0E9 / deltaSec;
|
||||
|
||||
algBw = baseBw;
|
||||
double factor = ((double)(worldSize_ - 1)) / ((double)worldSize_);
|
||||
busBw = baseBw * factor;
|
||||
}
|
||||
|
||||
void AllGatherTestColl::setupCollTest(size_t size) {
|
||||
size_t count = size / typeSize_;
|
||||
size_t base = (count / (ALIGN * worldSize_)) * ALIGN;
|
||||
sendCount_ = base;
|
||||
recvCount_ = base * worldSize_;
|
||||
paramCount_ = base;
|
||||
expectedCount_ = recvCount_;
|
||||
}
|
||||
|
||||
class AllGatherTestEngine : public BaseTestEngine {
|
||||
public:
|
||||
AllGatherTestEngine() = default;
|
||||
~AllGatherTestEngine() override = default;
|
||||
|
||||
void allocateBuffer() override;
|
||||
void setupConnections() override;
|
||||
|
||||
private:
|
||||
std::vector<void*> getSendBuff() override;
|
||||
void* getExpectedBuff() override;
|
||||
void* getRecvBuff() override;
|
||||
|
||||
std::shared_ptr<int> sendBuff_;
|
||||
std::shared_ptr<int[]> expectedBuff_;
|
||||
};
|
||||
|
||||
void AllGatherTestEngine::allocateBuffer() {
|
||||
sendBuff_ = mscclpp::makeSharedCuda<int>(args_.maxBytes / sizeof(int));
|
||||
expectedBuff_ = std::shared_ptr<int[]>(new int[args_.maxBytes / sizeof(int)]);
|
||||
}
|
||||
|
||||
void AllGatherTestEngine::setupConnections() {
|
||||
const int worldSize = args_.totalRanks;
|
||||
const int rank = args_.rank;
|
||||
const int nRanksPerNode = args_.nRanksPerNode;
|
||||
const int thisNode = rank / nRanksPerNode;
|
||||
const mscclpp::Transport ibTransport = IBs[args_.gpuNum];
|
||||
|
||||
std::vector<mscclpp::channel::ChannelId> channelIds;
|
||||
std::vector<mscclpp::RegisteredMemory> localMemories;
|
||||
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemories;
|
||||
|
||||
auto rankToNode = [&](int rank) { return rank / nRanksPerNode; };
|
||||
for (int r = 0; r < worldSize; r++) {
|
||||
if (r == rank) {
|
||||
continue;
|
||||
}
|
||||
mscclpp::Transport transport;
|
||||
if (rankToNode(r) == thisNode) {
|
||||
transport = mscclpp::Transport::CudaIpc;
|
||||
} else {
|
||||
transport = ibTransport;
|
||||
}
|
||||
// Connect with all other ranks
|
||||
channelIds.push_back(chanService_->addChannel(comm_->connectOnSetup(r, 0, transport)));
|
||||
auto memory = comm_->registerMemory(sendBuff_.get(), args_.maxBytes, mscclpp::Transport::CudaIpc | ibTransport);
|
||||
localMemories.push_back(memory);
|
||||
comm_->sendMemoryOnSetup(memory, r, 0);
|
||||
remoteMemories.push_back(comm_->recvMemoryOnSetup(r, 0));
|
||||
}
|
||||
comm_->setup();
|
||||
|
||||
std::vector<mscclpp::channel::SimpleDeviceChannel> devChannels;
|
||||
for (size_t i = 0; i < channelIds.size(); ++i) {
|
||||
devChannels.push_back(mscclpp::channel::SimpleDeviceChannel(chanService_->deviceChannel(channelIds[i]),
|
||||
chanService_->addMemory(remoteMemories[i].get()),
|
||||
chanService_->addMemory(localMemories[i])));
|
||||
}
|
||||
|
||||
assert(devChannels.size() < sizeof(constDevChans) / sizeof(mscclpp::channel::SimpleDeviceChannel));
|
||||
CUDATHROW(cudaMemcpyToSymbol(constDevChans, devChannels.data(),
|
||||
sizeof(mscclpp::channel::SimpleDeviceChannel) * devChannels.size()));
|
||||
}
|
||||
|
||||
std::vector<void*> AllGatherTestEngine::getSendBuff() { return {sendBuff_.get()}; }
|
||||
|
||||
void* AllGatherTestEngine::getExpectedBuff() { return expectedBuff_.get(); }
|
||||
|
||||
void* AllGatherTestEngine::getRecvBuff() {
|
||||
// in-place operation reuse the send buffer
|
||||
return sendBuff_.get();
|
||||
}
|
||||
|
||||
std::shared_ptr<BaseTestEngine> getTestEngine() { return std::make_shared<AllGatherTestEngine>(); }
|
||||
std::shared_ptr<BaseTestColl> getTestColl() { return std::make_shared<AllGatherTestColl>(); }
|
||||
@@ -94,6 +94,13 @@ BaseTestEngine::BaseTestEngine(bool inPlace) : error_(0), inPlace_(inPlace) {
|
||||
|
||||
BaseTestEngine::~BaseTestEngine() { cudaStreamDestroy(stream_); }
|
||||
|
||||
void BaseTestColl::setupCollTest(const TestArgs& args, size_t size)
|
||||
{
|
||||
this->worldSize_ = args.totalRanks;
|
||||
this->typeSize_ = sizeof(int);
|
||||
this->setupCollTest(size);
|
||||
}
|
||||
|
||||
double BaseTestEngine::benchTime() {
|
||||
// Performance Benchmark
|
||||
cudaGraph_t graph;
|
||||
@@ -128,7 +135,7 @@ void BaseTestEngine::barrier() {
|
||||
void BaseTestEngine::runTest()
|
||||
{
|
||||
// warm-up for large size
|
||||
this->coll_->setupCollTest(args_.maxBytes, sizeof(int));
|
||||
this->coll_->setupCollTest(args_, args_.maxBytes);
|
||||
this->barrier();
|
||||
for (int iter = 0; iter < warmup_iters; iter++) {
|
||||
this->coll_->runColl(args_, stream_);
|
||||
@@ -136,7 +143,7 @@ void BaseTestEngine::runTest()
|
||||
CUDATHROW(cudaDeviceSynchronize());
|
||||
|
||||
// warm-up for small size
|
||||
this->coll_->setupCollTest(args_.minBytes, sizeof(int));
|
||||
this->coll_->setupCollTest(args_, args_.minBytes);
|
||||
this->barrier();
|
||||
for (int iter = 0; iter < warmup_iters; iter++) {
|
||||
this->coll_->runColl(args_, stream_);
|
||||
@@ -153,14 +160,14 @@ void BaseTestEngine::runTest()
|
||||
// Benchmark
|
||||
for (size_t size = args_.minBytes; size <= args_.maxBytes;
|
||||
size = ((args_.stepFactor > 1) ? size * args_.stepFactor : size + args_.stepBytes)) {
|
||||
coll_->setupCollTest(size, sizeof(int));
|
||||
coll_->setupCollTest(args_, size);
|
||||
this->coll_->initData(this->args_, this->getSendBuff(), this->getExpectedBuff());
|
||||
PRINT("%12li %12li", max(coll_->getSendBytes(), coll_->getExpectedBytes()), coll_->getParamBytes() / sizeof(int));
|
||||
double deltaSec = benchTime();
|
||||
|
||||
size_t nErrors = 0;
|
||||
if (args_.reportErrors) {
|
||||
this->coll_->setupCollTest(size, sizeof(int));
|
||||
this->coll_->setupCollTest(args_, size);
|
||||
this->coll_->initData(this->args_, this->getSendBuff(), this->getExpectedBuff());
|
||||
this->barrier();
|
||||
this->coll_->runColl(args_, stream_);
|
||||
|
||||
@@ -40,10 +40,10 @@ class BaseTestColl {
|
||||
BaseTestColl() {}
|
||||
virtual ~BaseTestColl() {}
|
||||
virtual void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) = 0;
|
||||
virtual void setupCollTest(size_t size, size_t typeSize) = 0;
|
||||
virtual void runColl(const TestArgs& args, cudaStream_t stream) = 0;
|
||||
virtual void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) = 0;
|
||||
|
||||
void setupCollTest(const TestArgs& args, size_t size);
|
||||
size_t getSendBytes() { return sendCount_ * typeSize_; }
|
||||
size_t getRecvBytes() { return recvCount_ * typeSize_; }
|
||||
size_t getExpectedBytes() { return expectedCount_ * typeSize_; }
|
||||
@@ -55,11 +55,15 @@ class BaseTestColl {
|
||||
size_t expectedCount_;
|
||||
size_t paramCount_;
|
||||
int typeSize_;
|
||||
int worldSize_;
|
||||
|
||||
private:
|
||||
virtual void setupCollTest(size_t size) = 0;
|
||||
};
|
||||
|
||||
class BaseTestEngine {
|
||||
public:
|
||||
BaseTestEngine(bool inPlace);
|
||||
BaseTestEngine(bool inPlace = true);
|
||||
virtual ~BaseTestEngine();
|
||||
virtual void allocateBuffer() = 0;
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ class SendRecvTestColl : public BaseTestColl {
|
||||
void runColl(const TestArgs& args, cudaStream_t stream) override;
|
||||
void initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) override;
|
||||
void getBw(const double deltaSec, double& algBW /*OUT*/, double& busBw /*OUT*/) override;
|
||||
void setupCollTest(size_t size, size_t typeSize) override;
|
||||
void setupCollTest(size_t size) override;
|
||||
};
|
||||
|
||||
void SendRecvTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
|
||||
@@ -89,14 +89,13 @@ void SendRecvTestColl::initData(const TestArgs& args, std::vector<void*> sendBuf
|
||||
std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_);
|
||||
}
|
||||
|
||||
void SendRecvTestColl::setupCollTest(size_t size, size_t typeSize) {
|
||||
size_t count = size / typeSize;
|
||||
void SendRecvTestColl::setupCollTest(size_t size) {
|
||||
size_t count = size / typeSize_;
|
||||
size_t base = (count / ALIGN) * ALIGN;
|
||||
sendCount_ = base;
|
||||
recvCount_ = base;
|
||||
paramCount_ = base;
|
||||
expectedCount_ = base;
|
||||
typeSize_ = typeSize;
|
||||
|
||||
mscclpp::DeviceSyncer syncer = {};
|
||||
CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer)));
|
||||
|
||||
Reference in New Issue
Block a user