diff --git a/README.md b/README.md index 56f8056d..defeade2 100644 --- a/README.md +++ b/README.md @@ -55,42 +55,54 @@ $ ./build/bin/tests/bootstrap_test 127.0.0.1:50000 1 2 ## Performance -All results from NDv4. "xp-yn" means "x" total GPUs across "y" nodes. +All results from NDv4. NCCL version 2.17.1+cuda11.8, reported in-place numbers. + +nccl-tests command example: +```bash +mpirun --bind-to numa -hostfile /mnt/hostfile --tag-output --allow-run-as-root -map-by ppr:8:node --bind-to numa -mca pml ob1 -mca btl ^openib -mca btl_tcp_if_include eth0 -x PATH -x LD_PRELOAD=/mnt/nccl/build/lib/libnccl.so -x NCCL_IB_PCI_RELAXED_ORDERING=1 -x NCCL_SOCKET_IFNAME=eth0 -x CUDA_DEVICE_ORDER=PCI_BUS_ID -x NCCL_NET_GDR_LEVEL=5 -x NCCL_TOPO_FILE=/mnt/ndv4-topo.xml -x NCCL_DEBUG=WARN ./build/all_gather_perf -b 1K -e 1K -g 1 -c 1 -w 10 -n 10 -G 1 +``` + +mscclpp-tests command example: +```bash +mpirun -allow-run-as-root -map-by ppr:8:node -hostfile /mnt/hostfile -x LD_LIBRARY_PATH=/mnt/mscclpp/build/lib:$LD_LIBRARY_PATH ./build/bin/tests/allgather_test_perf -b 1K -e 1K -w 10 -n 10 -G 1 -k 0 +``` **NOTE:** NCCL AllGather leverages Ring algorithm instead of all-pairs alike algorithm, which greatly reduces inter-node transmission, causing significant higher performance. 
MSCCL++ should do something similar in the future -### 8p-1n +### 1 node, 8 gpus/node **Latency (us)** -| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 | -|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:| -| 1K | 13.12 | 9.61 | **7.76** | 21.06 | 28.50 | 157.91 | 143.21 | +| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce | +|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:| +| 1K | 12.53 | **16.96** | 9.34 | **7.76** / 21.06 / 28.50 | 157.91 / 143.21 / 447.0 | 326.4 | **BusBW (GB/s)** -| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 | -|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:| -| 1G | 218.27 | 220.09 | 217.05 | 216.98 | 217.15 | 93.69 | **255.06** | +| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce | +|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:----------------------------:|:-----------------:| +| 1G | 253.59 | **132.31** | 254.69 | 217.05 / 216.98 / 217.15 | 125.06 / **255.64** / 124.89 | 22.55 | -### 2p-2n +### 2 nodes, 1 gpu/node **Latency (us)** -| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 | 
-|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:| -| 1K | 15.31 | 28.36 | 14.67 | 29.12 | 35.43 | 15.32 | **13.84** | +| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce | +|:------------:|:--------------:|:--------------:|:--------------:|:------------------------------:|:--------------------------:|:-----------------:| +| 1K | 16.08 | **21.27** | 29.84 | 14.67 / 29.12 / 35.43 | 15.32 / **13.84** / 26.08 | - | **BusBW (GB/s)** -| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 | -|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:| -| 1G | 15.69 | 16.22 | 13.94 | 13.83 | 14.10 | **23.26** | **23.29** | +| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce | +|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:| +| 1G | 15.84 | **18.65** | 15.48 | 13.94 / 13.83 / 14.10 | **23.30** / 23.29 / 21.60 | - | -### 16p-2n +### 2 nodes, 8 gpus/node **Latency (us)** -| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 | -|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:| -| 1K | 31.70 | 45.12 | **22.55** | 39.33 | 56.93 | 159.14 | 230.52 | +| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ 
AllGather K0/K1/K2 | MSCCL++ AllReduce | +|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:| +| 1K | 33.74 | **35.85** | 49.75 | **22.55** / 39.33 / 56.93 | 159.14 / 230.52 / 462.7 | - | **BusBW (GB/s)** -| Message Size | NCCL AllGather | NCCL AllToAll | MSCCL AllToAll LL | MSCCL AllToAll LL128 | MSCCL AllToAll Simple | MSCCL++ AllGather K0 | MSCCL++ AllGather K1 | -|:------------:|:--------------:|:-------------:|:-----------------:|:--------------------:|:---------------------:|:--------------------:|:--------------------:| -| 1G | 174.28 | 38.30 | 40.17 | 40.18 | 40.23 | **44.08** | 9.31 | +| Message Size | NCCL AllGather | NCCL AllReduce | NCCL AllToAll | MSCCL AllToAll LL/LL128/Simple | MSCCL++ AllGather K0/K1/K2 | MSCCL++ AllReduce | +|:------------:|:--------------:|:--------------:|:-------------:|:------------------------------:|:--------------------------:|:-----------------:| +| 1G | 177.05 | **183.82** | 37.80 | 40.17 / 40.18 / 40.23 | 44.19 / 9.31 / **209.33** | - | +| 4G | 186.01 | **188.18** | 37.81 | - / - / - | 44.60 / - / **234.08** | - | + ## Contributing diff --git a/src/ib.cc b/src/ib.cc index 99450863..7eed6b5e 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -97,7 +97,9 @@ IbQp::IbQp(void* ctx, void* pd, int port) { this->info.linkLayer = portAttr.link_layer; this->info.qpn = _qp->qp_num; this->info.mtu = portAttr.active_mtu; - if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND) { + this->info.is_grh = (portAttr.flags & IBV_QPF_GRH_REQUIRED); + + if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND || this->info.is_grh) { union ibv_gid gid; if (ibv_query_gid(_ctx, port, 0, &gid) != 0) { std::stringstream err; @@ -105,6 +107,7 @@ IbQp::IbQp(void* ctx, void* pd, int port) { throw mscclpp::IbError(err.str(), errno); } this->info.spn = gid.global.subnet_prefix; + this->info.iid = gid.global.interface_id; } struct ibv_qp_attr qpAttr; @@ -142,18 +145,18 @@ 
void IbQp::rtr(const IbQpInfo& info) { qp_attr.rq_psn = 0; qp_attr.max_dest_rd_atomic = 1; qp_attr.min_rnr_timer = 0x12; - if (info.linkLayer == IBV_LINK_LAYER_ETHERNET) { + if (info.linkLayer == IBV_LINK_LAYER_ETHERNET || info.is_grh) { qp_attr.ah_attr.is_global = 1; qp_attr.ah_attr.grh.dgid.global.subnet_prefix = info.spn; - qp_attr.ah_attr.grh.dgid.global.interface_id = info.lid; + qp_attr.ah_attr.grh.dgid.global.interface_id = info.iid; qp_attr.ah_attr.grh.flow_label = 0; qp_attr.ah_attr.grh.sgid_index = 0; qp_attr.ah_attr.grh.hop_limit = 255; qp_attr.ah_attr.grh.traffic_class = 0; } else { qp_attr.ah_attr.is_global = 0; - qp_attr.ah_attr.dlid = info.lid; } + qp_attr.ah_attr.dlid = info.lid; qp_attr.ah_attr.sl = 0; qp_attr.ah_attr.src_path_bits = 0; qp_attr.ah_attr.port_num = info.port; diff --git a/src/include/ib.hpp b/src/include/ib.hpp index fea25615..2fe9a447 100644 --- a/src/include/ib.hpp +++ b/src/include/ib.hpp @@ -43,6 +43,8 @@ struct IbQpInfo { uint32_t qpn; uint64_t spn; int mtu; + uint64_t iid; + bool is_grh; }; class IbQp { diff --git a/src/include/mscclpp.h b/src/include/mscclpp.h index b74f5a8d..b57dc263 100644 --- a/src/include/mscclpp.h +++ b/src/include/mscclpp.h @@ -137,12 +137,44 @@ struct mscclppDevConn { ; } + // Version that uses the SM directly to do the copy, instead of using the proxy thread like the functions above. + __forceinline__ __device__ void putDirect(uint64_t dstDataOffset, uint64_t srcDataOffset, uint64_t dataSize, + uint32_t threadId, uint32_t numThreads) { + uint64_t* src = (uint64_t*)((char*)localBuff + srcDataOffset); + uint64_t* dst = (uint64_t*)((char*)remoteBuff + dstDataOffset); + // assume the memory is aligned to 8 bytes + size_t nElem = + dataSize % sizeof(uint64_t) ? 
(dataSize + sizeof(uint64_t)) / sizeof(uint64_t) : dataSize / sizeof(uint64_t); + for (size_t i = threadId; i < nElem; i += numThreads) { + dst[i] = src[i]; + } + } + + __forceinline__ __device__ void putDirect(uint64_t dataOffset, uint64_t dataSize, uint32_t threadId, + uint32_t numThreads) { + putDirect(dataOffset, dataOffset, dataSize, threadId, numThreads); + } + + __forceinline__ __device__ void signalDirect() { + // This fence ensures that the writes from a preceding putDirect() are visible on the peer GPU before the + // incremented epoch id is visible. + __threadfence_system(); + epochIncrement(); + *(volatile uint64_t*)&(remoteSignalEpochId->device) = localSignalEpochId->device; + } + __forceinline__ __device__ void wait() { (*waitEpochId) += 1; while (*(volatile uint64_t*)&(localSignalEpochId->proxy) < (*waitEpochId)) ; } + __forceinline__ __device__ void waitDirect() { + (*waitEpochId) += 1; + while (*(volatile uint64_t*)&(localSignalEpochId->device) < (*waitEpochId)) + ; + } + __forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(localSignalEpochId->device) += 1; } #endif // __CUDACC__ diff --git a/test/allgather_test.cu b/test/allgather_test.cu index 647debef..764f33fb 100644 --- a/test/allgather_test.cu +++ b/test/allgather_test.cu @@ -195,8 +195,8 @@ testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, return testSuccess; } -struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, AllGatherInitData, AllGatherGetBw, - AllGatherRunColl}; +struct testColl allGatherTest = {"AllGather", AllGatherGetCollByteCount, defaultInitColl, AllGatherInitData, + AllGatherGetBw, AllGatherRunColl}; void AllGatherGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) { @@ -215,6 +215,6 @@ testResult_t AllGatherRunTest(struct testArgs* args) return testSuccess; } -struct testEngine allGatherEngine = {AllGatherGetBuffSize, AllGatherRunTest}; +struct testEngine allGatherEngine = 
{AllGatherGetBuffSize, AllGatherRunTest, nullptr, nullptr}; #pragma weak mscclppTestEngine = allGatherEngine diff --git a/test/allreduce_allpairs_test.cu b/test/allreduce_allpairs_test.cu deleted file mode 100644 index 514a2b30..00000000 --- a/test/allreduce_allpairs_test.cu +++ /dev/null @@ -1,322 +0,0 @@ -#include "mscclpp.h" -#include -#include -#include - -#include "common.h" - -#define MSCCLPPCHECK(call) \ - do { \ - mscclppResult_t res = call; \ - if (res != mscclppSuccess && res != mscclppInProgress) { \ - /* Print the back trace*/ \ - printf("Failure at %s:%d -> %d\n", __FILE__, __LINE__, res); \ - return res; \ - } \ - } while (0); - -#define CUDACHECK(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ - exit(EXIT_FAILURE); \ - } \ - } while (false) - -struct Volume -{ - size_t offset; - size_t size; -}; - -__host__ __device__ Volume chunkVolume(size_t totalSize, size_t totalChunks, size_t chunkIdx, size_t chunkCount) -{ - size_t remainder = totalSize % totalChunks; - size_t smallChunk = totalSize / totalChunks; - size_t largeChunk = smallChunk + 1; - size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; - size_t numSmallChunks = chunkCount - numLargeChunks; - size_t offset = - (remainder - numLargeChunks) * largeChunk + (chunkIdx > remainder ? 
chunkIdx - remainder : 0) * smallChunk; - return Volume{offset, numLargeChunks * largeChunk + numSmallChunks * smallChunk}; -} - -template struct AllreduceAllpairs -{ - int rank; - int nRanks; - T* userData; - size_t userSize; - T* scratch; - size_t scratchSize; - mscclppDevConn_t* conns; - uint64_t* connFlags; - cuda::barrier* barrier; - - __device__ void run(int idx) - { - int myPeer = peerRank(idx, rank); - mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(myPeer)]; - mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(myPeer)]; - mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(myPeer)]; - - // 1st communication phase: send data to the scratch buffer of the peer associated with this block - Volume toPeer = chunkVolume(userSize, nRanks, myPeer, 1); - // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. - // The destination will have allocated a scratch buffer of size numPeers() * toPeer.size and - // inside that each of the destination's peers send to the nth chunk, where n is the index of the - // source peer from the destination's perspective. 
- size_t dstOffset = peerIdx(rank, myPeer) * toPeer.size; - send(phase1SendConn, toPeer.offset, dstOffset, toPeer.size); - recv(phase1RecvConn); - - if (threadIdx.x == 0) - barrier->arrive_and_wait(); - __syncthreads(); - - // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer - Volume rankUserChunk = chunkVolume(userSize, nRanks, rank, 1); - T* userChunk = userData + rankUserChunk.offset; - Volume blockUserChunk = chunkVolume(rankUserChunk.size, numBlocks(), idx, 1); - for (int peerIdx = 0; peerIdx < numPeers(); ++peerIdx) { - assert(scratchSize % numPeers() == 0); - assert(scratchSize / numPeers() == rankUserChunk.size); - size_t scratchChunkSize = scratchSize / numPeers(); - T* scratchChunk = scratch + peerIdx * scratchChunkSize; - Volume blockScratchChunk = chunkVolume(scratchChunkSize, numBlocks(), idx, 1); - assert(blockScratchChunk.size == blockUserChunk.size); - reduce(userChunk + blockUserChunk.offset, scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); - } - - if (threadIdx.x == 0) - barrier->arrive_and_wait(); - __syncthreads(); - - // 2nd communication phase: send the now reduced data between the user buffers - Volume srcVolume2 = chunkVolume(userSize, nRanks, rank, 1); - send(phase2Conn, srcVolume2.offset, srcVolume2.offset, srcVolume2.size); - recv(phase2Conn); - } - - __device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) - { - if (threadIdx.x == 0) { - volatile uint64_t* localFlag = conn.localFlag; - *localFlag = 1; // 1 is used to signal the send - - mscclppTrigger_t trigger; - auto request = conn.fifo.getTrigger(&trigger); - conn.fifo.setTrigger(trigger, mscclppData | mscclppFlag, srcOffset * sizeof(T), dstOffset * sizeof(T), - size * sizeof(T)); - } - __syncthreads(); - } - - __device__ void recv(mscclppDevConn_t& conn) - { - if (threadIdx.x == 0) { - volatile uint64_t* proxyFlag = conn.proxyFlag; - while (*proxyFlag != 1) { - } - *proxyFlag = 
0; - } - __syncthreads(); - } - - __host__ __device__ int numPeers() - { - return nRanks - 1; - } - - __host__ __device__ int numBlocks() - { - return numPeers(); - } - - __host__ __device__ int peerIdx(int peerRank, int myRank) - { - return peerRank < myRank ? peerRank : peerRank - 1; - } - - __host__ __device__ int peerRank(int peerIdx, int myRank) - { - return peerIdx < myRank ? peerIdx : peerIdx + 1; - } - - __host__ __device__ int phase1SendConnIdx(int peerRank) - { - return peerIdx(peerRank, rank) * 3; - } - - __host__ __device__ int phase1RecvConnIdx(int peerRank) - { - return peerIdx(peerRank, rank) * 3 + 1; - } - - __host__ __device__ int phase2ConnIdx(int peerRank) - { - return peerIdx(peerRank, rank) * 3 + 2; - } - - void freeGPUResources() - { - if (scratch) - CUDACHECK(cudaFree(scratch)); - scratch = nullptr; - if (connFlags) - CUDACHECK(cudaFree(connFlags)); - connFlags = nullptr; - if (conns) - CUDACHECK(cudaFree(conns)); - conns = nullptr; - if (barrier) - CUDACHECK(cudaFree(barrier)); - barrier = nullptr; - } -}; - -// The builder class encapsulates the -template class AllreduceAllpairsBuilder -{ - AllreduceAllpairs d; - std::vector hostConns; - -public: - // The constructor is called after the user has allocated the buffer to be allreduced - AllreduceAllpairsBuilder(T* data, size_t size) - { - d.userData = data; - d.userSize = size; - d.scratch = nullptr; - d.connFlags = nullptr; - d.conns = nullptr; - d.barrier = nullptr; - } - - // connect is called after rank initialization but before connection setup - mscclppResult_t connect(mscclppComm_t comm) - { - MSCCLPPCHECK(mscclppCommRank(comm, &d.rank)); - MSCCLPPCHECK(mscclppCommSize(comm, &d.nRanks)); - - Volume myChunks = chunkVolume(d.userSize, d.nRanks, d.rank, 1); - d.scratchSize = myChunks.size * d.numPeers(); - - CUDACHECK(cudaMalloc(&d.scratch, d.scratchSize * sizeof(T))); - CUDACHECK(cudaMalloc(&d.connFlags, 3 * sizeof(uint64_t))); - CUDACHECK(cudaMemset(d.connFlags, 0, 3 * 
sizeof(uint64_t))); - - hostConns.resize(d.numPeers() * 3); - for (int peer = 0; peer < d.nRanks; ++peer) { - if (peer != d.rank) { - int sendTag = d.rank < peer ? 0 : 1; - int recvTag = d.rank < peer ? 1 : 0; - MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1SendConnIdx(peer), peer, d.userData, - d.userSize * sizeof(T), d.connFlags + 0, sendTag, mscclppTransportP2P, nullptr)); - MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase1RecvConnIdx(peer), peer, d.scratch, - d.scratchSize * sizeof(T), d.connFlags + 1, recvTag, mscclppTransportP2P, nullptr)); - MSCCLPPCHECK(mscclppConnect(comm, hostConns.data() + d.phase2ConnIdx(peer), peer, d.userData, - d.userSize * sizeof(T), d.connFlags + 2, 2, mscclppTransportP2P, nullptr)); - } - } - - return mscclppSuccess; - } - - // finishSetup is called after connection setup and returns an algorithm object that is ready to be passed to a GPU - // kernel - AllreduceAllpairs finishSetup() - { - CUDACHECK(cudaMalloc(&d.conns, hostConns.size() * sizeof(mscclppDevConn_t))); - CUDACHECK( - cudaMemcpy(d.conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice)); - CUDACHECK(cudaMalloc(&d.barrier, sizeof(cuda::barrier))); - cuda::barrier initBarrier(d.numBlocks()); - CUDACHECK( - cudaMemcpy(d.barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); - return d; - } -}; - -template __device__ void reduceSum(T* dst, T* src, size_t size) -{ - for (int i = threadIdx.x; i < size; i += blockDim.x) { - dst[i] += src[i]; - } -} - -template __global__ void init(T* data, size_t size, int rank) -{ - for (int i = threadIdx.x; i < size; i += blockDim.x) { - data[i] = rank; - } -} - -// The main test kernel -template __global__ void testKernel(AllreduceAllpairs d) -{ - d.run(blockIdx.x); -} - -int main(int argc, const char* argv[]) -{ -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - MPI_Init(NULL, NULL); -#endif - const char* ip_port; - int rank, world_size; - parse_arguments(argc, argv, 
&ip_port, &rank, &world_size); - - CUDACHECK(cudaSetDevice(rank)); - - // Allocate and initialize 1 MB of data - int* data; - size_t dataSize = 1024 * 1024 / sizeof(int); - CUDACHECK(cudaMalloc(&data, dataSize * sizeof(int))); - init<<<1, 256>>>(data, dataSize, rank); - - // Create the collective - AllreduceAllpairsBuilder builder(data, dataSize); - - // Create the communicator - mscclppComm_t comm; - MSCCLPPCHECK(mscclppCommInitRank(&comm, world_size, rank, ip_port)); - - // Connect the collective - builder.connect(comm); - - // Finish the setup - MSCCLPPCHECK(mscclppConnectionSetup(comm)); - MSCCLPPCHECK(mscclppProxyLaunch(comm)); - auto allreduce = builder.finishSetup(); - - // Run the collective - testKernel<<>>(allreduce); - - // Wait for kernel to finish - CUDACHECK(cudaDeviceSynchronize()); - - // Check the result - int* hostData = new int[dataSize]; - CUDACHECK(cudaMemcpy(hostData, data, dataSize * sizeof(int), cudaMemcpyDeviceToHost)); - int expectedValue = world_size * (world_size - 1) / 2; - for (size_t i = 0; i < dataSize; ++i) { - if (hostData[i] != expectedValue) { - printf("Error at index %lu: %d != %d\n", i, hostData[i], expectedValue); - return 1; - } - } - - MSCCLPPCHECK(mscclppProxyStop(comm)); - - MSCCLPPCHECK(mscclppCommDestroy(comm)); - -#ifdef MSCCLPP_USE_MPI_FOR_TESTS - if (argc == 2) { - MPI_Finalize(); - } -#endif - printf("Succeeded! %d\n", rank); - return 0; -} \ No newline at end of file diff --git a/test/allreduce_test.cu b/test/allreduce_test.cu new file mode 100644 index 00000000..7f2bf176 --- /dev/null +++ b/test/allreduce_test.cu @@ -0,0 +1,284 @@ +#include +#include +#include + +#include "comm.h" +#include "common.h" + +#define ALIGN 4 + +const int phase2Tag = 2; +mscclppDevConn_t* conns; +void* scratch = nullptr; +void* sendRecvData = nullptr; +cuda::barrier* barrier = nullptr; + +struct Chunk +{ + size_t offset; + size_t size; +}; + +inline int getSendTag(int rank, int peer) +{ + return rank < peer ? 
0 : 1; +} + +inline int getRecvTag(int rank, int peer) +{ + return rank < peer ? 1 : 0; +} + +__host__ __device__ Chunk getChunk(size_t dataCount, size_t numChunks, size_t chunkIdx, size_t chunkCount) +{ + size_t remainder = dataCount % numChunks; + size_t smallChunkSize = dataCount / numChunks; + size_t largeChunkSize = smallChunkSize + 1; + size_t numLargeChunks = chunkIdx < remainder ? remainder - chunkIdx : 0; + size_t numSmallChunks = chunkCount - numLargeChunks; + size_t offset = + (remainder - numLargeChunks) * largeChunkSize + (chunkIdx > remainder ? chunkIdx - remainder : 0) * smallChunkSize; + return Chunk{offset, numLargeChunks * largeChunkSize + numSmallChunks * smallChunkSize}; +} + +__host__ __device__ int peerIdx(int peerRank, int rank) +{ + return peerRank < rank ? peerRank : peerRank - 1; +} + +__host__ __device__ int peerRank(int peerIdx, int rank) +{ + return peerIdx < rank ? peerIdx : peerIdx + 1; +} + +__host__ __device__ int phase1SendConnIdx(int peerRank, int rank) +{ + return peerIdx(peerRank, rank) * 3; +} + +__host__ __device__ int phase1RecvConnIdx(int peerRank, int rank) +{ + return peerIdx(peerRank, rank) * 3 + 1; +} + +__host__ __device__ int phase2ConnIdx(int peerRank, int rank) +{ + return peerIdx(peerRank, rank) * 3 + 2; +} + +__device__ void send(mscclppDevConn_t& conn, size_t srcOffset, size_t dstOffset, size_t size) +{ + if (threadIdx.x == 0) { + conn.putWithSignalAndFlush(dstOffset, srcOffset, size); + } + __syncthreads(); +} + +__device__ void recv(mscclppDevConn_t& conn) +{ + if (threadIdx.x == 0) { + conn.wait(); + } + __syncthreads(); +} + +__device__ void reduceSum(int* dst, int* src, size_t size) +{ + for (int i = threadIdx.x; i < size; i += blockDim.x) { + dst[i] += src[i]; + } +} + +__global__ void initData(int* data, size_t size, int rank) +{ + for (int i = threadIdx.x; i < size; i += blockDim.x) { + data[i] = rank; + } +} + +__global__ void allReduceKernel0(int rank, int nRanks, size_t dataCount, size_t 
scratchDataCount, + mscclppDevConn_t* conns, void* scratch, void* sendRecvData, + cuda::barrier* barrier) +{ + int idx = blockIdx.x; + int peer = peerRank(idx, rank); + mscclppDevConn_t phase1SendConn = conns[phase1SendConnIdx(peer, rank)]; + mscclppDevConn_t phase1RecvConn = conns[phase1RecvConnIdx(peer, rank)]; + mscclppDevConn_t phase2Conn = conns[phase2ConnIdx(peer, rank)]; + + // 1st communication phase: send data to the scratch buffer of the peer associated with this block + Chunk toPeerChunk = getChunk(dataCount, nRanks, peer, 1); + // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. + // The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and + // inside that each of the destination's peers send to the nth chunk, where n is the index of the + // source peer from the destination's perspective. + size_t dstOffset = peerIdx(rank, peer) * toPeerChunk.size; + send(phase1SendConn, toPeerChunk.offset * sizeof(int), dstOffset * sizeof(int), toPeerChunk.size * sizeof(int)); + recv(phase1RecvConn); + + if (threadIdx.x == 0) + barrier->arrive_and_wait(); + __syncthreads(); + + // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer + Chunk rankChunk = getChunk(dataCount, nRanks, rank, 1); + int* chunk = (int*)sendRecvData + rankChunk.offset; + int numPeers = nRanks - 1, numBlocks = nRanks - 1; + Chunk blockUserChunk = getChunk(rankChunk.size, numBlocks, idx, 1); + for (int peerIdx = 0; peerIdx < numPeers; ++peerIdx) { + assert(scratchDataCount % numPeers == 0); + assert(scratchDataCount / numPeers == rankChunk.size); + size_t scratchDataCountPerPeer = scratchDataCount / numPeers; + int* scratchChunk = (int*)scratch + peerIdx * scratchDataCountPerPeer; + Chunk blockScratchChunk = getChunk(scratchDataCountPerPeer, numBlocks, idx, 1); + assert(blockScratchChunk.size == blockUserChunk.size); + reduceSum(chunk + blockUserChunk.offset, 
scratchChunk + blockScratchChunk.offset, blockScratchChunk.size); + } + + if (threadIdx.x == 0) + barrier->arrive_and_wait(); + __syncthreads(); + + // 2nd communication phase: send the now reduced data between the user buffers + Chunk collectionChunk = getChunk(dataCount, nRanks, rank, 1); + send(phase2Conn, collectionChunk.offset * sizeof(int), collectionChunk.offset * sizeof(int), + collectionChunk.size * sizeof(int)); + recv(phase2Conn); +} + +void AllReduceGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, + size_t* recvInplaceOffset, size_t count, int nranks) +{ + size_t base = (count / ALIGN) * ALIGN; + *sendcount = base; + *recvcount = base; + *sendInplaceOffset = 0; + *recvInplaceOffset = 0; + *paramcount = base; +} + +void AllReduceGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) +{ + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + AllReduceGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t AllReduceInitData(struct testArgs* args, int in_place) +{ + size_t recvcount = args->expectedBytes / sizeof(int); + + CUDACHECK(cudaSetDevice(args->gpuNum)); + CUDACHECK(cudaMemset(args->recvbuff, 0, args->expectedBytes)); + initData<<<1, 256>>>((int*)args->recvbuff, recvcount, args->proc); + + int* dataHost = new int[recvcount]; + for (size_t i = 0; i < recvcount; i++) { + dataHost[i] = args->totalProcs * (args->totalProcs - 1) / 2; + } + CUDACHECK(cudaMemcpy(args->expected, dataHost, recvcount * sizeof(int), cudaMemcpyHostToDevice)); + delete[] dataHost; // array form: dataHost was allocated with new[] + CUDACHECK(cudaDeviceSynchronize()); + MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm)); + return testSuccess; +} + +void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) +{ + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = (2 * (double)(nranks - 1)) / 
((double)nranks); + *busBw = baseBw * factor; +} + +testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t nBytes, mscclppComm_t comm, + cudaStream_t stream, int kernelNum) +{ + int worldSize = comm->nRanks; + int nPeers = worldSize - 1; + int dataCount = nBytes / sizeof(int); + Chunk chunk = getChunk(dataCount, worldSize, comm->rank, 1); + size_t scratchDataCount = chunk.size * nPeers; + allReduceKernel0<<>>(comm->rank, worldSize, dataCount, scratchDataCount, conns, + scratch, sendRecvData, barrier); + return testSuccess; +} + +struct testColl allReduceTest = {"AllReduce", AllReduceGetCollByteCount, defaultInitColl, AllReduceInitData, + AllReduceGetBw, AllReduceRunColl}; + +testResult_t AllReduceSetupMscclppConnections(struct testArgs* args) +{ + int rank = args->proc, worldSize = args->totalProcs; + size_t bufferSize = args->maxbytes; + Chunk chunk = getChunk(bufferSize / sizeof(int), args->totalProcs, rank, 1); + int nPeers = args->totalProcs - 1; + size_t scratchBytes = chunk.size * nPeers * sizeof(int); + + CUDACHECK(cudaMalloc(&scratch, scratchBytes)); + + for (int peer = 0; peer < worldSize; ++peer) { + if (peer != args->proc) { + int sendTag = getSendTag(args->proc, peer); + int recvTag = getRecvTag(args->proc, peer); + MSCCLPPCHECK(mscclppConnect(args->comm, peer, sendTag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); + MSCCLPPCHECK(mscclppConnect(args->comm, peer, recvTag, scratch, scratchBytes, mscclppTransportP2P, nullptr)); + MSCCLPPCHECK( + mscclppConnect(args->comm, peer, phase2Tag, args->recvbuff, bufferSize, mscclppTransportP2P, nullptr)); + } + } + MSCCLPPCHECK(mscclppConnectionSetup(args->comm)); + + return testSuccess; +} + +testResult_t AllReduceTeardownMscclppConnections() +{ + if (scratch != nullptr) { + CUDACHECK(cudaFree(scratch)); + scratch = nullptr; + } + return testSuccess; +} + +testResult_t AllReduceRunTest(struct testArgs* args) +{ + args->collTest = &allReduceTest; + + sendRecvData = 
args->recvbuff; + CUDACHECK(cudaMalloc(&barrier, sizeof(cuda::barrier))); + cuda::barrier initBarrier(args->totalProcs - 1); + CUDACHECK( + cudaMemcpy(barrier, &initBarrier, sizeof(cuda::barrier), cudaMemcpyHostToDevice)); + int nPeers = args->totalProcs - 1; + int rank = args->proc; + std::vector hostConns(nPeers * 3, mscclppDevConn_t()); + + for (int peer = 0; peer < args->totalProcs; ++peer) { + mscclppDevConn_t* devConn; + if (peer != rank) { + int sendTag = getSendTag(args->proc, peer); + int recvTag = getRecvTag(args->proc, peer); + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, sendTag, &devConn)); + hostConns[phase1SendConnIdx(peer, rank)] = *devConn; + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, recvTag, &devConn)); + hostConns[phase1RecvConnIdx(peer, rank)] = *devConn; + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, peer, phase2Tag, &devConn)); + hostConns[phase2ConnIdx(peer, rank)] = *devConn; + } + } + CUDACHECK(cudaMalloc(&conns, nPeers * 3 * sizeof(mscclppDevConn_t))); + CUDACHECK(cudaMemcpy(conns, hostConns.data(), hostConns.size() * sizeof(mscclppDevConn_t), cudaMemcpyHostToDevice)); + + TESTCHECK(TimeTest(args)); + + CUDACHECK(cudaFree(barrier)); + CUDACHECK(cudaFree(conns)); + + return testSuccess; +} + +struct testEngine allReduceEngine = {AllReduceGetBuffSize, AllReduceRunTest, AllReduceSetupMscclppConnections, + AllReduceTeardownMscclppConnections}; + +#pragma weak mscclppTestEngine = allReduceEngine diff --git a/test/common.cu b/test/common.cu index 77c3223f..35d82c74 100644 --- a/test/common.cu +++ b/test/common.cu @@ -212,11 +212,8 @@ template void Allreduce(struct testArgs* args, T* value, int averag *value = accumulator; } -testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts) +testResult_t CheckData(struct testArgs* args, int64_t* wrongElts) { - if (in_place == 0) { - return testInternalError; - } size_t count = args->expectedBytes / sizeof(int); int* dataHostRecv = new int[count]; 
@@ -226,10 +223,11 @@ testResult_t CheckData(struct testArgs* args, int in_place, int64_t* wrongElts) for (size_t i = 0; i < count; i++) { if (dataHostRecv[i] != dataHostExpected[i]) { + // PRINT("Error: dataHostRecv[%ld] = %d, dataHostExpected[%ld] = %d\n", i, dataHostRecv[i], i, + // dataHostExpected[i]); *wrongElts += 1; } } - if (args->reportErrors && *wrongElts) { (args->error)++; } @@ -300,7 +298,7 @@ testResult_t BenchTime(struct testArgs* args, int in_place) CUDACHECK(cudaGraphExecDestroy(graphExec)); CUDACHECK(cudaGraphDestroy(graph)); - TESTCHECK(CheckData(args, in_place, &wrongElts)); + TESTCHECK(CheckData(args, &wrongElts)); // aggregate delta from all threads and procs long long wrongElts1 = wrongElts; @@ -317,6 +315,9 @@ testResult_t BenchTime(struct testArgs* args, int in_place) } else { sprintf(timeStr, "%7.2f", timeUsec); } + if (!in_place) { + PRINT(" "); + } if (args->reportErrors) { PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts); } else { @@ -328,7 +329,7 @@ testResult_t BenchTime(struct testArgs* args, int in_place) return testSuccess; } -void setupArgs(size_t size, struct testArgs* args) +testResult_t setupArgsAndInit(size_t size, struct testArgs* args) { int nranks = args->totalProcs; size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset; @@ -344,6 +345,8 @@ void setupArgs(size_t size, struct testArgs* args) args->expectedBytes = recvCount * typeSize; args->sendInplaceOffset = sendInplaceOffset * typeSize; args->recvInplaceOffset = recvInplaceOffset * typeSize; + + return args->collTest->initColl(); } testResult_t TimeTest(struct testArgs* args) @@ -352,7 +355,7 @@ testResult_t TimeTest(struct testArgs* args) TESTCHECK(Barrier(args)); // Warm-up for large size - setupArgs(args->maxbytes, args); + TESTCHECK(setupArgsAndInit(args->maxbytes, args)); TESTCHECK(args->collTest->initData(args, 1)); for (int iter = 0; iter < warmup_iters; iter++) { TESTCHECK(startColl(args, 1, iter)); @@ -360,7 
+363,7 @@ testResult_t TimeTest(struct testArgs* args) TESTCHECK(completeColl(args)); // Warm-up for small size - setupArgs(args->minbytes, args); + TESTCHECK(setupArgsAndInit(args->minbytes, args)); for (int iter = 0; iter < warmup_iters; iter++) { TESTCHECK(startColl(args, 1, iter)); } @@ -375,11 +378,9 @@ testResult_t TimeTest(struct testArgs* args) // Benchmark for (size_t size = args->minbytes; size <= args->maxbytes; size = ((args->stepfactor > 1) ? size * args->stepfactor : size + args->stepbytes)) { - setupArgs(size, args); + TESTCHECK(setupArgsAndInit(size, args)); PRINT("%12li %12li", max(args->sendBytes, args->expectedBytes), args->nbytes / sizeof(int)); - // Don't support out-of-place for now - // TESTCHECK(BenchTime(args, 0)); - TESTCHECK(BenchTime(args, 1)); + TESTCHECK(BenchTime(args, args->in_place)); PRINT("\n"); } return testSuccess; @@ -415,13 +416,20 @@ testResult_t setupMscclppConnections(int rank, int worldSize, int ranksPerNode, testResult_t runTests(struct testArgs* args) { PRINT("# Setting up the connection in MSCCL++\n"); - TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff, - args->maxbytes)); + if (mscclppTestEngine.setupMscclppConnections != nullptr) { + TESTCHECK(mscclppTestEngine.setupMscclppConnections(args)); + } else { + TESTCHECK(setupMscclppConnections(args->proc, args->totalProcs, args->nranksPerNode, args->comm, args->recvbuff, + args->maxbytes)); + } PRINT("# Launching MSCCL++ proxy threads\n"); MSCCLPPCHECK(mscclppProxyLaunch(args->comm)); TESTCHECK(mscclppTestEngine.runTest(args)); PRINT("Stopping MSCCL++ proxy threads\n"); MSCCLPPCHECK(mscclppProxyStop(args->comm)); + if (mscclppTestEngine.teardownMscclppConnections != nullptr) { + TESTCHECK(mscclppTestEngine.teardownMscclppConnections()); + } return testSuccess; } @@ -638,6 +646,7 @@ testResult_t run() worker.args.stepfactor = stepFactor; worker.args.localRank = localRank; worker.args.nranksPerNode = 
nranksPerNode; + worker.args.in_place = 1; worker.args.totalProcs = totalProcs; worker.args.proc = proc; @@ -670,6 +679,9 @@ testResult_t run() CUDACHECK(cudaFreeHost(delta)); int error = worker.args.error; +#if MSCCLPP_USE_MPI_FOR_TESTS + MPI_Allreduce(MPI_IN_PLACE, &error, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); +#endif PRINT("# Out of bounds values : %d %s\n", error, error ? "FAILED" : "OK"); PRINT("#\n"); diff --git a/test/common.h b/test/common.h index 39766de0..e5e95519 100644 --- a/test/common.h +++ b/test/common.h @@ -64,11 +64,17 @@ typedef enum testNumResults = 5 } testResult_t; +inline testResult_t defaultInitColl() +{ + return testSuccess; +} + struct testColl { const char name[20]; void (*getCollByteCount)(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, size_t* recvInplaceOffset, size_t count, int nranks); + testResult_t (*initColl)(); testResult_t (*initData)(struct testArgs* args, int in_place); void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks); testResult_t (*runColl)(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, @@ -80,6 +86,8 @@ struct testEngine void (*getBuffSize)(size_t* sendcount, size_t* recvcount, size_t count, int nranks); // We can add more parameters for other communication primitives testResult_t (*runTest)(struct testArgs* args); + testResult_t (*setupMscclppConnections)(struct testArgs* args); + testResult_t (*teardownMscclppConnections)(); }; extern struct testEngine mscclppTestEngine; @@ -98,6 +106,7 @@ struct testArgs int localRank; int nranksPerNode; int kernel_num; + int in_place; void* sendbuff; size_t sendBytes; size_t sendInplaceOffset; @@ -151,4 +160,4 @@ inline void print_usage(const char* prog) if (is_main_thread) \ printf -#endif // MSCCLPP_TESTS_COMMON_H_ \ No newline at end of file +#endif // MSCCLPP_TESTS_COMMON_H_ diff --git a/test/sendrecv_test.cu b/test/sendrecv_test.cu new file mode 100644 index 
00000000..06ec2466 --- /dev/null +++ b/test/sendrecv_test.cu @@ -0,0 +1,203 @@ +#include "comm.h" +#include "common.h" + +#include +#include +#include +#include +#include + +constexpr size_t BLOCK_THREADS_NUM = 1024; +// Try to use more blocks if per-block data size exceeds this threshold +constexpr size_t THRES_BYTES_PER_BLOCK = 8192; +// Let it no more than the number of SMs on a GPU +constexpr size_t MAX_BLOCKS_NUM = 32; + +#define ALIGN 4 + +__constant__ mscclppDevConn_t sendConnConst; +__constant__ mscclppDevConn_t recvConnConst; + +struct SyncGpuState +{ + volatile int flag; + int cnt; + int is_add; +}; + +// Synchronize multiple thread blocks inside a kernel. Guarantee that all +// previous work of all threads in cooperating blocks is finished and +// visible to all threads in the device. +__forceinline__ __device__ void sync_gpu(SyncGpuState& state, int blockNum) +{ + int maxOldCnt = blockNum - 1; + __syncthreads(); + if (threadIdx.x == 0) { + int is_add_ = state.is_add ^ 1; + if (is_add_) { + if (atomicAdd(&state.cnt, 1) == maxOldCnt) { + state.flag = 1; + } + while (!state.flag) { + } + } else { + if (atomicSub(&state.cnt, 1) == 1) { + state.flag = 0; + } + while (state.flag) { + } + } + state.is_add = is_add_; + } + // We need sync here because only a single thread is checking whether + // the flag is flipped. + __syncthreads(); +} + +inline int getSendTag(int rank, int peer) +{ + return rank < peer ? 0 : 1; +} + +inline int getRecvTag(int rank, int peer) +{ + return rank < peer ? 
1 : 0; +} + +inline int getBlockNum(size_t count) +{ + return std::min((count + THRES_BYTES_PER_BLOCK - 1) / THRES_BYTES_PER_BLOCK, MAX_BLOCKS_NUM); +} + +__device__ SyncGpuState GLOBAL_SYNC_STATE; + +__global__ void kernel(int rank, size_t dataSize, size_t dataPerBlock) +{ + mscclppDevConn_t sendConn = sendConnConst; + mscclppDevConn_t recvConn = recvConnConst; + size_t startIndex = blockIdx.x * dataPerBlock; + size_t blockDataSize = min(dataSize - startIndex, dataPerBlock); + int tid = blockIdx.x * blockDim.x + threadIdx.x; + + sendConn.putDirect(startIndex, blockDataSize, threadIdx.x, blockDim.x); + sync_gpu(GLOBAL_SYNC_STATE, gridDim.x); + if (tid == 0) { + sendConn.signalDirect(); + recvConn.waitDirect(); + } +} + +void SendRecvGetCollByteCount(size_t* sendcount, size_t* recvcount, size_t* paramcount, size_t* sendInplaceOffset, + size_t* recvInplaceOffset, size_t count, int nranks) +{ + size_t base = (count / ALIGN) * ALIGN; + *sendcount = base; + *recvcount = base; + *sendInplaceOffset = base; + *recvInplaceOffset = 0; + *paramcount = base; +} + +testResult_t SendRecvInitColl() +{ + SyncGpuState state = {}; + CUDACHECK(cudaMemcpyToSymbol(GLOBAL_SYNC_STATE, &state, sizeof(SyncGpuState))); + return testSuccess; +} + +testResult_t SendRecvInitData(struct testArgs* args, int in_place) +{ + size_t sendCount = args->sendBytes / sizeof(int); + size_t recvCount = args->expectedBytes / sizeof(int); + size_t maxCount = std::max(sendCount, recvCount); + + int rank = args->proc; + CUDACHECK(cudaMemset(args->sendbuff, 0, args->sendBytes)); + std::vector dataHost(maxCount, rank); + CUDACHECK(cudaMemcpy(args->sendbuff, dataHost.data(), sendCount * sizeof(int), cudaMemcpyHostToDevice)); + + int recvPeerRank = (rank - 1 + args->totalProcs) % args->totalProcs; + for (size_t i = 0; i < recvCount; i++) { + dataHost[i] = recvPeerRank; + } + CUDACHECK(cudaMemcpy(args->expected, dataHost.data(), recvCount * sizeof(int), cudaMemcpyHostToDevice)); + 
MSCCLPPCHECK(mscclppBootstrapBarrier(args->comm)); + + return testSuccess; +} + +void SendRecvGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) +{ + double baseBw = (double)(count * typesize) / 1.0E9 / sec; + + *algBw = baseBw; + double factor = 1; + *busBw = baseBw * factor; +} + +testResult_t SendRecvRunColl(void* sendbuff, void* recvbuff, int nranksPerNode, size_t count, mscclppComm_t comm, + cudaStream_t stream, int kernel_num) +{ + int blockNum = getBlockNum(count); + size_t bytesPerBlock = (count + blockNum - 1) / blockNum; + kernel<<>>(comm->rank, count, bytesPerBlock); + return testSuccess; +} + +struct testColl sendRecvTest = {"SendRecvTest", SendRecvGetCollByteCount, SendRecvInitColl, SendRecvInitData, + SendRecvGetBw, SendRecvRunColl}; + +void SendRecvGetBuffSize(size_t* sendcount, size_t* recvcount, size_t count, int nranks) +{ + size_t paramcount, sendInplaceOffset, recvInplaceOffset; + SendRecvGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks); +} + +testResult_t SendRecvSetupConnections(struct testArgs* args) +{ + int rank = args->proc; + int worldSize = args->totalProcs; + int ranksPerNode = args->nranksPerNode; + int thisNode = rank / ranksPerNode; + int localRank = rank % ranksPerNode; + std::string ibDevStr = "mlx5_ib" + std::to_string(localRank); + int sendToRank = (rank + 1) % worldSize; + int recvFromRank = (rank - 1 + worldSize) % worldSize; + std::array ranks = {sendToRank, recvFromRank}; + + for (int i = 0; i < 2; i++) { + int r = ranks[i]; + const char* ibDev = r / ranksPerNode == thisNode ? nullptr : ibDevStr.c_str(); + mscclppTransport_t transportType = ibDev == nullptr ? mscclppTransportP2P : mscclppTransportIB; + void* buff = (i == 0) ? args->sendbuff : args->recvbuff; + int tag = (i == 0) ?
getSendTag(rank, r) : getRecvTag(rank, r); + MSCCLPPCHECK(mscclppConnect(args->comm, r, tag, buff, args->maxbytes, transportType, ibDev)); + } + MSCCLPPCHECK(mscclppConnectionSetup(args->comm)); + + return testSuccess; +} + +testResult_t SendRecvRunTest(struct testArgs* args) +{ + args->collTest = &sendRecvTest; + int rank = args->proc, worldSize = args->totalProcs; + + // only support out-of-place for sendrecv test + args->in_place = 0; + + mscclppDevConn_t* sendDevConn; + mscclppDevConn_t* recvDevConn; + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, (rank + 1) % worldSize, getSendTag(rank, (rank + 1) % worldSize), + &sendDevConn)); + MSCCLPPCHECK(mscclppGetDeviceConnection(args->comm, (rank - 1 + worldSize) % worldSize, + getRecvTag(rank, (rank - 1 + worldSize) % worldSize), &recvDevConn)); + CUDACHECK(cudaMemcpyToSymbol(sendConnConst, sendDevConn, sizeof(mscclppDevConn_t))); + CUDACHECK(cudaMemcpyToSymbol(recvConnConst, recvDevConn, sizeof(mscclppDevConn_t))); + TESTCHECK(TimeTest(args)); + return testSuccess; +} + +struct testEngine sendRecvTestEngine = {SendRecvGetBuffSize, SendRecvRunTest, SendRecvSetupConnections, nullptr}; + +#pragma weak mscclppTestEngine = sendRecvTestEngine