mirror of https://github.com/microsoft/mscclpp.git

Remove legacy alltoallv kernels and dead code from collective utils.
@@ -13,18 +13,11 @@
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/utils.hpp>

#include <algorithm>
#include "debug.h"

namespace mscclpp {
namespace collective {

#if defined(__HIP_PLATFORM_AMD__)
#define ALLTOALLV_WARP_SIZE 64
#else
#define ALLTOALLV_WARP_SIZE 32
#endif

using MultiNodeMode = AlltoallvFullmesh::MultiNodeMode;

// Context to hold all necessary state for alltoallv execution
@@ -397,7 +390,5 @@ AlgorithmCtxKey AlltoallvFullmesh::generateAlltoallvContextKey(
  return {(void*)input, output, inputSize, outputSize, 0};
}

#undef ALLTOALLV_WARP_SIZE

}  // namespace collective
}  // namespace mscclpp

@@ -4,7 +4,6 @@
#include "collective_utils.hpp"

#include <algorithm>
#include <mscclpp/algorithm.hpp>
#include <mscclpp/core.hpp>
#include <mscclpp/memory_channel.hpp>
#include <mscclpp/port_channel.hpp>
@@ -124,26 +123,6 @@ std::vector<mscclpp::PortChannel> setupPortChannels(
  return channels;
}

std::vector<mscclpp::PortChannel> setupAllPortChannels(
    std::shared_ptr<mscclpp::ProxyService> proxyService,
    mscclpp::Communicator& comm,
    const std::vector<mscclpp::Connection>& connections,
    const std::vector<mscclpp::RegisteredMemory>& remoteMemories,
    mscclpp::RegisteredMemory localMemory) {
  std::vector<mscclpp::PortChannel> channels;
  mscclpp::MemoryId srcMemId = proxyService->addMemory(localMemory);
  for (size_t cid = 0; cid < connections.size(); ++cid) {
    // Create PortChannel for EVERY connection (CudaIpc and IB alike).
    // The ProxyService proxy thread handles both connection types:
    // - CudaIpc: cudaMemcpyD2D via IPC-mapped pointer
    // - IB: RDMA write via ibv_post_send
    mscclpp::SemaphoreId semId = proxyService->buildAndAddSemaphore(comm, connections[cid]);
    mscclpp::MemoryId dstMemId = proxyService->addMemory(remoteMemories[cid]);
    channels.emplace_back(proxyService->portChannel(semId, dstMemId, srcMemId));
  }
  return channels;
}

std::shared_ptr<mscclpp::PortChannelDeviceHandle> setupPortChannelDeviceHandles(
    const std::vector<mscclpp::PortChannel>& portChannels) {
  if (portChannels.empty()) return nullptr;
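A minimal host-side sketch of how the two helpers above could be wired together. Everything here is illustrative: `comm`, `connections`, `remoteMemories`, and `localMemory` are assumed to come from the earlier setup helpers, and ProxyService construction details may differ across mscclpp versions.

// Sketch: proxy-backed channels for every peer, then device handles for a kernel.
auto proxyService = std::make_shared<mscclpp::ProxyService>();
std::vector<mscclpp::PortChannel> portChannels =
    setupAllPortChannels(proxyService, comm, connections, remoteMemories, localMemory);
std::shared_ptr<mscclpp::PortChannelDeviceHandle> d_portChannels =
    setupPortChannelDeviceHandles(portChannels);
proxyService->startProxy();  // the proxy thread now services put/signal requests
// ... launch kernels that consume d_portChannels.get() ...
proxyService->stopProxy();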
@@ -11,21 +11,6 @@
namespace mscclpp {
namespace collective {

#if defined(__HIP_PLATFORM_AMD__)
#define ALLTOALLV_WARP_SIZE 64
#else
#define ALLTOALLV_WARP_SIZE 32
#endif

// Chunk size for pipelined transfers (1MB).
// Large enough to amortize overhead, small enough for good memory patterns.
constexpr size_t ALLTOALLV_CHUNK_SIZE = 1 << 20;

// Default number of blocks for multi-block kernels.
// Tuned for H100 (132 SMs). Enough to saturate NVLink bandwidth without
// excessive DeviceSyncer overhead.
constexpr int ALLTOALLV_DEFAULT_NBLOCKS = 24;

// Default blocks per peer for the peer-parallel kernel.
// Controls how many thread blocks cooperate on each peer's data transfer.
constexpr int ALLTOALLV_DEFAULT_BLOCKS_PER_PEER = 16;
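To make the chunking concrete: a 5.5 MB message becomes five full 1 MB chunks plus a 0.5 MB tail, i.e. six put() calls but still only one signal per peer. A small sketch of the ceil-division the pipelined kernel below relies on (the helper name is illustrative, not part of this file):

inline size_t numChunks(size_t bytes) {
  // e.g. bytes = 5.5 MB -> 6 chunks with ALLTOALLV_CHUNK_SIZE = 1 MB
  return (bytes + ALLTOALLV_CHUNK_SIZE - 1) / ALLTOALLV_CHUNK_SIZE;
}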
@@ -239,352 +224,7 @@ __global__ void __launch_bounds__(1024)
  }
}

/**
 * Legacy multi-block AllToAllV kernel (sequential peers).
 *
 * All thread blocks cooperate on each peer's data transfer using global thread IDs.
 * Peers are processed sequentially. Kept for comparison; prefer alltoallvPeerParallelKernel.
 *
 * Launch config: <<<nBlocks, 1024>>>
 */
__global__ void __launch_bounds__(1024)
    alltoallvMultiBlockKernel(DeviceHandle<MemoryChannel>* memoryChannels,
                              DeviceSyncer* syncer,
                              int rank,
                              int worldSize,
                              const void* sendBuff,
                              void* recvBuff,
                              const size_t* sendCounts,
                              const size_t* sendDispls,
                              const size_t* recvCounts,
                              const size_t* recvDispls,
                              const size_t* remoteRecvDispls) {
  const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
  const int nThreads = blockDim.x * gridDim.x;
  const int nPeers = worldSize - 1;

  // Phase 1: Local copy — all threads across all blocks cooperate
  if (sendCounts[rank] > 0) {
    mscclpp::copy((char*)recvBuff + recvDispls[rank],
                  (void*)((const char*)sendBuff + sendDispls[rank]),
                  sendCounts[rank], gtid, nThreads);
  }

  // Phase 2: Remote puts — all blocks cooperate on each peer's transfer
  for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
    int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
    int chanIdx = peerIdx;

    if (sendCounts[peer] > 0) {
      memoryChannels[chanIdx].put(
          remoteRecvDispls[peer],
          sendDispls[peer],
          sendCounts[peer],
          gtid,
          nThreads
      );
    }
  }

  // Phase 3: Grid-wide barrier
  syncer->sync(gridDim.x);

  // Phase 4: Signal all peers, then wait (single thread)
  if (gtid == 0) {
    for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
      memoryChannels[peerIdx].signal();
    }
    for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
      int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
      if (recvCounts[peer] > 0) {
        memoryChannels[peerIdx].wait();
      }
    }
  }
}
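For context, launching this legacy kernel needs a zero-initialized DeviceSyncer visible to all blocks. A hedged host-side sketch; the buffer and channel names are assumptions, not the test harness code:

// Allocate and zero the grid-wide syncer, then launch with the default grid.
mscclpp::DeviceSyncer* d_syncer = nullptr;
cudaMalloc(&d_syncer, sizeof(mscclpp::DeviceSyncer));
cudaMemset(d_syncer, 0, sizeof(mscclpp::DeviceSyncer));
alltoallvMultiBlockKernel<<<ALLTOALLV_DEFAULT_NBLOCKS, 1024, 0, stream>>>(
    d_memoryChannels, d_syncer, rank, worldSize, sendBuff, recvBuff,
    d_sendCounts, d_sendDispls, d_recvCounts, d_recvDispls, d_remoteRecvDispls);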
/**
 * High-performance AllToAllV kernel using maximum thread parallelism.
 *
 * Processes each peer sequentially but uses ALL block threads (1024) for each
 * data transfer to maximize copy bandwidth. This provides much better performance
 * than the warp-per-peer approach for large message sizes.
 *
 * Launch config: <<<1, 1024>>> for maximum bandwidth within a single block.
 *
 * @param memoryChannels Array of MemoryChannel handles for each peer (worldSize-1 channels)
 * @param rank Current rank
 * @param worldSize Total number of ranks
 * @param sendBuff Source buffer containing data to send
 * @param recvBuff Destination buffer for received data
 * @param sendCounts Array of send counts for each rank (in bytes)
 * @param sendDispls Array of send displacements for each rank (in bytes)
 * @param recvCounts Array of receive counts for each rank (in bytes)
 * @param recvDispls Array of receive displacements for each rank (in bytes)
 * @param remoteRecvDispls Peer-side receive displacements: remoteRecvDispls[p] is p's recvDispls[rank] (in bytes)
 */
__global__ void __launch_bounds__(1024)
    alltoallvKernel(DeviceHandle<MemoryChannel>* memoryChannels,
                    int rank,
                    int worldSize,
                    const void* sendBuff,
                    void* recvBuff,
                    const size_t* sendCounts,
                    const size_t* sendDispls,
                    const size_t* recvCounts,
                    const size_t* recvDispls,
                    const size_t* remoteRecvDispls) {
  int tid = threadIdx.x;
  int nThreads = blockDim.x;
  int nPeers = worldSize - 1;

  // Step 1: Copy local data using ALL threads for maximum bandwidth
  if (sendCounts[rank] > 0) {
    mscclpp::copy((char*)recvBuff + recvDispls[rank],
                  (void*)((const char*)sendBuff + sendDispls[rank]),
                  sendCounts[rank], tid, nThreads);
  }
  __syncthreads();

  // Step 2: Process each peer sequentially, but use ALL threads for each transfer.
  // This maximizes bandwidth per transfer compared to the warp-per-peer approach.
  for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
    int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
    int chanIdx = peerIdx;

    if (sendCounts[peer] > 0) {
      // Use all threads for maximum copy throughput
      memoryChannels[chanIdx].put(
          remoteRecvDispls[peer],  // dst offset in peer's buffer (peer's recvDispls[rank])
          sendDispls[peer],        // src offset in our buffer
          sendCounts[peer],        // size
          tid,                     // thread id
          nThreads                 // total threads
      );
    }
    __syncthreads();

    // Only one thread signals per peer
    if (tid == 0) {
      memoryChannels[chanIdx].signal();
    }
    __syncthreads();

    // Wait for incoming data from this peer
    if (tid == 0 && recvCounts[peer] > 0) {
      memoryChannels[chanIdx].wait();
    }
    __syncthreads();
  }
}
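The `peerIdx < rank ? peerIdx : peerIdx + 1` mapping keeps the channel array densely indexed by skipping the local rank; the ring kernel further down uses the inverse mapping. A small illustrative sketch (the helper names are not part of this file):

// Rank 2 in a worldSize-4 job owns 3 channels:
//   chanIdx 0 -> peer 0, chanIdx 1 -> peer 1, chanIdx 2 -> peer 3.
inline int peerForChannel(int chanIdx, int rank) { return chanIdx < rank ? chanIdx : chanIdx + 1; }
inline int channelForPeer(int peer, int rank) { return peer < rank ? peer : peer - 1; }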
/**
 * Pipelined AllToAllV kernel for imbalanced workloads.
 *
 * For large messages, breaks transfers into chunks to improve memory access
 * patterns, but avoids excessive signaling overhead by signaling only once
 * per peer after all chunks are sent.
 *
 * Optimized for MoE workloads where message sizes can vary by 100x+ between ranks.
 *
 * Launch config: <<<1, 1024>>>
 */
__global__ void __launch_bounds__(1024)
    alltoallvPipelinedKernel(DeviceHandle<MemoryChannel>* memoryChannels,
                             int rank,
                             int worldSize,
                             const void* sendBuff,
                             void* recvBuff,
                             const size_t* sendCounts,
                             const size_t* sendDispls,
                             const size_t* recvCounts,
                             const size_t* recvDispls,
                             const size_t* remoteRecvDispls) {
  int tid = threadIdx.x;
  int nThreads = blockDim.x;
  int nPeers = worldSize - 1;

  // Step 1: Copy local data
  if (sendCounts[rank] > 0) {
    mscclpp::copy((char*)recvBuff + recvDispls[rank],
                  (void*)((const char*)sendBuff + sendDispls[rank]),
                  sendCounts[rank], tid, nThreads);
  }
  __syncthreads();

  // Step 2: Process each peer - send all data in chunks, then signal once
  for (int peerIdx = 0; peerIdx < nPeers; peerIdx++) {
    int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
    int chanIdx = peerIdx;

    size_t sendSize = sendCounts[peer];
    size_t recvSize = recvCounts[peer];
    size_t dstOffset = remoteRecvDispls[peer];  // peer's recvDispls[rank]
    size_t srcOffset = sendDispls[peer];

    // Send data in chunks for better memory access patterns,
    // but only signal ONCE after all chunks are sent (avoids signaling overhead).
    if (sendSize > 0) {
      for (size_t offset = 0; offset < sendSize; offset += ALLTOALLV_CHUNK_SIZE) {
        size_t chunkSize = (sendSize - offset < ALLTOALLV_CHUNK_SIZE)
                               ? (sendSize - offset) : ALLTOALLV_CHUNK_SIZE;
        memoryChannels[chanIdx].put(
            dstOffset + offset,
            srcOffset + offset,
            chunkSize,
            tid,
            nThreads
        );
        __syncthreads();
      }
    }

    // Signal ONCE after all data is sent
    if (tid == 0 && sendSize > 0) {
      memoryChannels[chanIdx].signal();
    }
    __syncthreads();

    // Wait ONCE for all of this peer's data
    if (tid == 0 && recvSize > 0) {
      memoryChannels[chanIdx].wait();
    }
    __syncthreads();
  }
}
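All of these kernels assume `remoteRecvDispls[p]` already holds peer p's `recvDispls[rank]`, so the displacements must be exchanged on the host before launch. A hedged sketch using MPI purely for illustration (the tests may exchange these values by other means):

// Each rank contributes recvDispls[p] to rank p; afterwards
// remoteRecvDispls[p] equals p's recvDispls[rank]. Assumes 64-bit size_t.
std::vector<size_t> remoteRecvDispls(worldSize);
MPI_Alltoall(recvDispls.data(), 1, MPI_UINT64_T,
             remoteRecvDispls.data(), 1, MPI_UINT64_T, MPI_COMM_WORLD);
// ...then copy to device memory for the kernel's const size_t* parameter.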
/**
 * Ring-based AllToAllV kernel with maximum thread parallelism.
 *
 * Uses step-by-step ring pattern with ALL threads for maximum bandwidth.
 * Each step processes one peer pair, with correct semaphore handling.
 */
__global__ void __launch_bounds__(1024)
    alltoallvRingKernel(DeviceHandle<MemoryChannel>* memoryChannels,
                        int rank,
                        int worldSize,
                        const void* sendBuff,
                        void* recvBuff,
                        const size_t* sendCounts,
                        const size_t* sendDispls,
                        const size_t* recvCounts,
                        const size_t* recvDispls,
                        const size_t* remoteRecvDispls) {
  int tid = threadIdx.x;
  int nThreads = blockDim.x;

  // Copy local data first using ALL threads
  if (sendCounts[rank] > 0) {
    mscclpp::copy((char*)recvBuff + recvDispls[rank],
                  (void*)((const char*)sendBuff + sendDispls[rank]),
                  sendCounts[rank], tid, nThreads);
  }
  __syncthreads();

  // Ring-based exchange - process each peer sequentially.
  // Key fix: use the SAME channel for both signal and wait (peer-pair symmetry).
  for (int step = 1; step < worldSize; step++) {
    int sendPeer = (rank + step) % worldSize;
    int chanIdx = sendPeer < rank ? sendPeer : sendPeer - 1;

    // Send data to sendPeer using ALL threads
    if (sendCounts[sendPeer] > 0) {
      memoryChannels[chanIdx].put(
          remoteRecvDispls[sendPeer],  // dst offset in peer's buffer (peer's recvDispls[rank])
          sendDispls[sendPeer],
          sendCounts[sendPeer],
          tid,
          nThreads
      );
    }
    __syncthreads();

    // Signal completion on the SAME channel we'll wait on
    if (tid == 0) {
      memoryChannels[chanIdx].signal();
    }
    __syncthreads();

    // Wait for the peer's data on the SAME channel (correct semaphore pairing)
    if (tid == 0 && recvCounts[sendPeer] > 0) {
      memoryChannels[chanIdx].wait();
    }
    __syncthreads();
  }
}
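For reference, the peer order the step loop produces; at every step the puts form one rotated permutation of the ranks (sketch, not part of this file):

inline int ringSendPeer(int rank, int step, int worldSize) {
  // e.g. worldSize = 4, rank 1: steps 1, 2, 3 -> peers 2, 3, 0
  return (rank + step) % worldSize;
}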
/**
 * PortChannel-only AllToAllV kernel for multi-node.
 *
 * Uses PortChannel (proxy-based) for ALL peers — both intra-node and inter-node.
 * This follows the proven pattern from allgather_test_cpp.cu, which works reliably
 * on GB200 multi-node NVSwitch systems.
 *
 * For intra-node CudaIpc connections, the proxy performs cudaMemcpyD2D.
 * For inter-node IB connections, the proxy performs RDMA writes.
 *
 * Each block handles one peer. Thread 0 pushes a put descriptor to the FIFO
 * (single-threaded), which triggers the proxy to perform the data transfer.
 *
 * Launch config: <<<nPeers, 1024>>>
 */
__global__ void __launch_bounds__(1024)
    alltoallvPortChannelKernel(PortChannelDeviceHandle* portChannels,
                               int rank,
                               int worldSize,
                               const void* sendBuff,
                               void* recvBuff,
                               const size_t* sendCounts,
                               const size_t* sendDispls,
                               const size_t* recvCounts,
                               const size_t* recvDispls,
                               const size_t* remoteRecvDispls) {
  const int nPeers = worldSize - 1;

  // Handle trivial case (single rank)
  if (nPeers == 0) {
    const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
    const int nThreads = blockDim.x * gridDim.x;
    if (sendCounts[rank] > 0) {
      mscclpp::copy((char*)recvBuff + recvDispls[rank],
                    (void*)((const char*)sendBuff + sendDispls[rank]),
                    sendCounts[rank], gtid, nThreads);
    }
    return;
  }

  // Phase 1: Local copy — all blocks cooperate using global thread IDs
  const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
  const int nThreads = blockDim.x * gridDim.x;
  if (sendCounts[rank] > 0) {
    mscclpp::copy((char*)recvBuff + recvDispls[rank],
                  (void*)((const char*)sendBuff + sendDispls[rank]),
                  sendCounts[rank], gtid, nThreads);
  }

  // Phase 2: Per-peer data transfer via PortChannel (proxy-based).
  // Each block handles one peer: blockIdx.x == peerIdx.
  const int peerIdx = blockIdx.x;
  if (peerIdx >= nPeers) return;

  const int peer = peerIdx < rank ? peerIdx : peerIdx + 1;

  // Thread 0 pushes a put+signal+flush descriptor to the proxy FIFO.
  // The proxy thread performs the actual data transfer (cudaMemcpy or RDMA).
  if (threadIdx.x == 0 && sendCounts[peer] > 0) {
    portChannels[peerIdx].putWithSignalAndFlush(
        remoteRecvDispls[peer],  // dst offset in peer's output buffer
        sendDispls[peer],        // src offset in our input buffer
        sendCounts[peer]         // bytes to transfer
    );
  }
  __syncthreads();

  // Wait for incoming data from this peer
  if (threadIdx.x == 0 && recvCounts[peer] > 0) {
    portChannels[peerIdx].wait();
  }
}
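A hedged launch sketch matching the one-block-per-peer contract above; `d_portChannels` is as returned by setupPortChannelDeviceHandles, the other names are assumptions:

// One block per peer so that blockIdx.x == peerIdx inside the kernel.
const int nPeers = worldSize - 1;
alltoallvPortChannelKernel<<<nPeers > 0 ? nPeers : 1, 1024, 0, stream>>>(
    d_portChannels.get(), rank, worldSize, sendBuff, recvBuff,
    d_sendCounts, d_sendDispls, d_recvCounts, d_recvDispls, d_remoteRecvDispls);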
#undef ALLTOALLV_WARP_SIZE
}  // namespace collective
}  // namespace mscclpp
@@ -51,12 +51,6 @@ std::vector<Connection> setupConnections(std::shared_ptr<Communicator> comm);
/// @return Vector of connections (one per peer)
std::vector<Connection> setupHybridConnections(std::shared_ptr<Communicator> comm, int localGpuIdx);

/// Check if a connection is intra-node (CudaIpc transport).
/// @param conn The connection to check
/// @return true if the connection uses CudaIpc transport
inline bool isIntraNodeConnection(const Connection& conn) {
  return conn.transport() == Transport::CudaIpc;
}

/// Get the IB transport for a given local GPU index.
/// @param localGpuIdx Local GPU index (0-7)
@@ -82,20 +76,6 @@ std::vector<PortChannel> setupPortChannels(
/// This follows the proven pattern from allgather_test_cpp.cu:
/// - CudaIpc connections: proxy does cudaMemcpyD2D
/// - IB connections: proxy does RDMA write
/// Creates one PortChannel per peer (dense indexing by peerIdx).
/// @param proxyService The ProxyService managing transfers
/// @param comm The communicator
/// @param connections All connections (mixed CudaIpc + IB)
/// @param remoteMemories Remote registered memories (one per peer)
/// @param localMemory Local registered memory
/// @return Vector of PortChannels (one per peer, in connection order)
std::vector<PortChannel> setupAllPortChannels(
    std::shared_ptr<ProxyService> proxyService,
    Communicator& comm,
    const std::vector<Connection>& connections,
    const std::vector<RegisteredMemory>& remoteMemories,
    RegisteredMemory localMemory);

/// Setup PortChannel device handles (GPU-allocated array).
std::shared_ptr<PortChannelDeviceHandle> setupPortChannelDeviceHandles(
    const std::vector<PortChannel>& portChannels);
@@ -65,44 +65,6 @@ void AllToAllVTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
  const int nThreads = 1024;

  if (kernelNum == 0) {
    // Use high-throughput kernel with all threads participating in each transfer
    mscclpp::collective::alltoallvKernel<<<1, nThreads, 0, stream>>>(
        d_memoryChannels,
        rank, worldSize,
        localSendBuffV, localRecvBuffV,
        d_sendCounts, d_sendDispls,
        d_recvCounts, d_recvDispls,
        d_remoteRecvDispls);
  } else if (kernelNum == 1) {
    // Use ring-based kernel for larger world sizes
    mscclpp::collective::alltoallvRingKernel<<<1, nThreads, 0, stream>>>(
        d_memoryChannels,
        rank, worldSize,
        localSendBuffV, localRecvBuffV,
        d_sendCounts, d_sendDispls,
        d_recvCounts, d_recvDispls,
        d_remoteRecvDispls);
  } else if (kernelNum == 2) {
    // Use pipelined kernel for imbalanced workloads (MoE)
    mscclpp::collective::alltoallvPipelinedKernel<<<1, nThreads, 0, stream>>>(
        d_memoryChannels,
        rank, worldSize,
        localSendBuffV, localRecvBuffV,
        d_sendCounts, d_sendDispls,
        d_recvCounts, d_recvDispls,
        d_remoteRecvDispls);
  } else if (kernelNum == 3) {
    // Use legacy multi-block kernel (sequential peers)
    const int nBlocks = mscclpp::collective::ALLTOALLV_DEFAULT_NBLOCKS;
    mscclpp::collective::alltoallvMultiBlockKernel<<<nBlocks, nThreads, 0, stream>>>(
        d_memoryChannels,
        d_deviceSyncer,
        rank, worldSize,
        localSendBuffV, localRecvBuffV,
        d_sendCounts, d_sendDispls,
        d_recvCounts, d_recvDispls,
        d_remoteRecvDispls);
  } else if (kernelNum == 4) {
    // Peer-parallel kernel: small messages (1 block/peer, no barrier)
    const int nPeers = worldSize - 1;
    const int nBlocks = (nPeers > 0) ? nPeers : 1;
@@ -114,7 +76,7 @@ void AllToAllVTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
        d_sendCounts, d_sendDispls,
        d_recvCounts, d_recvDispls,
        d_remoteRecvDispls);
-  } else if (kernelNum == 5) {
+  } else if (kernelNum == 1) {
    // Peer-parallel kernel: large messages (multiple blocks/peer, barrier)
    const int nPeers = worldSize - 1;
    const int blocksPerPeer = mscclpp::collective::ALLTOALLV_DEFAULT_BLOCKS_PER_PEER;
@@ -220,12 +182,8 @@ void AllToAllVTestColl::setupCollTest(size_t size) {

std::vector<KernelRestriction> AllToAllVTestColl::getKernelRestrictions() {
  return {
-      {0, "alltoallvKernel", true, 1, 4 * worldSize_},
-      {1, "alltoallvRingKernel", true, 1, 4 * worldSize_},
-      {2, "alltoallvPipelinedKernel", true, 1, 4 * worldSize_},
-      {3, "alltoallvMultiBlockKernel", true, 1, 4 * worldSize_},
-      {4, "alltoallvPeerParallel(small)", true, 1, 4 * worldSize_},
-      {5, "alltoallvPeerParallel(large)", true, 1, 4 * worldSize_}
+      {0, "alltoallvPeerParallel(small)", true, 1, 4 * worldSize_},
+      {1, "alltoallvPeerParallel(large)", true, 1, 4 * worldSize_}
  };
}