diff --git a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu
index 2853ad16..4a57d30d 100644
--- a/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu
+++ b/src/ext/collectives/alltoallv/alltoallv_fullmesh.cu
@@ -13,18 +13,11 @@
 #include
 #include
-#include
 #include "debug.h"
 
 namespace mscclpp {
 namespace collective {
 
-#if defined(__HIP_PLATFORM_AMD__)
-#define ALLTOALLV_WARP_SIZE 64
-#else
-#define ALLTOALLV_WARP_SIZE 32
-#endif
-
 using MultiNodeMode = AlltoallvFullmesh::MultiNodeMode;
 
 // Context to hold all necessary state for alltoallv execution
@@ -397,7 +390,5 @@ AlgorithmCtxKey AlltoallvFullmesh::generateAlltoallvContextKey(
   return {(void*)input, output, inputSize, outputSize, 0};
 }
 
-#undef ALLTOALLV_WARP_SIZE
-
 }  // namespace collective
 }  // namespace mscclpp
diff --git a/src/ext/collectives/collective_utils.cc b/src/ext/collectives/collective_utils.cc
index 270223b3..90a3530c 100644
--- a/src/ext/collectives/collective_utils.cc
+++ b/src/ext/collectives/collective_utils.cc
@@ -4,7 +4,6 @@
 #include "collective_utils.hpp"
 
 #include
-#include
 #include
 #include
 #include
@@ -124,26 +123,6 @@ std::vector<mscclpp::PortChannel> setupPortChannels(
   return channels;
 }
 
-std::vector<mscclpp::PortChannel> setupAllPortChannels(
-    std::shared_ptr<mscclpp::ProxyService> proxyService,
-    mscclpp::Communicator& comm,
-    const std::vector<std::shared_ptr<mscclpp::Connection>>& connections,
-    const std::vector<mscclpp::RegisteredMemory>& remoteMemories,
-    mscclpp::RegisteredMemory localMemory) {
-  std::vector<mscclpp::PortChannel> channels;
-  mscclpp::MemoryId srcMemId = proxyService->addMemory(localMemory);
-  for (size_t cid = 0; cid < connections.size(); ++cid) {
-    // Create PortChannel for EVERY connection (CudaIpc and IB alike).
-    // The ProxyService proxy thread handles both connection types:
-    //   - CudaIpc: cudaMemcpyD2D via IPC-mapped pointer
-    //   - IB: RDMA write via ibv_post_send
-    mscclpp::SemaphoreId semId = proxyService->buildAndAddSemaphore(comm, connections[cid]);
-    mscclpp::MemoryId dstMemId = proxyService->addMemory(remoteMemories[cid]);
-    channels.emplace_back(proxyService->portChannel(semId, dstMemId, srcMemId));
-  }
-  return channels;
-}
-
 std::shared_ptr<PortChannelDeviceHandle> setupPortChannelDeviceHandles(
     const std::vector<mscclpp::PortChannel>& portChannels) {
   if (portChannels.empty()) return nullptr;
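Note on the removal above: the retained setupPortChannels presumably keeps the same per-connection wiring that the deleted setupAllPortChannels spelled out (addMemory for the local and remote buffers, one host semaphore per connection, then portChannel(semId, dstMemId, srcMemId)). A minimal host-side call-site sketch, assuming the retained helper shares the deleted variant's parameter list; buffer and connection names are illustrative, and header names follow recent mscclpp releases:

```cu
#include <memory>
#include <vector>
#include <mscclpp/core.hpp>
#include <mscclpp/port_channel.hpp>

// Assumed declaration: same parameter list as the deleted setupAllPortChannels.
std::vector<mscclpp::PortChannel> setupPortChannels(
    std::shared_ptr<mscclpp::ProxyService>, mscclpp::Communicator&,
    const std::vector<std::shared_ptr<mscclpp::Connection>>&,
    const std::vector<mscclpp::RegisteredMemory>&, mscclpp::RegisteredMemory);

void prepareAlltoallv(mscclpp::Communicator& comm,
                      std::shared_ptr<mscclpp::ProxyService> proxyService,
                      const std::vector<std::shared_ptr<mscclpp::Connection>>& conns,
                      const std::vector<mscclpp::RegisteredMemory>& remoteMems,
                      void* buff, size_t nbytes, mscclpp::Transport ibTransport) {
  // Register the local buffer for both transports so a single PortChannel
  // code path serves intra-node (CudaIpc) and inter-node (IB) peers alike.
  mscclpp::RegisteredMemory localMem =
      comm.registerMemory(buff, nbytes, mscclpp::Transport::CudaIpc | ibTransport);
  auto channels = setupPortChannels(proxyService, comm, conns, remoteMems, localMem);
  proxyService->startProxy();  // the proxy thread must run before kernels push FIFO work
  // ... copy the channels' device handles to the GPU and launch the kernel ...
  proxyService->stopProxy();
}
```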
diff --git a/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp b/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp
index e00773f0..8fffab74 100644
--- a/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp
+++ b/src/ext/collectives/include/alltoallv/alltoallv_kernel.hpp
@@ -11,21 +11,6 @@
 namespace mscclpp {
 namespace collective {
 
-#if defined(__HIP_PLATFORM_AMD__)
-#define ALLTOALLV_WARP_SIZE 64
-#else
-#define ALLTOALLV_WARP_SIZE 32
-#endif
-
-// Chunk size for pipelined transfers (1MB)
-// Large enough to amortize overhead, small enough for good memory patterns
-constexpr size_t ALLTOALLV_CHUNK_SIZE = 1 << 20;
-
-// Default number of blocks for multi-block kernels.
-// Tuned for H100 (132 SMs). Enough to saturate NVLink bandwidth without
-// excessive DeviceSyncer overhead.
-constexpr int ALLTOALLV_DEFAULT_NBLOCKS = 24;
-
 // Default blocks per peer for the peer-parallel kernel.
 // Controls how many thread blocks cooperate on each peer's data transfer.
 constexpr int ALLTOALLV_DEFAULT_BLOCKS_PER_PEER = 16;
@@ -239,352 +224,7 @@
   }
 }
 
-/**
- * Legacy multi-block AllToAllV kernel (sequential peers).
- *
- * All thread blocks cooperate on each peer's data transfer using global thread IDs.
- * Peers are processed sequentially. Kept for comparison; prefer alltoallvPeerParallelKernel.
- *
- * Launch config: <<>>
- */
-__global__ void __launch_bounds__(1024)
-    alltoallvMultiBlockKernel(DeviceHandle<MemoryChannel>* memoryChannels,
-                              DeviceSyncer* syncer,
-                              int rank,
-                              int worldSize,
-                              const void* sendBuff,
-                              void* recvBuff,
-                              const size_t* sendCounts,
-                              const size_t* sendDispls,
-                              const size_t* recvCounts,
-                              const size_t* recvDispls,
-                              const size_t* remoteRecvDispls) {
-  const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
-  const int nThreads = blockDim.x * gridDim.x;
-  const int nPeers = worldSize - 1;
-
-  // Phase 1: Local copy — all threads across all blocks cooperate
-  if (sendCounts[rank] > 0) {
-    mscclpp::copy((char*)recvBuff + recvDispls[rank],
-                  (void*)((const char*)sendBuff + sendDispls[rank]),
-                  sendCounts[rank], gtid, nThreads);
-  }
-
-  // Phase 2: Remote puts — all blocks cooperate on each peer's transfer
-  for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
-    int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
-    int chanIdx = peerIdx;
-
-    if (sendCounts[peer] > 0) {
-      memoryChannels[chanIdx].put(
-          remoteRecvDispls[peer],
-          sendDispls[peer],
-          sendCounts[peer],
-          gtid,
-          nThreads
-      );
-    }
-  }
-
-  // Phase 3: Grid-wide barrier
-  syncer->sync(gridDim.x);
-
-  // Phase 4: Signal all peers, then wait (single thread)
-  if (gtid == 0) {
-    for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
-      memoryChannels[peerIdx].signal();
-    }
-    for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
-      int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
-      if (recvCounts[peer] > 0) {
-        memoryChannels[peerIdx].wait();
-      }
-    }
-  }
-}
-
-/**
- * High-performance AllToAllV kernel using maximum thread parallelism.
- *
- * Processes each peer sequentially but uses ALL block threads (1024) for each
- * data transfer to maximize copy bandwidth. This provides much better performance
- * than the warp-per-peer approach for large message sizes.
- *
- * Launch config: <<<1, 1024>>> for maximum bandwidth within a single block.
- *
- * @param memoryChannels Array of MemoryChannel handles for each peer (worldSize-1 channels)
- * @param rank Current rank
- * @param worldSize Total number of ranks
- * @param sendBuff Source buffer containing data to send
- * @param recvBuff Destination buffer for received data
- * @param sendCounts Array of send counts for each rank (in bytes)
- * @param sendDispls Array of send displacements for each rank (in bytes)
- * @param recvCounts Array of receive counts for each rank (in bytes)
- * @param recvDispls Array of receive displacements for each rank (in bytes)
- */
-__global__ void __launch_bounds__(1024)
-    alltoallvKernel(DeviceHandle<MemoryChannel>* memoryChannels,
-                    int rank,
-                    int worldSize,
-                    const void* sendBuff,
-                    void* recvBuff,
-                    const size_t* sendCounts,
-                    const size_t* sendDispls,
-                    const size_t* recvCounts,
-                    const size_t* recvDispls,
-                    const size_t* remoteRecvDispls) {
-  int tid = threadIdx.x;
-  int nThreads = blockDim.x;
-  int nPeers = worldSize - 1;
-
-  // Step 1: Copy local data using ALL threads for maximum bandwidth
-  if (sendCounts[rank] > 0) {
-    mscclpp::copy((char*)recvBuff + recvDispls[rank],
-                  (void*)((const char*)sendBuff + sendDispls[rank]),
-                  sendCounts[rank], tid, nThreads);
-  }
-  __syncthreads();
-
-  // Step 2: Process each peer sequentially, but use ALL threads for each transfer
-  // This maximizes bandwidth for each transfer compared to warp-per-peer approach
-  for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
-    int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
-    int chanIdx = peerIdx;
-
-    if (sendCounts[peer] > 0) {
-      // Use all threads for maximum copy throughput
-      memoryChannels[chanIdx].put(
-          remoteRecvDispls[peer],  // dst offset in peer's buffer (peer's recvDispls[rank])
-          sendDispls[peer],        // src offset in our buffer
-          sendCounts[peer],        // size
-          tid,                     // thread id
-          nThreads                 // total threads
-      );
-    }
-    __syncthreads();
-
-    // Only one thread signals per peer
-    if (tid == 0) {
-      memoryChannels[chanIdx].signal();
-    }
-    __syncthreads();
-
-    // Wait for incoming data from this peer
-    if (tid == 0 && recvCounts[peer] > 0) {
-      memoryChannels[chanIdx].wait();
-    }
-    __syncthreads();
-  }
-}
-
-/**
- * Pipelined AllToAllV kernel for imbalanced workloads.
- *
- * For large messages, breaks transfers into chunks to improve memory access
- * patterns, but avoids excessive signaling overhead by signaling only once
- * per peer after all chunks are sent.
- *
- * Optimized for MoE workloads where message sizes can vary by 100x+ between ranks.
- *
- * Launch config: <<<1, 1024>>>
- */
-__global__ void __launch_bounds__(1024)
-    alltoallvPipelinedKernel(DeviceHandle<MemoryChannel>* memoryChannels,
-                             int rank,
-                             int worldSize,
-                             const void* sendBuff,
-                             void* recvBuff,
-                             const size_t* sendCounts,
-                             const size_t* sendDispls,
-                             const size_t* recvCounts,
-                             const size_t* recvDispls,
-                             const size_t* remoteRecvDispls) {
-  int tid = threadIdx.x;
-  int nThreads = blockDim.x;
-  int nPeers = worldSize - 1;
-
-  // Step 1: Copy local data
-  if (sendCounts[rank] > 0) {
-    mscclpp::copy((char*)recvBuff + recvDispls[rank],
-                  (void*)((const char*)sendBuff + sendDispls[rank]),
-                  sendCounts[rank], tid, nThreads);
-  }
-  __syncthreads();
-
-  // Step 2: Process each peer - send all data in chunks, then signal once
-  for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
-    int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
-    int chanIdx = peerIdx;
-
-    size_t sendSize = sendCounts[peer];
-    size_t recvSize = recvCounts[peer];
-    size_t dstOffset = remoteRecvDispls[peer];  // peer's recvDispls[rank]
-    size_t srcOffset = sendDispls[peer];
-
-    // Send data in chunks for better memory access patterns
-    // But only signal ONCE after all chunks are sent (avoids signaling overhead)
-    if (sendSize > 0) {
-      for (size_t offset = 0; offset < sendSize; offset += ALLTOALLV_CHUNK_SIZE) {
-        size_t chunkSize = (sendSize - offset < ALLTOALLV_CHUNK_SIZE)
-                               ? (sendSize - offset) : ALLTOALLV_CHUNK_SIZE;
-        memoryChannels[chanIdx].put(
-            dstOffset + offset,
-            srcOffset + offset,
-            chunkSize,
-            tid,
-            nThreads
-        );
-        __syncthreads();
-      }
-    }
-
-    // Signal ONCE after all data is sent
-    if (tid == 0 && sendSize > 0) {
-      memoryChannels[chanIdx].signal();
-    }
-    __syncthreads();
-
-    // Wait ONCE for all peer's data
-    if (tid == 0 && recvSize > 0) {
-      memoryChannels[chanIdx].wait();
-    }
-    __syncthreads();
-  }
-}
-
-/**
- * Ring-based AllToAllV kernel with maximum thread parallelism.
- *
- * Uses step-by-step ring pattern with ALL threads for maximum bandwidth.
- * Each step processes one peer pair, with correct semaphore handling.
- */
-__global__ void __launch_bounds__(1024)
-    alltoallvRingKernel(DeviceHandle<MemoryChannel>* memoryChannels,
-                        int rank,
-                        int worldSize,
-                        const void* sendBuff,
-                        void* recvBuff,
-                        const size_t* sendCounts,
-                        const size_t* sendDispls,
-                        const size_t* recvCounts,
-                        const size_t* recvDispls,
-                        const size_t* remoteRecvDispls) {
-  int tid = threadIdx.x;
-  int nThreads = blockDim.x;
-
-  // Copy local data first using ALL threads
-  if (sendCounts[rank] > 0) {
-    mscclpp::copy((char*)recvBuff + recvDispls[rank],
-                  (void*)((const char*)sendBuff + sendDispls[rank]),
-                  sendCounts[rank], tid, nThreads);
-  }
-  __syncthreads();
-
-  // Ring-based exchange - process each peer sequentially
-  // Key fix: use the SAME channel for both signal and wait (peer-pair symmetry)
-  for (int step = 1; step < worldSize; step++) {
-    int sendPeer = (rank + step) % worldSize;
-    int chanIdx = sendPeer < rank ? sendPeer : sendPeer - 1;
-
-    // Send data to sendPeer using ALL threads
-    if (sendCounts[sendPeer] > 0) {
-      memoryChannels[chanIdx].put(
-          remoteRecvDispls[sendPeer],  // dst offset in peer's buffer (peer's recvDispls[rank])
-          sendDispls[sendPeer],
-          sendCounts[sendPeer],
-          tid,
-          nThreads
-      );
-    }
-    __syncthreads();
-
-    // Signal completion on the SAME channel we'll wait on
-    if (tid == 0) {
-      memoryChannels[chanIdx].signal();
-    }
-    __syncthreads();
-
-    // Wait for peer's data on the SAME channel (correct semaphore pairing)
-    if (tid == 0 && recvCounts[sendPeer] > 0) {
-      memoryChannels[chanIdx].wait();
-    }
-    __syncthreads();
-  }
-}
-
-/**
- * PortChannel-only AllToAllV kernel for multi-node.
- *
- * Uses PortChannel (proxy-based) for ALL peers — both intra-node and inter-node.
- * This follows the proven pattern from allgather_test_cpp.cu which works reliably
- * on GB200 multi-node NVSwitch systems.
- *
- * For intra-node CudaIpc connections, the proxy performs cudaMemcpyD2D.
- * For inter-node IB connections, the proxy performs RDMA writes.
- *
- * Each block handles one peer. Thread 0 pushes a put descriptor to the FIFO
- * (single-threaded), which triggers the proxy to perform the data transfer.
- *
- * Launch config: <<>>
- */
-__global__ void __launch_bounds__(1024)
-    alltoallvPortChannelKernel(PortChannelDeviceHandle* portChannels,
-                               int rank,
-                               int worldSize,
-                               const void* sendBuff,
-                               void* recvBuff,
-                               const size_t* sendCounts,
-                               const size_t* sendDispls,
-                               const size_t* recvCounts,
-                               const size_t* recvDispls,
-                               const size_t* remoteRecvDispls) {
-  const int nPeers = worldSize - 1;
-
-  // Handle trivial case (single rank)
-  if (nPeers == 0) {
-    const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
-    const int nThreads = blockDim.x * gridDim.x;
-    if (sendCounts[rank] > 0) {
-      mscclpp::copy((char*)recvBuff + recvDispls[rank],
-                    (void*)((const char*)sendBuff + sendDispls[rank]),
-                    sendCounts[rank], gtid, nThreads);
-    }
-    return;
-  }
-
-  // Phase 1: Local copy — all blocks cooperate using global thread IDs
-  const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
-  const int nThreads = blockDim.x * gridDim.x;
-  if (sendCounts[rank] > 0) {
-    mscclpp::copy((char*)recvBuff + recvDispls[rank],
-                  (void*)((const char*)sendBuff + sendDispls[rank]),
-                  sendCounts[rank], gtid, nThreads);
-  }
-
-  // Phase 2: Per-peer data transfer via PortChannel (proxy-based).
-  // Each block handles one peer: blockIdx.x == peerIdx.
-  const int peerIdx = blockIdx.x;
-  if (peerIdx >= nPeers) return;
-
-  const int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
-
-  // Thread 0 pushes a put+signal+flush descriptor to the proxy FIFO.
-  // The proxy thread performs the actual data transfer (cudaMemcpy or RDMA).
-  if (threadIdx.x == 0 && sendCounts[peer] > 0) {
-    portChannels[peerIdx].putWithSignalAndFlush(
-        remoteRecvDispls[peer],  // dst offset in peer's output buffer
-        sendDispls[peer],        // src offset in our input buffer
-        sendCounts[peer]         // bytes to transfer
-    );
-  }
-  __syncthreads();
-
-  // Wait for incoming data from this peer
-  if (threadIdx.x == 0 && recvCounts[peer] > 0) {
-    portChannels[peerIdx].wait();
-  }
-}
-
-#undef ALLTOALLV_WARP_SIZE
 
 }  // namespace collective
 }  // namespace mscclpp
\ No newline at end of file
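Every kernel deleted above, and the retained peer-parallel pair, reduces to the same MemoryChannel handshake: a cooperative put into the peer's buffer, one signal per peer, one wait per peer. A toy kernel distilling that pattern (a sketch, not library code; the device header name follows recent mscclpp releases and may differ by version):

```cu
#include <mscclpp/memory_channel_device.hpp>

// One peer, one block: every thread helps copy, thread 0 runs the
// signal/wait semaphore handshake that orders the puts on both sides.
__global__ void exchangeWithPeer(mscclpp::DeviceHandle<mscclpp::MemoryChannel>* chan,
                                 size_t dstOffset, size_t srcOffset, size_t bytes) {
  chan->put(dstOffset, srcOffset, bytes, threadIdx.x, blockDim.x);
  __syncthreads();    // our entire put has been issued
  if (threadIdx.x == 0) {
    chan->signal();   // publish our data to the peer
    chan->wait();     // block until the peer has published theirs
  }
}
```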
diff --git a/src/ext/collectives/include/collective_utils.hpp b/src/ext/collectives/include/collective_utils.hpp
index 02c85096..97497eea 100644
--- a/src/ext/collectives/include/collective_utils.hpp
+++ b/src/ext/collectives/include/collective_utils.hpp
@@ -51,12 +51,6 @@ std::vector<std::shared_ptr<Connection>> setupConnections(std::shared_ptr<Communicator> comm);
 /// @return Vector of connections (one per peer)
 std::vector<std::shared_ptr<Connection>> setupHybridConnections(std::shared_ptr<Communicator> comm, int localGpuIdx);
 
-/// Check if a connection is intra-node (CudaIpc transport).
-/// @param conn The connection to check
-/// @return true if the connection uses CudaIpc transport
-inline bool isIntraNodeConnection(const Connection& conn) {
-  return conn.transport() == Transport::CudaIpc;
-}
 
 /// Get the IB transport for a given local GPU index.
 /// @param localGpuIdx Local GPU index (0-7)
@@ -82,20 +76,6 @@ std::vector<PortChannel> setupPortChannels(
 /// This follows the proven pattern from allgather_test_cpp.cu:
 /// - CudaIpc connections: proxy does cudaMemcpyD2D
 /// - IB connections: proxy does RDMA write
-/// Creates one PortChannel per peer (dense indexing by peerIdx).
-/// @param proxyService The ProxyService managing transfers
-/// @param comm The communicator
-/// @param connections All connections (mixed CudaIpc + IB)
-/// @param remoteMemories Remote registered memories (one per peer)
-/// @param localMemory Local registered memory
-/// @return Vector of PortChannels (one per peer, in connection order)
-std::vector<PortChannel> setupAllPortChannels(
-    std::shared_ptr<ProxyService> proxyService,
-    Communicator& comm,
-    const std::vector<std::shared_ptr<Connection>>& connections,
-    const std::vector<RegisteredMemory>& remoteMemories,
-    RegisteredMemory localMemory);
-
 /// Setup PortChannel device handles (GPU-allocated array).
 std::shared_ptr<PortChannelDeviceHandle> setupPortChannelDeviceHandles(
     const std::vector<PortChannel>& portChannels);
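The surviving setupPortChannelDeviceHandles declaration implies the usual marshaling step: kernels cannot hold host-side PortChannel objects, so each channel's POD device handle is copied into a GPU-resident array. A sketch of what such a helper plausibly does; the PortChannelDeviceHandle alias and the deviceHandle() accessor are assumptions based on the mscclpp API, not shown in this diff:

```cu
#include <cuda_runtime.h>
#include <memory>
#include <vector>
#include <mscclpp/port_channel.hpp>

// Assumed alias, matching how the header above uses the name.
using PortChannelDeviceHandle = mscclpp::DeviceHandle<mscclpp::PortChannel>;

std::shared_ptr<PortChannelDeviceHandle> makePortChannelDeviceHandles(
    const std::vector<mscclpp::PortChannel>& channels) {
  // Gather each channel's trivially-copyable device handle on the host.
  std::vector<PortChannelDeviceHandle> host;
  host.reserve(channels.size());
  for (const auto& c : channels) host.push_back(c.deviceHandle());
  // Copy the handle array into device memory for kernels to index by peerIdx.
  PortChannelDeviceHandle* dev = nullptr;
  cudaMalloc(&dev, host.size() * sizeof(PortChannelDeviceHandle));
  cudaMemcpy(dev, host.data(), host.size() * sizeof(PortChannelDeviceHandle),
             cudaMemcpyHostToDevice);
  // The shared_ptr owns the device array and releases it with cudaFree.
  return std::shared_ptr<PortChannelDeviceHandle>(
      dev, [](PortChannelDeviceHandle* p) { cudaFree(p); });
}
```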
diff --git a/test/mscclpp-test/alltoallv_test.cu b/test/mscclpp-test/alltoallv_test.cu
index a813e703..bebfcf53 100644
--- a/test/mscclpp-test/alltoallv_test.cu
+++ b/test/mscclpp-test/alltoallv_test.cu
@@ -65,44 +65,6 @@ void AllToAllVTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
   const int nThreads = 1024;
 
   if (kernelNum == 0) {
-    // Use high-throughput kernel with all threads participating in each transfer
-    mscclpp::collective::alltoallvKernel<<<1, nThreads, 0, stream>>>(
-        d_memoryChannels,
-        rank, worldSize,
-        localSendBuffV, localRecvBuffV,
-        d_sendCounts, d_sendDispls,
-        d_recvCounts, d_recvDispls,
-        d_remoteRecvDispls);
-  } else if (kernelNum == 1) {
-    // Use ring-based kernel for larger world sizes
-    mscclpp::collective::alltoallvRingKernel<<<1, nThreads, 0, stream>>>(
-        d_memoryChannels,
-        rank, worldSize,
-        localSendBuffV, localRecvBuffV,
-        d_sendCounts, d_sendDispls,
-        d_recvCounts, d_recvDispls,
-        d_remoteRecvDispls);
-  } else if (kernelNum == 2) {
-    // Use pipelined kernel for imbalanced workloads (MoE)
-    mscclpp::collective::alltoallvPipelinedKernel<<<1, nThreads, 0, stream>>>(
-        d_memoryChannels,
-        rank, worldSize,
-        localSendBuffV, localRecvBuffV,
-        d_sendCounts, d_sendDispls,
-        d_recvCounts, d_recvDispls,
-        d_remoteRecvDispls);
-  } else if (kernelNum == 3) {
-    // Use legacy multi-block kernel (sequential peers)
-    const int nBlocks = mscclpp::collective::ALLTOALLV_DEFAULT_NBLOCKS;
-    mscclpp::collective::alltoallvMultiBlockKernel<<<nBlocks, nThreads, 0, stream>>>(
-        d_memoryChannels,
-        d_deviceSyncer,
-        rank, worldSize,
-        localSendBuffV, localRecvBuffV,
-        d_sendCounts, d_sendDispls,
-        d_recvCounts, d_recvDispls,
-        d_remoteRecvDispls);
-  } else if (kernelNum == 4) {
     // Peer-parallel kernel: small messages (1 block/peer, no barrier)
     const int nPeers = worldSize - 1;
    const int nBlocks = (nPeers > 0) ? nPeers : 1;
@@ -114,7 +76,7 @@ void AllToAllVTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
         d_sendCounts, d_sendDispls,
         d_recvCounts, d_recvDispls,
         d_remoteRecvDispls);
-  } else if (kernelNum == 5) {
+  } else if (kernelNum == 1) {
     // Peer-parallel kernel: large messages (multiple blocks/peer, barrier)
     const int nPeers = worldSize - 1;
     const int blocksPerPeer = mscclpp::collective::ALLTOALLV_DEFAULT_BLOCKS_PER_PEER;
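Grid sizing for the two remaining dispatch arms works out as below. The small-message arm launches one block per peer; the large-message arm's launch line falls outside the hunk, so the nPeers * blocksPerPeer expression is an assumption based on the surrounding code:

```cu
#include <cstdio>

int main() {
  const int worldSize = 8, nThreads = 1024;  // worked example
  const int blocksPerPeer = 16;              // ALLTOALLV_DEFAULT_BLOCKS_PER_PEER
  const int nPeers = worldSize - 1;
  const int smallBlocks = (nPeers > 0) ? nPeers : 1;                  // kernel 0: 7 blocks
  const int largeBlocks = (nPeers > 0) ? nPeers * blocksPerPeer : 1;  // kernel 1 (assumed): 112 blocks
  // The large variant needs a grid-wide barrier because blocksPerPeer blocks
  // cooperate on each peer's transfer; the small variant (one block per peer)
  // can rely on __syncthreads() alone.
  std::printf("small: <<<%d, %d>>>, large: <<<%d, %d>>>\n",
              smallBlocks, nThreads, largeBlocks, nThreads);
}
```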
@@ -220,12 +182,8 @@ void AllToAllVTestColl::setupCollTest(size_t size) {
 }
 
 std::vector<KernelRestriction> AllToAllVTestColl::getKernelRestrictions() {
   return {
-      {0, "alltoallvKernel", true, 1, 4 * worldSize_},
-      {1, "alltoallvRingKernel", true, 1, 4 * worldSize_},
-      {2, "alltoallvPipelinedKernel", true, 1, 4 * worldSize_},
-      {3, "alltoallvMultiBlockKernel", true, 1, 4 * worldSize_},
-      {4, "alltoallvPeerParallel(small)", true, 1, 4 * worldSize_},
-      {5, "alltoallvPeerParallel(large)", true, 1, 4 * worldSize_}
+      {0, "alltoallvPeerParallel(small)", true, 1, 4 * worldSize_},
+      {1, "alltoallvPeerParallel(large)", true, 1, 4 * worldSize_}
   };
 }
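For readers outside the mscclpp-test harness, the initializer fields above map onto the harness's KernelRestriction record roughly as sketched below; the field names are assumptions based on that framework rather than anything shown in this diff:

```cu
#include <cstddef>
#include <string>

// Sketch of the harness struct the initializers above fill in (assumed layout).
struct KernelRestriction {
  int kernelNum;                  // dispatch index consumed by runColl()
  std::string kernelName;         // label printed in benchmark output
  bool compatibleWithMultiNodes;  // both remaining kernels: true
  int countDivisorForMultiNodes;  // 1 = no extra divisibility requirement
  size_t alignedBytes;            // buffer sizes must align to 4 * worldSize_
};
```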