mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-12 01:10:22 +00:00
Merge branch 'api-extension' into binyli/exception
This commit is contained in:
56
README.md
56
README.md
@@ -55,42 +55,54 @@ $ ./build/bin/tests/bootstrap_test 127.0.0.1:50000 1 2
|
||||
|
||||
## Performance
|
||||
|
||||
All results from NDv4. "xp-yn" means "x" total GPUs across "y" nodes.
|
||||
All results from NDv4. NCCL version 2.17.1+cuda11.8, reported in-place numbers.
|
||||
|
||||
nccl-tests command example:
|
||||
```bash
|
||||
mpirun --bind-to numa -hostfile /mnt/hostfile --tag-output --allow-run-as-root -map-by ppr:8:node --bind-to numa -mca pml ob1 -mca btl ^openib -mca btl_tcp_if_include eth0 -x PATH -x LD_PRELOAD=/mnt/nccl/build/lib/libnccl.so -x NCCL_IB_PCI_RELAXED_ORDERING=1 -x NCCL_SOCKET_IFNAME=eth0 -x CUDA_DEVICE_ORDER=PCI_BUS_ID -x NCCL_NET_GDR_LEVEL=5 -x NCCL_TOPO_FILE=/mnt/ndv4-topo.xml -x NCCL_DEBUG=WARN ./build/all_gather_perf -b 1K -e 1K -g 1 -c 1 -w 10 -n 10 -G 1
|
||||
```
|
||||
|
||||
mscclpp-tests command example:
|
||||
```bash
|
||||
mpirun -allow-run-as-root -map-by ppr:8:node -hostfile /mnt/hostfile -x LD_LIBRARY_PATH=/mnt/mscclpp/build/lib:$LD_LIBRARY_PATH ./build/bin/tests/allgather_test_perf -b 1K -e 1K -w 10 -n 10 -G 1 -k 0
|
||||
```
|
||||
|
||||
**NOTE:** NCCL AllGather leverages Ring algorithm instead of all-pairs alike algorithm, which greatly reduces inter-node transmission, causing significant higher performance. MSCCL++ should do something similar in the future
|
||||
|
||||
### 8p-1n
|
||||
### 1 node, 8 gpus/node
|
||||
**Latency (us)**
|
||||
| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 |
|
||||
|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:|
|
||||
| 1K | 13.12 | 9.61 | **7.76** | 21.06 | 28.50 | 157.91 | 143.21 |
|
||||
| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce |
|
||||
|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:|
|
||||
| 1K | 12.53 | **16.96** | 9.34 | **7.76** / 21.06 / 28.50 | 157.91 / 143.21 / 447.0 | 326.4 |
|
||||
|
||||
**BusBW (GB/s)**
|
||||
| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 |
|
||||
|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:|
|
||||
| 1G | 218.27 | 220.09 | 217.05 | 216.98 | 217.15 | 93.69 | **255.06** |
|
||||
| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce |
|
||||
|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:----------------------------:|:-----------------:|
|
||||
| 1G | 253.59 | **132.31** | 254.69 | 217.05 / 216.98 / 217.15 | 125.06 / **255.64** / 124.89 | 22.55 |
|
||||
|
||||
### 2p-2n
|
||||
### 2 nodes, 1 gpu/node
|
||||
**Latency (us)**
|
||||
| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 |
|
||||
|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:|
|
||||
| 1K | 15.31 | 28.36 | 14.67 | 29.12 | 35.43 | 15.32 | **13.84** |
|
||||
| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce |
|
||||
|:------------:|:--------------:|:--------------:|:--------------:|:------------------------------:|:--------------------------:|:-----------------:|
|
||||
| 1K | 16.08 | **21.27** | 29.84 | 14.67 / 29.12 / 35.43 | 15.32 / **13.84** / 26.08 | - |
|
||||
|
||||
**BusBW (GB/s)**
|
||||
| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 |
|
||||
|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:|
|
||||
| 1G | 15.69 | 16.22 | 13.94 | 13.83 | 14.10 | **23.26** | **23.29** |
|
||||
| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce |
|
||||
|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:|
|
||||
| 1G | 15.84 | **18.65** | 15.48 | 13.94 / 13.83 / 14.10 | **23.30** / 23.29 / 21.60 | - |
|
||||
|
||||
### 16p-2n
|
||||
### 2 nodes, 8 gpus/node
|
||||
**Latency (us)**
|
||||
| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 |
|
||||
|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:|
|
||||
| 1K | 31.70 | 45.12 | **22.55** | 39.33 | 56.93 | 159.14 | 230.52 |
|
||||
| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce |
|
||||
|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:|
|
||||
| 1K | 33.74 | **35.85** | 49.75 | **22.55** / 39.33 / 56.93 | 159.14 / 230.52 / 462.7 | - |
|
||||
|
||||
**BusBW (GB/s)**
|
||||
| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 |
|
||||
|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:|
|
||||
| 1G | 174.28 | 38.30 | 40.17 | 40.18 | 40.23 | **44.08** | 9.31 |
|
||||
| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce |
|
||||
|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:|
|
||||
| 1G | 177.05 | **183.82** | 37.80 | 40.17 / 40.18 / 40.23 | 44.19 / 9.31 / **209.33** | - |
|
||||
| 4G | 186.01 | **188.18** | 37.81 | - / - / - | 44.60 / - / **234.08** | - |
|
||||
|
||||
|
||||
|
||||
## Contributing
|
||||
|
||||
11
src/ib.cc
11
src/ib.cc
@@ -97,7 +97,9 @@ IbQp::IbQp(void* ctx, void* pd, int port) {
|
||||
this->info.linkLayer = portAttr.link_layer;
|
||||
this->info.qpn = _qp->qp_num;
|
||||
this->info.mtu = portAttr.active_mtu;
|
||||
if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND) {
|
||||
this->info.is_grh = (portAttr.flags & IBV_QPF_GRH_REQUIRED);
|
||||
|
||||
if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND || this->info.is_grh) {
|
||||
union ibv_gid gid;
|
||||
if (ibv_query_gid(_ctx, port, 0, &gid) != 0) {
|
||||
std::stringstream err;
|
||||
@@ -105,6 +107,7 @@ IbQp::IbQp(void* ctx, void* pd, int port) {
|
||||
throw mscclpp::IbError(err.str(), errno);
|
||||
}
|
||||
this->info.spn = gid.global.subnet_prefix;
|
||||
this->info.iid = gid.global.interface_id;
|
||||
}
|
||||
|
||||
struct ibv_qp_attr qpAttr;
|
||||
@@ -142,18 +145,18 @@ void IbQp::rtr(const IbQpInfo& info) {
|
||||
qp_attr.rq_psn = 0;
|
||||
qp_attr.max_dest_rd_atomic = 1;
|
||||
qp_attr.min_rnr_timer = 0x12;
|
||||
if (info.linkLayer == IBV_LINK_LAYER_ETHERNET) {
|
||||
if (info.linkLayer == IBV_LINK_LAYER_ETHERNET || info.is_grh) {
|
||||
qp_attr.ah_attr.is_global = 1;
|
||||
qp_attr.ah_attr.grh.dgid.global.subnet_prefix = info.spn;
|
||||
qp_attr.ah_attr.grh.dgid.global.interface_id = info.lid;
|
||||
qp_attr.ah_attr.grh.dgid.global.interface_id = info.iid;
|
||||
qp_attr.ah_attr.grh.flow_label = 0;
|
||||
qp_attr.ah_attr.grh.sgid_index = 0;
|
||||
qp_attr.ah_attr.grh.hop_limit = 255;
|
||||
qp_attr.ah_attr.grh.traffic_class = 0;
|
||||
} else {
|
||||
qp_attr.ah_attr.is_global = 0;
|
||||
qp_attr.ah_attr.dlid = info.lid;
|
||||
}
|
||||
qp_attr.ah_attr.dlid = info.lid;
|
||||
qp_attr.ah_attr.sl = 0;
|
||||
qp_attr.ah_attr.src_path_bits = 0;
|
||||
qp_attr.ah_attr.port_num = info.port;
|
||||
|
||||
@@ -43,6 +43,8 @@ struct IbQpInfo {
|
||||
uint32_t qpn;
|
||||
uint64_t spn;
|
||||
int mtu;
|
||||
uint64_t iid;
|
||||
bool is_grh;
|
||||
};
|
||||
|
||||
class IbQp {
|
||||
|
||||
@@ -137,12 +137,44 @@ struct mscclppDevConn {
|
||||
;
|
||||
}
|
||||
|
||||
// Version that uses the SM directly to do the copy, instead of using the proxy thread like the functions above.
|
||||
__forceinline__ __device__ void putDirect(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize,
|
||||
uint32_t threadId, uint32_t numThreads) {
|
||||
uint64_t* src = (uint64_t*)((char*)localBuff + srcDataOffset);
|
||||
uint64_t* dst = (uint64_t*)((char*)remoteBuff + dstDataOffset);
|
||||
// assume the memory is aligned to 8 bytes
|
||||
size_t nElem =
|
||||
dataSize % sizeof(uint64_t) ? (dataSize + sizeof(uint64_t)) / sizeof(uint64_t) : dataSize / sizeof(uint64_t);
|
||||
for (size_t i = threadId; i < nElem; i += numThreads) {
|
||||
dst[i] = src[i];
|
||||
}
|
||||
}
|
||||
|
||||
__forceinline__ __device__ void putDirect(uint64_t dataOffset, uint64_t dataSize, uint32_t threadId,
|
||||
uint32_t numThreads) {
|
||||
putDirect(dataOffset, dataOffset, dataSize, threadId, numThreads);
|
||||
}
|
||||
|
||||
__forceinline__ __device__ void signalDirect() {
|
||||
// This fence ensures that the writes from a preceding putDirect() are visible on the peer GPU before the
|
||||
// incremented epoch id is visible.
|
||||
__threadfence_system();
|
||||
epochIncrement();
|
||||
*(volatile uint64_t*)&(remoteSignalEpochId->device) = localSignalEpochId->device;
|
||||
}
|
||||
|
||||
__forceinline__ __device__ void wait() {
|
||||
(*waitEpochId) += 1;
|
||||
while (*(volatile uint64_t*)&(localSignalEpochId->proxy) < (*waitEpochId))
|
||||
;
|
||||
}
|
||||
|
||||
__forceinline__ __device__ void waitDirect() {
|
||||
(*waitEpochId) += 1;
|
||||
while (*(volatile uint64_t*)&(localSignalEpochId->device) < (*waitEpochId))
|
||||
;
|
||||
}
|
||||
|
||||
__forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(localSignalEpochId->device) += 1; }
|
||||
|
||||
#endif // __CUDACC__
|
||||
|
||||
@@ -195,8 +195,8 @@ testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode,
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, AllGatherInitData, AllGatherGetBw,
|
||||
AllGatherRunColl};
|
||||
struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, defaultInitColl, AllGatherInitData,
|
||||
AllGatherGetBw, AllGatherRunColl};
|
||||
|
||||
void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
|
||||
{
|
||||
@@ -215,6 +215,6 @@ testResult_t AllGatherRunTest(struct testArgs* args)
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest};
|
||||
struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest, nullptr, nullptr};
|
||||
|
||||
#pragma weak mscclppTestEngine = allGatherEngine
|
||||
|
||||
@@ -1,322 +0,0 @@
|
||||
#include "mscclpp.h"
|
||||
#include <cuda/barrier>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
#include "common.h"
|
||||
|
||||
#define MSCCLPPCHECK(call) \
|
||||
do { \
|
||||
mscclppResult_t res = call; \
|
||||
if (res != mscclppSuccess && res != mscclppInProgress) { \
|
||||
/* Print the back trace*/ \
|
||||
printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \
|
||||
return res; \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define CUDACHECK(cmd) \
|
||||
do { \
|
||||
cudaError_t err = cmd; \
|
||||
if (err != cudaSuccess) { \
|
||||
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
|
||||
exit(EXIT_FAILURE); \
|
||||
} \
|
||||
} while (false)
|
||||
|
||||
struct Volume
|
||||
{
|
||||
size_t offset;
|
||||
size_t size;
|
||||
};
|
||||
|
||||
__host__ __device__ Volume chunkVolume(size_t totalSize, size_t totalChunks, size_t chunkIdx, size_t chunkCount)
|
||||
{
|
||||
size_t remainder = totalSize % totalChunks;
|
||||
size_t smallChunk = totalSize / totalChunks;
|
||||
size_t largeChunk = smallChunk + 1;
|
||||
size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
|
||||
size_t numSmallChunks = chunkCount - numLargeChunks;
|
||||
size_t offset =
|
||||
(remainder - numLargeChunks) * largeChunk + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunk;
|
||||
return Volume{offset, numLargeChunks * largeChunk + numSmallChunks * smallChunk};
|
||||
}
|
||||
|
||||
template <class T, void (*reduce)(T*, T*, size_t)> struct AllreduceAllpairs
|
||||
{
|
||||
int rank;
|
||||
int nRanks;
|
||||
T* userData;
|
||||
size_t userSize;
|
||||
T* scratch;
|
||||
size_t scratchSize;
|
||||
mscclppDevConn_t* conns;
|
||||
uint64_t* connFlags;
|
||||
cuda::barrier<cuda::thread_scope_device>* barrier;
|
||||
|
||||
__device__ void run(int idx)
|
||||
{
|
||||
int myPeer = peerRank(idx, rank);
|
||||
mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(myPeer)];
|
||||
mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(myPeer)];
|
||||
mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(myPeer)];
|
||||
|
||||
// 1st communication phase: send data to the scratch buffer of the peer associated with this block
|
||||
Volume toPeer = chunkVolume(userSize, nRanks, myPeer, 1);
|
||||
// Now we need to figure out the offset of this chunk in the scratch buffer of the destination.
|
||||
// The destination will have allocated a scratch buffer of size numPeers() * toPeer.size and
|
||||
// inside that each of the destination's peers send to the nth chunk, where n is the index of the
|
||||
// source peer from the destination's perspective.
|
||||
size_t dstOffset = peerIdx(rank, myPeer) * toPeer.size;
|
||||
send(phase1SendConn, toPeer.offset, dstOffset, toPeer.size);
|
||||
recv(phase1RecvConn);
|
||||
|
||||
if (threadIdx.x == 0)
|
||||
barrier->arrive_and_wait();
|
||||
__syncthreads();
|
||||
|
||||
// Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
|
||||
Volume rankUserChunk = chunkVolume(userSize, nRanks, rank, 1);
|
||||
T* userChunk = userData + rankUserChunk.offset;
|
||||
Volume blockUserChunk = chunkVolume(rankUserChunk.size, numBlocks(), idx, 1);
|
||||
for (int peerIdx = 0; peerIdx < numPeers(); ++peerIdx) {
|
||||
assert(scratchSize % numPeers() == 0);
|
||||
assert(scratchSize / numPeers() == rankUserChunk.size);
|
||||
size_t scratchChunkSize = scratchSize / numPeers();
|
||||
T* scratchChunk = scratch + peerIdx * scratchChunkSize;
|
||||
Volume blockScratchChunk = chunkVolume(scratchChunkSize, numBlocks(), idx, 1);
|
||||
assert(blockScratchChunk.size == blockUserChunk.size);
|
||||
reduce(userChunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
|
||||
}
|
||||
|
||||
if (threadIdx.x == 0)
|
||||
barrier->arrive_and_wait();
|
||||
__syncthreads();
|
||||
|
||||
// 2nd communication phase: send the now reduced data between the user buffers
|
||||
Volume srcVolume2 = chunkVolume(userSize, nRanks, rank, 1);
|
||||
send(phase2Conn, srcVolume2.offset, srcVolume2.offset, srcVolume2.size);
|
||||
recv(phase2Conn);
|
||||
}
|
||||
|
||||
__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size)
|
||||
{
|
||||
if (threadIdx.x == 0) {
|
||||
volatile uint64_t* localFlag = conn.localFlag;
|
||||
*localFlag = 1; // 1 is used to signal the send
|
||||
|
||||
mscclppTrigger_t trigger;
|
||||
auto request = conn.fifo.getTrigger(&trigger);
|
||||
conn.fifo.setTrigger(trigger, mscclppData | mscclppFlag, srcOffset * sizeof(T), dstOffset * sizeof(T),
|
||||
size * sizeof(T));
|
||||
}
|
||||
__syncthreads();
|
||||
}
|
||||
|
||||
__device__ void recv(mscclppDevConn_t& conn)
|
||||
{
|
||||
if (threadIdx.x == 0) {
|
||||
volatile uint64_t* proxyFlag = conn.proxyFlag;
|
||||
while (*proxyFlag != 1) {
|
||||
}
|
||||
*proxyFlag = 0;
|
||||
}
|
||||
__syncthreads();
|
||||
}
|
||||
|
||||
__host__ __device__ int numPeers()
|
||||
{
|
||||
return nRanks - 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int numBlocks()
|
||||
{
|
||||
return numPeers();
|
||||
}
|
||||
|
||||
__host__ __device__ int peerIdx(int peerRank, int myRank)
|
||||
{
|
||||
return peerRank < myRank ? peerRank : peerRank - 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int peerRank(int peerIdx, int myRank)
|
||||
{
|
||||
return peerIdx < myRank ? peerIdx : peerIdx + 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int phase1SendConnIdx(int peerRank)
|
||||
{
|
||||
return peerIdx(peerRank, rank) * 3;
|
||||
}
|
||||
|
||||
__host__ __device__ int phase1RecvConnIdx(int peerRank)
|
||||
{
|
||||
return peerIdx(peerRank, rank) * 3 + 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int phase2ConnIdx(int peerRank)
|
||||
{
|
||||
return peerIdx(peerRank, rank) * 3 + 2;
|
||||
}
|
||||
|
||||
void freeGPUResources()
|
||||
{
|
||||
if (scratch)
|
||||
CUDACHECK(cudaFree(scratch));
|
||||
scratch = nullptr;
|
||||
if (connFlags)
|
||||
CUDACHECK(cudaFree(connFlags));
|
||||
connFlags = nullptr;
|
||||
if (conns)
|
||||
CUDACHECK(cudaFree(conns));
|
||||
conns = nullptr;
|
||||
if (barrier)
|
||||
CUDACHECK(cudaFree(barrier));
|
||||
barrier = nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
// The builder class encapsulates the
|
||||
template <class T, void (*reduce)(T*, T*, size_t)> class AllreduceAllpairsBuilder
|
||||
{
|
||||
AllreduceAllpairs<T, reduce> d;
|
||||
std::vector<mscclppDevConn_t> hostConns;
|
||||
|
||||
public:
|
||||
// The constructor is called after the user has allocated the buffer to be allreduced
|
||||
AllreduceAllpairsBuilder(T* data, size_t size)
|
||||
{
|
||||
d.userData = data;
|
||||
d.userSize = size;
|
||||
d.scratch = nullptr;
|
||||
d.connFlags = nullptr;
|
||||
d.conns = nullptr;
|
||||
d.barrier = nullptr;
|
||||
}
|
||||
|
||||
// connect is called after rank initialization but before connection setup
|
||||
mscclppResult_t connect(mscclppComm_t comm)
|
||||
{
|
||||
MSCCLPPCHECK(mscclppCommRank(comm, &d.rank));
|
||||
MSCCLPPCHECK(mscclppCommSize(comm, &d.nRanks));
|
||||
|
||||
Volume myChunks = chunkVolume(d.userSize, d.nRanks, d.rank, 1);
|
||||
d.scratchSize = myChunks.size * d.numPeers();
|
||||
|
||||
CUDACHECK(cudaMalloc(&d.scratch, d.scratchSize * sizeof(T)));
|
||||
CUDACHECK(cudaMalloc(&d.connFlags, 3 * sizeof(uint64_t)));
|
||||
CUDACHECK(cudaMemset(d.connFlags, 0, 3 * sizeof(uint64_t)));
|
||||
|
||||
hostConns.resize(d.numPeers() * 3);
|
||||
for (int peer = 0; peer < d.nRanks; ++peer) {
|
||||
if (peer != d.rank) {
|
||||
int sendTag = d.rank < peer ? 0 : 1;
|
||||
int recvTag = d.rank < peer ? 1 : 0;
|
||||
MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1SendConnIdx(peer), peer, d.userData,
|
||||
d.userSize * sizeof(T), d.connFlags + 0, sendTag, mscclppTransportP2P, nullptr));
|
||||
MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1RecvConnIdx(peer), peer, d.scratch,
|
||||
d.scratchSize * sizeof(T), d.connFlags + 1, recvTag, mscclppTransportP2P, nullptr));
|
||||
MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase2ConnIdx(peer), peer, d.userData,
|
||||
d.userSize * sizeof(T), d.connFlags + 2, 2, mscclppTransportP2P, nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
return mscclppSuccess;
|
||||
}
|
||||
|
||||
// finishSetup is called after connection setup and returns an algorithm object that is ready to be passed to a GPU
|
||||
// kernel
|
||||
AllreduceAllpairs<T, reduce> finishSetup()
|
||||
{
|
||||
CUDACHECK(cudaMalloc(&d.conns, hostConns.size() * sizeof(mscclppDevConn_t)));
|
||||
CUDACHECK(
|
||||
cudaMemcpy(d.conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice));
|
||||
CUDACHECK(cudaMalloc(&d.barrier, sizeof(cuda::barrier<cuda::thread_scope_device>)));
|
||||
cuda::barrier<cuda::thread_scope_device> initBarrier(d.numBlocks());
|
||||
CUDACHECK(
|
||||
cudaMemcpy(d.barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
|
||||
return d;
|
||||
}
|
||||
};
|
||||
|
||||
template <class T> __device__ void reduceSum(T* dst, T* src, size_t size)
|
||||
{
|
||||
for (int i = threadIdx.x; i < size; i += blockDim.x) {
|
||||
dst[i] += src[i];
|
||||
}
|
||||
}
|
||||
|
||||
template <class T> __global__ void init(T* data, size_t size, int rank)
|
||||
{
|
||||
for (int i = threadIdx.x; i < size; i += blockDim.x) {
|
||||
data[i] = rank;
|
||||
}
|
||||
}
|
||||
|
||||
// The main test kernel
|
||||
template <class T> __global__ void testKernel(AllreduceAllpairs<T, reduceSum> d)
|
||||
{
|
||||
d.run(blockIdx.x);
|
||||
}
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Init(NULL, NULL);
|
||||
#endif
|
||||
const char* ip_port;
|
||||
int rank, world_size;
|
||||
parse_arguments(argc, argv, &ip_port, &rank, &world_size);
|
||||
|
||||
CUDACHECK(cudaSetDevice(rank));
|
||||
|
||||
// Allocate and initialize 1 MB of data
|
||||
int* data;
|
||||
size_t dataSize = 1024 * 1024 / sizeof(int);
|
||||
CUDACHECK(cudaMalloc(&data, dataSize * sizeof(int)));
|
||||
init<<<1, 256>>>(data, dataSize, rank);
|
||||
|
||||
// Create the collective
|
||||
AllreduceAllpairsBuilder<int, reduceSum> builder(data, dataSize);
|
||||
|
||||
// Create the communicator
|
||||
mscclppComm_t comm;
|
||||
MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port));
|
||||
|
||||
// Connect the collective
|
||||
builder.connect(comm);
|
||||
|
||||
// Finish the setup
|
||||
MSCCLPPCHECK(mscclppConnectionSetup(comm));
|
||||
MSCCLPPCHECK(mscclppProxyLaunch(comm));
|
||||
auto allreduce = builder.finishSetup();
|
||||
|
||||
// Run the collective
|
||||
testKernel<<<allreduce.numBlocks(), 256>>>(allreduce);
|
||||
|
||||
// Wait for kernel to finish
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
|
||||
// Check the result
|
||||
int* hostData = new int[dataSize];
|
||||
CUDACHECK(cudaMemcpy(hostData, data, dataSize * sizeof(int), cudaMemcpyDeviceToHost));
|
||||
int expectedValue = world_size * (world_size - 1) / 2;
|
||||
for (size_t i = 0; i < dataSize; ++i) {
|
||||
if (hostData[i] != expectedValue) {
|
||||
printf("Error at index %lu: %d != %d\n", i, hostData[i], expectedValue);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
MSCCLPPCHECK(mscclppProxyStop(comm));
|
||||
|
||||
MSCCLPPCHECK(mscclppCommDestroy(comm));
|
||||
|
||||
#ifdef MSCCLPP_USE_MPI_FOR_TESTS
|
||||
if (argc == 2) {
|
||||
MPI_Finalize();
|
||||
}
|
||||
#endif
|
||||
printf("Succeeded! %d\n", rank);
|
||||
return 0;
|
||||
}
|
||||
284
test/allreduce_test.cu
Normal file
284
test/allreduce_test.cu
Normal file
@@ -0,0 +1,284 @@
|
||||
#include <cuda/barrier>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
#include "comm.h"
|
||||
#include "common.h"
|
||||
|
||||
#define ALIGN 4
|
||||
|
||||
const int phase2Tag = 2;
|
||||
mscclppDevConn_t* conns;
|
||||
void* scratch = nullptr;
|
||||
void* sendRecvData = nullptr;
|
||||
cuda::barrier<cuda::thread_scope_device>* barrier = nullptr;
|
||||
|
||||
struct Chunk
|
||||
{
|
||||
size_t offset;
|
||||
size_t size;
|
||||
};
|
||||
|
||||
inline int getSendTag(int rank, int peer)
|
||||
{
|
||||
return rank < peer ? 0 : 1;
|
||||
}
|
||||
|
||||
inline int getRecvTag(int rank, int peer)
|
||||
{
|
||||
return rank < peer ? 1 : 0;
|
||||
}
|
||||
|
||||
__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount)
|
||||
{
|
||||
size_t remainder = dataCount % numChunks;
|
||||
size_t smallChunkSize = dataCount / numChunks;
|
||||
size_t largeChunkSize = smallChunkSize + 1;
|
||||
size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0;
|
||||
size_t numSmallChunks = chunkCount - numLargeChunks;
|
||||
size_t offset =
|
||||
(remainder - numLargeChunks) * largeChunkSize + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize;
|
||||
return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize};
|
||||
}
|
||||
|
||||
__host__ __device__ int peerIdx(int peerRank, int rank)
|
||||
{
|
||||
return peerRank < rank ? peerRank : peerRank - 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int peerRank(int peerIdx, int rank)
|
||||
{
|
||||
return peerIdx < rank ? peerIdx : peerIdx + 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int phase1SendConnIdx(int peerRank, int rank)
|
||||
{
|
||||
return peerIdx(peerRank, rank) * 3;
|
||||
}
|
||||
|
||||
__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank)
|
||||
{
|
||||
return peerIdx(peerRank, rank) * 3 + 1;
|
||||
}
|
||||
|
||||
__host__ __device__ int phase2ConnIdx(int peerRank, int rank)
|
||||
{
|
||||
return peerIdx(peerRank, rank) * 3 + 2;
|
||||
}
|
||||
|
||||
__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size)
|
||||
{
|
||||
if (threadIdx.x == 0) {
|
||||
conn.putWithSignalAndFlush(dstOffset, srcOffset, size);
|
||||
}
|
||||
__syncthreads();
|
||||
}
|
||||
|
||||
__device__ void recv(mscclppDevConn_t& conn)
|
||||
{
|
||||
if (threadIdx.x == 0) {
|
||||
conn.wait();
|
||||
}
|
||||
__syncthreads();
|
||||
}
|
||||
|
||||
__device__ void reduceSum(int* dst, int* src, size_t size)
|
||||
{
|
||||
for (int i = threadIdx.x; i < size; i += blockDim.x) {
|
||||
dst[i] += src[i];
|
||||
}
|
||||
}
|
||||
|
||||
__global__ void initData(int* data, size_t size, int rank)
|
||||
{
|
||||
for (int i = threadIdx.x; i < size; i += blockDim.x) {
|
||||
data[i] = rank;
|
||||
}
|
||||
}
|
||||
|
||||
__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t scratchDataCount,
|
||||
mscclppDevConn_t* conns, void* scratch, void* sendRecvData,
|
||||
cuda::barrier<cuda::thread_scope_device>* barrier)
|
||||
{
|
||||
int idx = blockIdx.x;
|
||||
int peer = peerRank(idx, rank);
|
||||
mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)];
|
||||
mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(peer, rank)];
|
||||
mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(peer, rank)];
|
||||
|
||||
// 1st communication phase: send data to the scratch buffer of the peer associated with this block
|
||||
Chunk toPeerChunk = getChunk(dataCount, nRanks, peer, 1);
|
||||
// Now we need to figure out the offset of this chunk in the scratch buffer of the destination.
|
||||
// The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and
|
||||
// inside that each of the destination's peers send to the nth chunk, where n is the index of the
|
||||
// source peer from the destination's perspective.
|
||||
size_t dstOffset = peerIdx(rank, peer) * toPeerChunk.size;
|
||||
send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int));
|
||||
recv(phase1RecvConn);
|
||||
|
||||
if (threadIdx.x == 0)
|
||||
barrier->arrive_and_wait();
|
||||
__syncthreads();
|
||||
|
||||
// Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer
|
||||
Chunk rankChunk = getChunk(dataCount, nRanks, rank, 1);
|
||||
int* chunk = (int*)sendRecvData + rankChunk.offset;
|
||||
int numPeers = nRanks - 1, numBlocks = nRanks - 1;
|
||||
Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, idx, 1);
|
||||
for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) {
|
||||
assert(scratchDataCount % numPeers == 0);
|
||||
assert(scratchDataCount / numPeers == rankChunk.size);
|
||||
size_t scratchDataCountPerPeer = scratchDataCount / numPeers;
|
||||
int* scratchChunk = (int*)scratch + peerIdx * scratchDataCountPerPeer;
|
||||
Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, idx, 1);
|
||||
assert(blockScratchChunk.size == blockUserChunk.size);
|
||||
reduceSum(chunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size);
|
||||
}
|
||||
|
||||
if (threadIdx.x == 0)
|
||||
barrier->arrive_and_wait();
|
||||
__syncthreads();
|
||||
|
||||
// 2nd communication phase: send the now reduced data between the user buffers
|
||||
Chunk collectionChunk = getChunk(dataCount, nRanks, rank, 1);
|
||||
send(phase2Conn, collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int),
|
||||
collectionChunk.size * sizeof(int));
|
||||
recv(phase2Conn);
|
||||
}
|
||||
|
||||
void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
|
||||
size_t* recvInplaceOffset, size_t count, int nranks)
|
||||
{
|
||||
size_t base = (count / ALIGN) * ALIGN;
|
||||
*sendcount = base;
|
||||
*recvcount = base;
|
||||
*sendInplaceOffset = 0;
|
||||
*recvInplaceOffset = 0;
|
||||
*paramcount = base;
|
||||
}
|
||||
|
||||
void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
|
||||
{
|
||||
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
|
||||
AllReduceGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
|
||||
}
|
||||
|
||||
testResult_t AllReduceInitData(struct testArgs* args, int in_place)
|
||||
{
|
||||
size_t recvcount = args->expectedBytes / sizeof(int);
|
||||
|
||||
CUDACHECK(cudaSetDevice(args->gpuNum));
|
||||
CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes));
|
||||
initData<<<1, 256>>>((int*)args->recvbuff, recvcount, args->proc);
|
||||
|
||||
int* dataHost = new int[recvcount];
|
||||
for (size_t i = 0; i < recvcount; i++) {
|
||||
dataHost[i] = args->totalProcs * (args->totalProcs - 1) / 2;
|
||||
}
|
||||
CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice));
|
||||
delete dataHost;
|
||||
CUDACHECK(cudaDeviceSynchronize());
|
||||
MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
|
||||
{
|
||||
double baseBw = (double)(count * typesize) / 1.0E9 / sec;
|
||||
|
||||
*algBw = baseBw;
|
||||
double factor = (2 * (double)(nranks - 1)) / ((double)nranks);
|
||||
*busBw = baseBw * factor;
|
||||
}
|
||||
|
||||
testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm,
|
||||
cudaStream_t stream, int kernelNum)
|
||||
{
|
||||
int worldSize = comm->nRanks;
|
||||
int nPeers = worldSize - 1;
|
||||
int dataCount = nBytes / sizeof(int);
|
||||
Chunk chunk = getChunk(dataCount, worldSize, comm->rank, 1);
|
||||
size_t scratchDataCount = chunk.size * nPeers;
|
||||
allReduceKernel0<<<worldSize - 1, 256, 0, stream>>>(comm->rank, worldSize, dataCount, scratchDataCount, conns,
|
||||
scratch, sendRecvData, barrier);
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, defaultInitColl, AllReduceInitData,
|
||||
AllReduceGetBw, AllReduceRunColl};
|
||||
|
||||
// Register three P2P connections per peer for the two-phase AllReduce and
// allocate the scratch buffer that receives one chunk from each peer:
//   sendTag   - connection registered against our recvbuff,
//   recvTag   - connection registered against our scratch buffer,
//   phase2Tag - second-phase connection, also against recvbuff.
// NOTE(review): the exact send/recv roles of each tag are inferred from the
// buffers passed here — confirm against allReduceKernel0.
testResult_t AllReduceSetupMscclppConnections(struct testArgs* args)
{
  int rank = args->proc, worldSize = args->totalProcs;
  size_t bufferSize = args->maxbytes;
  // Scratch must hold one chunk per peer (nPeers chunks total).
  Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1);
  int nPeers = args->totalProcs - 1;
  size_t scratchBytes = chunk.size * nPeers * sizeof(int);

  CUDACHECK(cudaMalloc(&scratch, scratchBytes));

  for (int peer = 0; peer < worldSize; ++peer) {
    if (peer != args->proc) {
      int sendTag = getSendTag(args->proc, peer);
      int recvTag = getRecvTag(args->proc, peer);
      MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
      MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr));
      MSCCLPPCHECK(
        mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr));
    }
  }
  // Completes the handshake for every connection registered above; tags must
  // match the peer's corresponding mscclppConnect calls.
  MSCCLPPCHECK(mscclppConnectionSetup(args->comm));

  return testSuccess;
}
|
||||
|
||||
// Release the device scratch buffer allocated in
// AllReduceSetupMscclppConnections. Safe to call when nothing was allocated.
testResult_t AllReduceTeardownMscclppConnections()
{
  if (scratch == nullptr) {
    return testSuccess;
  }
  CUDACHECK(cudaFree(scratch));
  scratch = nullptr;
  return testSuccess;
}
|
||||
|
||||
// Per-rank driver for the AllReduce benchmark: create the device-scope
// barrier used by the kernel, flatten the per-peer device connections into
// one device-resident table (3 entries per peer, indexed via the
// phase1Send/phase1Recv/phase2 index helpers), run the timed test, then
// free the device resources.
testResult_t AllReduceRunTest(struct testArgs* args)
{
  args->collTest = &allReduceTest;

  // The kernel reduces in place on recvbuff.
  sendRecvData = args->recvbuff;
  // Device-scope cuda::barrier with one participant per peer block
  // (totalProcs - 1), initialized on the host and copied to the device.
  CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier<cuda::thread_scope_device>)));
  cuda::barrier<cuda::thread_scope_device> initBarrier(args->totalProcs - 1);
  CUDACHECK(
    cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier<cuda::thread_scope_device>), cudaMemcpyHostToDevice));
  int nPeers = args->totalProcs - 1;
  int rank = args->proc;
  std::vector<mscclppDevConn_t> hostConns(nPeers * 3, mscclppDevConn_t());

  for (int peer = 0; peer < args->totalProcs; ++peer) {
    mscclppDevConn_t* devConn;
    if (peer != rank) {
      // Tags must match those used in AllReduceSetupMscclppConnections.
      int sendTag = getSendTag(args->proc, peer);
      int recvTag = getRecvTag(args->proc, peer);
      MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, sendTag, &devConn));
      hostConns[phase1SendConnIdx(peer, rank)] = *devConn;
      MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, recvTag, &devConn));
      hostConns[phase1RecvConnIdx(peer, rank)] = *devConn;
      MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, phase2Tag, &devConn));
      hostConns[phase2ConnIdx(peer, rank)] = *devConn;
    }
  }
  // Upload the flattened connection table for the kernel to index.
  CUDACHECK(cudaMalloc(&conns, nPeers * 3 * sizeof(mscclppDevConn_t)));
  CUDACHECK(cudaMemcpy(conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice));

  TESTCHECK(TimeTest(args));

  CUDACHECK(cudaFree(barrier));
  CUDACHECK(cudaFree(conns));

  return testSuccess;
}
|
||||
|
||||
// Engine entry points for the AllReduce test binary.
struct testEngine allReduceEngine = {AllReduceGetBuffSize, AllReduceRunTest, AllReduceSetupMscclppConnections,
                                     AllReduceTeardownMscclppConnections};

// Weak alias: the common harness links against mscclppTestEngine.
#pragma weak mscclppTestEngine = allReduceEngine
|
||||
@@ -212,11 +212,8 @@ template <typename T> void Allreduce(struct testArgs* args, T* value, int averag
|
||||
*value = accumulator;
|
||||
}
|
||||
|
||||
testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts)
|
||||
testResult_t CheckData(struct testArgs* args, int64_t* wrongElts)
|
||||
{
|
||||
if (in_place == 0) {
|
||||
return testInternalError;
|
||||
}
|
||||
size_t count = args->expectedBytes / sizeof(int);
|
||||
|
||||
int* dataHostRecv = new int[count];
|
||||
@@ -226,10 +223,11 @@ testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts)
|
||||
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
if (dataHostRecv[i] != dataHostExpected[i]) {
|
||||
// PRINT("Error: dataHostRecv[%ld] = %d, dataHostExpected[%ld] = %d\n", i, dataHostRecv[i], i,
|
||||
// dataHostExpected[i]);
|
||||
*wrongElts += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (args->reportErrors && *wrongElts) {
|
||||
(args->error)++;
|
||||
}
|
||||
@@ -300,7 +298,7 @@ testResult_t BenchTime(struct testArgs* args, int in_place)
|
||||
CUDACHECK(cudaGraphExecDestroy(graphExec));
|
||||
CUDACHECK(cudaGraphDestroy(graph));
|
||||
|
||||
TESTCHECK(CheckData(args, in_place, &wrongElts));
|
||||
TESTCHECK(CheckData(args, &wrongElts));
|
||||
|
||||
// aggregate delta from all threads and procs
|
||||
long long wrongElts1 = wrongElts;
|
||||
@@ -317,6 +315,9 @@ testResult_t BenchTime(struct testArgs* args, int in_place)
|
||||
} else {
|
||||
sprintf(timeStr, "%7.2f", timeUsec);
|
||||
}
|
||||
if (!in_place) {
|
||||
PRINT(" ");
|
||||
}
|
||||
if (args->reportErrors) {
|
||||
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
|
||||
} else {
|
||||
@@ -328,7 +329,7 @@ testResult_t BenchTime(struct testArgs* args, int in_place)
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
void setupArgs(size_t size, struct testArgs* args)
|
||||
testResult_t setupArgsAndInit(size_t size, struct testArgs* args)
|
||||
{
|
||||
int nranks = args->totalProcs;
|
||||
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
|
||||
@@ -344,6 +345,8 @@ void setupArgs(size_t size, struct testArgs* args)
|
||||
args->expectedBytes = recvCount * typeSize;
|
||||
args->sendInplaceOffset = sendInplaceOffset * typeSize;
|
||||
args->recvInplaceOffset = recvInplaceOffset * typeSize;
|
||||
|
||||
return args->collTest->initColl();
|
||||
}
|
||||
|
||||
testResult_t TimeTest(struct testArgs* args)
|
||||
@@ -352,7 +355,7 @@ testResult_t TimeTest(struct testArgs* args)
|
||||
TESTCHECK(Barrier(args));
|
||||
|
||||
// Warm-up for large size
|
||||
setupArgs(args->maxbytes, args);
|
||||
TESTCHECK(setupArgsAndInit(args->maxbytes, args));
|
||||
TESTCHECK(args->collTest->initData(args, 1));
|
||||
for (int iter = 0; iter < warmup_iters; iter++) {
|
||||
TESTCHECK(startColl(args, 1, iter));
|
||||
@@ -360,7 +363,7 @@ testResult_t TimeTest(struct testArgs* args)
|
||||
TESTCHECK(completeColl(args));
|
||||
|
||||
// Warm-up for small size
|
||||
setupArgs(args->minbytes, args);
|
||||
TESTCHECK(setupArgsAndInit(args->minbytes, args));
|
||||
for (int iter = 0; iter < warmup_iters; iter++) {
|
||||
TESTCHECK(startColl(args, 1, iter));
|
||||
}
|
||||
@@ -375,11 +378,9 @@ testResult_t TimeTest(struct testArgs* args)
|
||||
// Benchmark
|
||||
for (size_t size = args->minbytes; size <= args->maxbytes;
|
||||
size = ((args->stepfactor > 1) ? size * args->stepfactor : size + args->stepbytes)) {
|
||||
setupArgs(size, args);
|
||||
TESTCHECK(setupArgsAndInit(size, args));
|
||||
PRINT("%12li %12li", max(args->sendBytes, args->expectedBytes), args->nbytes / sizeof(int));
|
||||
// Don't support out-of-place for now
|
||||
// TESTCHECK(BenchTime(args, 0));
|
||||
TESTCHECK(BenchTime(args, 1));
|
||||
TESTCHECK(BenchTime(args, args->in_place));
|
||||
PRINT("\n");
|
||||
}
|
||||
return testSuccess;
|
||||
@@ -415,13 +416,20 @@ testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode,
|
||||
testResult_t runTests(struct testArgs* args)
|
||||
{
|
||||
PRINT("# Setting up the connection in MSCCL++\n");
|
||||
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
|
||||
args->maxbytes));
|
||||
if (mscclppTestEngine.setupMscclppConnections != nullptr) {
|
||||
TESTCHECK(mscclppTestEngine.setupMscclppConnections(args));
|
||||
} else {
|
||||
TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff,
|
||||
args->maxbytes));
|
||||
}
|
||||
PRINT("# Launching MSCCL++ proxy threads\n");
|
||||
MSCCLPPCHECK(mscclppProxyLaunch(args->comm));
|
||||
TESTCHECK(mscclppTestEngine.runTest(args));
|
||||
PRINT("Stopping MSCCL++ proxy threads\n");
|
||||
MSCCLPPCHECK(mscclppProxyStop(args->comm));
|
||||
if (mscclppTestEngine.teardownMscclppConnections != nullptr) {
|
||||
TESTCHECK(mscclppTestEngine.teardownMscclppConnections());
|
||||
}
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
@@ -638,6 +646,7 @@ testResult_t run()
|
||||
worker.args.stepfactor = stepFactor;
|
||||
worker.args.localRank = localRank;
|
||||
worker.args.nranksPerNode = nranksPerNode;
|
||||
worker.args.in_place = 1;
|
||||
|
||||
worker.args.totalProcs = totalProcs;
|
||||
worker.args.proc = proc;
|
||||
@@ -670,6 +679,9 @@ testResult_t run()
|
||||
CUDACHECK(cudaFreeHost(delta));
|
||||
|
||||
int error = worker.args.error;
|
||||
#if MSCCLPP_USE_MPI_FOR_TESTS
|
||||
MPI_Allreduce(MPI_IN_PLACE, &error, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
|
||||
#endif
|
||||
PRINT("# Out of bounds values : %d %s\n", error, error ? "FAILED" : "OK");
|
||||
PRINT("#\n");
|
||||
|
||||
|
||||
@@ -64,11 +64,17 @@ typedef enum
|
||||
testNumResults = 5
|
||||
} testResult_t;
|
||||
|
||||
inline testResult_t defaultInitColl()
|
||||
{
|
||||
return testSuccess;
|
||||
}
|
||||
|
||||
struct testColl
|
||||
{
|
||||
const char name[20];
|
||||
void (*getCollByteCount)(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
|
||||
size_t* recvInplaceOffset, size_t count, int nranks);
|
||||
testResult_t (*initColl)();
|
||||
testResult_t (*initData)(struct testArgs* args, int in_place);
|
||||
void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
|
||||
testResult_t (*runColl)(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
|
||||
@@ -80,6 +86,8 @@ struct testEngine
|
||||
void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks);
|
||||
// We can add more parameters for other communication primitives
|
||||
testResult_t (*runTest)(struct testArgs* args);
|
||||
testResult_t (*setupMscclppConnections)(struct testArgs* args);
|
||||
testResult_t (*teardownMscclppConnections)();
|
||||
};
|
||||
|
||||
extern struct testEngine mscclppTestEngine;
|
||||
@@ -98,6 +106,7 @@ struct testArgs
|
||||
int localRank;
|
||||
int nranksPerNode;
|
||||
int kernel_num;
|
||||
int in_place;
|
||||
void* sendbuff;
|
||||
size_t sendBytes;
|
||||
size_t sendInplaceOffset;
|
||||
@@ -151,4 +160,4 @@ inline void print_usage(const char* prog)
|
||||
if (is_main_thread) \
|
||||
printf
|
||||
|
||||
#endif // MSCCLPP_TESTS_COMMON_H_
|
||||
#endif // MSCCLPP_TESTS_COMMON_H_
|
||||
|
||||
203
test/sendrecv_test.cu
Normal file
203
test/sendrecv_test.cu
Normal file
@@ -0,0 +1,203 @@
|
||||
#include "comm.h"
|
||||
#include "common.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
|
||||
// Threads per block for the send/recv kernel.
constexpr size_t BLOCK_THREADS_NUM = 1024;
// Try to use more blocks if per-block data size exceeds this threshold
constexpr size_t THRES_BYTES_PER_BLOCK = 8192;
// Let it no more than the number of SMs on a GPU
constexpr size_t MAX_BLOCKS_NUM = 32;

// Buffer sizes are rounded down to a multiple of this many elements
// (see SendRecvGetCollByteCount).
#define ALIGN 4
|
||||
|
||||
// Device connections to the send peer and the receive peer of this rank,
// uploaded once per run via cudaMemcpyToSymbol in SendRecvRunTest.
__constant__ mscclppDevConn_t sendConnConst;
__constant__ mscclppDevConn_t recvConnConst;
|
||||
|
||||
// Shared state for the reusable inter-block barrier in sync_gpu().
struct SyncGpuState
{
  volatile int flag; // set/cleared by the last block to arrive; others spin on it
  int cnt;           // number of blocks that have arrived in the current phase
  int is_add;        // phase bit: alternates between count-up and count-down
};
|
||||
|
||||
// Synchronize multiple thread blocks inside a kernel. Guarantee that all
|
||||
// previous work of all threads in cooperating blocks is finished and
|
||||
// visible to all threads in the device.
|
||||
__forceinline__ __device__ void sync_gpu(SyncGpuState& state, int blockNum)
{
  // Old counter value seen by the last arriving block in the "add" phase.
  int maxOldCnt = blockNum - 1;
  __syncthreads();
  if (threadIdx.x == 0) {
    // Alternate between an "add" phase and a "sub" phase so the barrier can
    // be reused across calls without an explicit counter reset.
    int is_add_ = state.is_add ^ 1;
    if (is_add_) {
      // Count up; the last block to arrive raises the flag, releasing all
      // spinners below.
      if (atomicAdd(&state.cnt, 1) == maxOldCnt) {
        state.flag = 1;
      }
      while (!state.flag) {
      }
    } else {
      // Reverse phase: count back down; the last block clears the flag.
      if (atomicSub(&state.cnt, 1) == 1) {
        state.flag = 0;
      }
      while (state.flag) {
      }
    }
    state.is_add = is_add_;
  }
  // We need sync here because only a single thread is checking whether
  // the flag is flipped.
  __syncthreads();
}
|
||||
|
||||
// Tag used by `rank` when sending to `peer`. Within a pair, the lower rank
// sends on tag 0 and the higher rank on tag 1, so the two directions of a
// pair never collide (getRecvTag is the mirror image).
inline int getSendTag(int rank, int peer)
{
  if (rank < peer) {
    return 0;
  }
  return 1;
}
|
||||
|
||||
// Tag used by `rank` when receiving from `peer`: exactly the tag the peer
// uses to send to us, i.e. the mirror image of getSendTag.
inline int getRecvTag(int rank, int peer)
{
  if (rank < peer) {
    return 1;
  }
  return 0;
}
|
||||
|
||||
inline int getBlockNum(size_t count)
|
||||
{
|
||||
return std::min((count + THRES_BYTES_PER_BLOCK - 1) / THRES_BYTES_PER_BLOCK, MAX_BLOCKS_NUM);
|
||||
}
|
||||
|
||||
// Device-global barrier state shared by all blocks of the send/recv kernel.
__device__ SyncGpuState GLOBAL_SYNC_STATE;
|
||||
|
||||
// Send/recv kernel: each block writes its slice of the local buffer through
// the send connection (putDirect), then — once every block has finished
// writing — thread 0 signals the send peer and waits on the recv peer.
// NOTE(review): putDirect/signalDirect/waitDirect semantics are defined by
// mscclppDevConn_t elsewhere; the description above follows the call shape.
__global__ void kernel(int rank, size_t dataSize, size_t dataPerBlock)
{
  // Copy the __constant__ connections into registers/local state.
  mscclppDevConn_t sendConn = sendConnConst;
  mscclppDevConn_t recvConn = recvConnConst;
  size_t startIndex = blockIdx.x * dataPerBlock;
  // The last block may get a short tail.
  size_t blockDataSize = min(dataSize - startIndex, dataPerBlock);
  int tid = blockIdx.x * blockDim.x + threadIdx.x;

  sendConn.putDirect(startIndex, blockDataSize, threadIdx.x, blockDim.x);
  // All blocks must finish their putDirect before the single signal below.
  sync_gpu(GLOBAL_SYNC_STATE, gridDim.x);
  if (tid == 0) {
    sendConn.signalDirect();
    recvConn.waitDirect();
  }
}
|
||||
|
||||
void SendRecvGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset,
|
||||
size_t* recvInplaceOffset, size_t count, int nranks)
|
||||
{
|
||||
size_t base = (count / ALIGN) * ALIGN;
|
||||
*sendcount = base;
|
||||
*recvcount = base;
|
||||
*sendInplaceOffset = base;
|
||||
*recvInplaceOffset = 0;
|
||||
*paramcount = base;
|
||||
}
|
||||
|
||||
// Zero the device-global inter-block sync state before each run so the
// barrier in sync_gpu() starts from a clean phase.
testResult_t SendRecvInitColl()
{
  SyncGpuState zeroed{};
  CUDACHECK(cudaMemcpyToSymbol(GLOBAL_SYNC_STATE, &zeroed, sizeof(SyncGpuState)));
  return testSuccess;
}
|
||||
|
||||
// Fill this rank's send buffer with its own rank id and upload the expected
// receive contents — the previous rank's id, since data flows around a ring.
// Ends with a bootstrap barrier so every rank is initialized before timing
// starts. `in_place` is part of the common signature but unused (this test
// only supports out-of-place).
testResult_t SendRecvInitData(struct testArgs* args, int in_place)
{
  size_t sendCount = args->sendBytes / sizeof(int);
  size_t recvCount = args->expectedBytes / sizeof(int);
  // One staging buffer reused for both uploads.
  size_t maxCount = std::max(sendCount, recvCount);

  int rank = args->proc;
  CUDACHECK(cudaMemset(args->sendbuff, 0, args->sendBytes));
  std::vector<int> dataHost(maxCount, rank);
  CUDACHECK(cudaMemcpy(args->sendbuff, dataHost.data(), sendCount * sizeof(int), cudaMemcpyHostToDevice));

  // We receive from the previous rank in the ring.
  int recvPeerRank = (rank - 1 + args->totalProcs) % args->totalProcs;
  for (size_t i = 0; i < recvCount; i++) {
    dataHost[i] = recvPeerRank;
  }
  CUDACHECK(cudaMemcpy(args->expected, dataHost.data(), recvCount * sizeof(int), cudaMemcpyHostToDevice));
  MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm));

  return testSuccess;
}
|
||||
|
||||
// Convert a measured send/recv time into bandwidth (GB/s). Each byte is
// moved exactly once, so the bus-bandwidth correction factor is 1 and
// busBw equals algBw. `nranks` is kept for signature parity with the other
// collectives but does not affect the result.
void SendRecvGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks)
{
  const double gbPerSec = (double)(count * typesize) / 1.0E9 / sec;
  *algBw = gbPerSec;
  double factor = 1;
  *busBw = gbPerSec * factor;
}
|
||||
|
||||
// Launch the send/recv kernel, splitting the payload across up to
// MAX_BLOCKS_NUM blocks. sendbuff/recvbuff/nranksPerNode/kernel_num are part
// of the common signature but unused: the kernel reaches the buffers through
// the __constant__ connections instead.
// NOTE(review): `count` is treated as a byte count here (it is divided into
// bytesPerBlock) — confirm against what the harness passes.
testResult_t SendRecvRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm,
                             cudaStream_t stream, int kernel_num)
{
  int blockNum = getBlockNum(count);
  // Ceiling division so the blocks cover the whole payload.
  size_t bytesPerBlock = (count + blockNum - 1) / blockNum;
  kernel<<<blockNum, BLOCK_THREADS_NUM, 0, stream>>>(comm->rank, count, bytesPerBlock);
  return testSuccess;
}
|
||||
|
||||
// Collective descriptor wiring the send/recv callbacks into the common
// test harness.
struct testColl sendRecvTest = {"SendRecvTest", SendRecvGetCollByteCount, SendRecvInitColl, SendRecvInitData,
                                SendRecvGetBw, SendRecvRunColl};
|
||||
|
||||
void SendRecvGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks)
|
||||
{
|
||||
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
|
||||
SendRecvGetCollByteCount(sendcount, recvcount, ¶mcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
|
||||
}
|
||||
|
||||
// Create the two ring connections for this rank: one for sending to
// rank+1 (registered against sendbuff) and one for receiving from rank-1
// (registered against recvbuff). Same-node peers use P2P transport; remote
// peers go over InfiniBand.
// NOTE(review): assumes IB devices are named "mlx5_ib<localRank>" — confirm
// for the target cluster.
testResult_t SendRecvSetupConnections(struct testArgs* args)
{
  int rank = args->proc;
  int worldSize = args->totalProcs;
  int ranksPerNode = args->nranksPerNode;
  int thisNode = rank / ranksPerNode;
  int localRank = rank % ranksPerNode;
  std::string ibDevStr = "mlx5_ib" + std::to_string(localRank);
  int sendToRank = (rank + 1) % worldSize;
  int recvFromRank = (rank - 1 + worldSize) % worldSize;
  std::array<int, 2> ranks = {sendToRank, recvFromRank};

  for (int i = 0; i < 2; i++) {
    int r = ranks[i];
    // P2P within the node, IB across nodes.
    const char* ibDev = r / ranksPerNode == thisNode ? nullptr : ibDevStr.c_str();
    mscclppTransport_t transportType = ibDev == nullptr ? mscclppTransportP2P : mscclppTransportIB;
    // i == 0 is the send direction, i == 1 the receive direction; tags must
    // pair up with the remote side's getRecvTag/getSendTag.
    void* buff = (i == 0) ? args->sendbuff : args->recvbuff;
    int tag = (i == 0) ? getSendTag(rank, r) : getRecvTag(rank, r);
    MSCCLPPCHECK(mscclppConnect(args->comm, r, tag, buff, args->maxbytes, transportType, ibDev));
  }
  // Complete the handshake for both connections.
  MSCCLPPCHECK(mscclppConnectionSetup(args->comm));

  return testSuccess;
}
|
||||
|
||||
// Per-rank driver for the send/recv benchmark: look up the ring connections
// established in SendRecvSetupConnections, publish them into the kernel's
// __constant__ slots, and run the timed test.
testResult_t SendRecvRunTest(struct testArgs* args)
{
  args->collTest = &sendRecvTest;
  int rank = args->proc, worldSize = args->totalProcs;

  // only support out-of-place for sendrecv test
  args->in_place = 0;

  mscclppDevConn_t* sendDevConn;
  mscclppDevConn_t* recvDevConn;
  // Send connection to the next rank; tags must match the mscclppConnect
  // calls in SendRecvSetupConnections.
  MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, (rank + 1) % worldSize, getSendTag(rank, (rank + 1) % worldSize),
                                          &sendDevConn));
  // Receive connection from the previous rank.
  MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, (rank - 1 + worldSize) % worldSize,
                                          getRecvTag(rank, (rank - 1 + worldSize) % worldSize), &recvDevConn));
  CUDACHECK(cudaMemcpyToSymbol(sendConnConst, sendDevConn, sizeof(mscclppDevConn_t)));
  CUDACHECK(cudaMemcpyToSymbol(recvConnConst, recvDevConn, sizeof(mscclppDevConn_t)));
  TESTCHECK(TimeTest(args));
  return testSuccess;
}
|
||||
|
||||
// Engine entry points for the send/recv test binary; no teardown is needed
// (this test allocates no extra device buffers).
struct testEngine sendRecvTestEngine = {SendRecvGetBuffSize, SendRecvRunTest, SendRecvSetupConnections, nullptr};

// Weak alias: the common harness links against mscclppTestEngine.
#pragma weak mscclppTestEngine = sendRecvTestEngine
|
||||
Reference in New Issue
Block a user