mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-12 01:10:22 +00:00
Add tri-modal multi-node support for alltoallv: SingleNode, NVSwitch (GpuBuffer staging), and IB (PortChannel)
This commit is contained in:
@@ -24,32 +24,38 @@ namespace collective {
|
||||
#define ALLTOALLV_WARP_SIZE 32
|
||||
#endif
|
||||
|
||||
using MultiNodeMode = AlltoallvFullmesh::MultiNodeMode;
|
||||
|
||||
// Context to hold all necessary state for alltoallv execution
|
||||
struct AllToAllVContext {
|
||||
int rank;
|
||||
int worldSize;
|
||||
int nRanksPerNode;
|
||||
|
||||
// Intra-node (CudaIpc) channels — MemoryChannel for direct NVLink copy
|
||||
// MemoryChannel (CudaIpc) — used for intra-node (always) and cross-node (NVSwitch mode)
|
||||
std::vector<RegisteredMemory> registeredMemories;
|
||||
std::vector<MemoryChannel> memoryChannels;
|
||||
std::vector<std::shared_ptr<MemoryDevice2DeviceSemaphore>> memorySemaphores;
|
||||
std::shared_ptr<DeviceHandle<MemoryChannel>> memoryChannelDeviceHandles;
|
||||
|
||||
// Inter-node (IB) channels — PortChannel via ProxyService
|
||||
// PortChannel (IB) — used for cross-node peers in IB mode only
|
||||
std::shared_ptr<ProxyService> proxyService;
|
||||
std::vector<PortChannel> portChannels;
|
||||
std::shared_ptr<PortChannelDeviceHandle> portChannelDeviceHandles;
|
||||
|
||||
// Peer locality map: peerIsLocal[peerIdx] = 1 if intra-node, 0 if inter-node
|
||||
// peerIdx is the index into the channel arrays (0..nPeers-1), NOT the rank
|
||||
// Peer locality map (IB mode only)
|
||||
std::shared_ptr<int> d_peerIsLocal; // GPU array [nPeers]
|
||||
// For inter-node peers, maps peerIdx → portChannel index (dense indexing)
|
||||
std::shared_ptr<int> d_peerToPortChannelIdx; // GPU array [nPeers]
|
||||
|
||||
bool hasRemotePeers; // true if any inter-node connections exist
|
||||
// Staging buffers (NVSwitch mode only): allocated via GpuBuffer (cuMemCreate → Fabric handles)
|
||||
bool useStaging;
|
||||
std::shared_ptr<GpuBuffer<char>> inputStaging;
|
||||
std::shared_ptr<GpuBuffer<char>> outputStaging;
|
||||
|
||||
std::shared_ptr<DeviceSyncer> deviceSyncer; // GPU-allocated, for multi-block grid sync
|
||||
// Which kernel dispatch path to use
|
||||
AlltoallvFullmesh::MultiNodeMode mode;
|
||||
|
||||
std::shared_ptr<DeviceSyncer> deviceSyncer;
|
||||
};
|
||||
|
||||
AlltoallvFullmesh::~AlltoallvFullmesh() = default;
|
||||
@@ -89,28 +95,27 @@ void AlltoallvFullmesh::initialize(std::shared_ptr<Communicator> comm) {
|
||||
int rank = comm->bootstrap()->getRank();
|
||||
int nRanksPerNode = comm->bootstrap()->getNranksPerNode();
|
||||
int localGpuIdx = rank % nRanksPerNode;
|
||||
|
||||
// Use hybrid connections: CudaIpc for intra-node, IB for inter-node.
|
||||
// On systems where CudaIpc works across nodes (e.g. GB200 NVSwitch),
|
||||
// set MSCCLPP_FORCE_CUDAIPC=1 to skip IB and use CudaIpc for all peers.
|
||||
const char* forceCudaIpc = std::getenv("MSCCLPP_FORCE_CUDAIPC");
|
||||
bool useIB = (getIBDeviceCount() > 0) && !(forceCudaIpc && std::string(forceCudaIpc) == "1");
|
||||
bool isMultiNode = (worldSize_ > nRanksPerNode);
|
||||
|
||||
if (useIB && isMultiNode) {
|
||||
this->conns_ = setupHybridConnections(comm, localGpuIdx);
|
||||
// Check if any connections are actually inter-node
|
||||
hasRemotePeers_ = false;
|
||||
for (const auto& conn : this->conns_) {
|
||||
if (!isIntraNodeConnection(conn)) {
|
||||
hasRemotePeers_ = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Single-node or no IB: use CudaIpc for all
|
||||
if (!isMultiNode) {
|
||||
// ── Single-node: CudaIpc for all peers ─────────────────────────────
|
||||
multiNodeMode_ = MultiNodeMode::SingleNode;
|
||||
this->conns_ = setupConnections(comm);
|
||||
hasRemotePeers_ = false;
|
||||
} else if (isNvlsSupported()) {
|
||||
// ── GB200 NVSwitch: CudaIpc for ALL peers + staging GpuBuffers ─────
|
||||
// GpuBuffer uses cuMemCreate → Fabric handles → cross-node CudaIpc works.
|
||||
multiNodeMode_ = MultiNodeMode::NVSwitch;
|
||||
this->conns_ = setupConnections(comm);
|
||||
} else {
|
||||
// ── IB: CudaIpc intra-node + IB inter-node ────────────────────────
|
||||
// For non-NVSwitch systems (H100 etc.) where CudaIpc doesn't work cross-node.
|
||||
if (getIBDeviceCount() <= 0) {
|
||||
throw Error("Multi-node alltoallv requires IB transport but no IB devices found. "
|
||||
"Ensure IB drivers are loaded and devices are available.",
|
||||
ErrorCode::InvalidUsage);
|
||||
}
|
||||
multiNodeMode_ = MultiNodeMode::IB;
|
||||
this->conns_ = setupHybridConnections(comm, localGpuIdx);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,41 +154,47 @@ CommResult AlltoallvFullmesh::alltoallvKernelFunc(
|
||||
int nPeers = worldSize - 1;
|
||||
if (nPeers < 1) nPeers = 1;
|
||||
|
||||
if (algoCtx->hasRemotePeers) {
|
||||
// Multi-node: use hybrid kernel with MemoryChannel (intra) + PortChannel (inter)
|
||||
// PortChannel put() is single-threaded (FIFO push), so we use 1 block per peer.
|
||||
// For large intra-node messages, multiple blocks per local peer would help,
|
||||
// but keeping it simple for now: 1 block per peer for both local and remote.
|
||||
// Determine send/recv buffer pointers.
|
||||
// NVSwitch mode: copy PyTorch data to/from GpuBuffer staging buffers.
|
||||
const void* sendBuff = input;
|
||||
void* recvBuff = output;
|
||||
|
||||
if (algoCtx->useStaging) {
|
||||
sendBuff = algoCtx->inputStaging->data();
|
||||
recvBuff = algoCtx->outputStaging->data();
|
||||
MSCCLPP_CUDATHROW(cudaMemcpyAsync(
|
||||
const_cast<void*>(sendBuff), input,
|
||||
inputSize, cudaMemcpyDeviceToDevice, stream));
|
||||
}
|
||||
|
||||
if (algoCtx->mode == MultiNodeMode::IB) {
|
||||
// ── IB mode: PortChannel kernel for ALL peers ──────────────────────
|
||||
// PortChannel handles both CudaIpc (intra) and IB (inter) connections
|
||||
// via the ProxyService proxy thread.
|
||||
int numBlocks = nPeers;
|
||||
alltoallvHybridKernel<<<numBlocks, threadsPerBlock, 0, stream>>>(
|
||||
algoCtx->memoryChannelDeviceHandles.get(),
|
||||
alltoallvPortChannelKernel<<<numBlocks, threadsPerBlock, 0, stream>>>(
|
||||
algoCtx->portChannelDeviceHandles.get(),
|
||||
algoCtx->d_peerIsLocal.get(),
|
||||
algoCtx->d_peerToPortChannelIdx.get(),
|
||||
algoCtx->deviceSyncer.get(),
|
||||
rank, worldSize,
|
||||
input, output,
|
||||
sendBuff, recvBuff,
|
||||
d_sendCounts, d_sendDispls,
|
||||
d_recvCounts, d_recvDispls,
|
||||
d_remoteRecvDispls);
|
||||
} else {
|
||||
// Single-node: use the optimized peer-parallel kernel (MemoryChannel only)
|
||||
// ── SingleNode / NVSwitch mode: MemoryChannel kernel ───────────────
|
||||
constexpr size_t SIZE_THRESHOLD = 1 << 20; // 1MB
|
||||
size_t avgMsgSize = inputSize / worldSize;
|
||||
|
||||
if (avgMsgSize < SIZE_THRESHOLD) {
|
||||
// Small messages: 1 block per peer, parallel signal/wait, no barrier
|
||||
int numBlocks = nPeers;
|
||||
alltoallvPeerParallelKernel<<<numBlocks, threadsPerBlock, 0, stream>>>(
|
||||
algoCtx->memoryChannelDeviceHandles.get(),
|
||||
algoCtx->deviceSyncer.get(),
|
||||
rank, worldSize,
|
||||
input, output,
|
||||
sendBuff, recvBuff,
|
||||
d_sendCounts, d_sendDispls,
|
||||
d_recvCounts, d_recvDispls,
|
||||
d_remoteRecvDispls);
|
||||
} else {
|
||||
// Large messages: multiple blocks per peer for maximum put bandwidth.
|
||||
int blocksPerPeer = (nBlocks > 0 && nBlocks <= 128)
|
||||
? ((nBlocks + nPeers - 1) / nPeers)
|
||||
: ALLTOALLV_DEFAULT_BLOCKS_PER_PEER;
|
||||
@@ -194,13 +205,19 @@ CommResult AlltoallvFullmesh::alltoallvKernelFunc(
|
||||
algoCtx->memoryChannelDeviceHandles.get(),
|
||||
algoCtx->deviceSyncer.get(),
|
||||
rank, worldSize,
|
||||
input, output,
|
||||
sendBuff, recvBuff,
|
||||
d_sendCounts, d_sendDispls,
|
||||
d_recvCounts, d_recvDispls,
|
||||
d_remoteRecvDispls);
|
||||
}
|
||||
}
|
||||
|
||||
if (algoCtx->useStaging) {
|
||||
MSCCLPP_CUDATHROW(cudaMemcpyAsync(
|
||||
output, recvBuff,
|
||||
outputSize, cudaMemcpyDeviceToDevice, stream));
|
||||
}
|
||||
|
||||
if (cudaGetLastError() == cudaSuccess) {
|
||||
return CommResult::CommSuccess;
|
||||
}
|
||||
@@ -215,73 +232,75 @@ std::shared_ptr<void> AlltoallvFullmesh::initAlltoallvContext(
|
||||
ctx->rank = comm->bootstrap()->getRank();
|
||||
ctx->worldSize = comm->bootstrap()->getNranks();
|
||||
ctx->nRanksPerNode = comm->bootstrap()->getNranksPerNode();
|
||||
ctx->hasRemotePeers = this->hasRemotePeers_;
|
||||
ctx->mode = this->multiNodeMode_;
|
||||
ctx->useStaging = (ctx->mode == MultiNodeMode::NVSwitch);
|
||||
|
||||
int rank = ctx->rank;
|
||||
int nRanksPerNode = ctx->nRanksPerNode;
|
||||
int localGpuIdx = rank % nRanksPerNode;
|
||||
int localGpuIdx = rank % ctx->nRanksPerNode;
|
||||
|
||||
// Determine transport flags for memory registration.
|
||||
// If we have remote peers, register with both CudaIpc and IB transports.
|
||||
TransportFlags allTransports = Transport::CudaIpc;
|
||||
if (ctx->hasRemotePeers) {
|
||||
allTransports |= getIBTransportForGpu(localGpuIdx);
|
||||
}
|
||||
if (ctx->mode == MultiNodeMode::NVSwitch) {
|
||||
// ── NVSwitch (GB200): staging GpuBuffers + CudaIpc MemoryChannel for all peers
|
||||
ctx->inputStaging = std::make_shared<GpuBuffer<char>>(inputSize);
|
||||
ctx->outputStaging = std::make_shared<GpuBuffer<char>>(outputSize);
|
||||
|
||||
// Register memories for input and output buffers
|
||||
RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports);
|
||||
RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports);
|
||||
TransportFlags allTransports = Transport::CudaIpc;
|
||||
RegisteredMemory inputBufRegMem = comm->registerMemory(
|
||||
ctx->inputStaging->data(), ctx->inputStaging->bytes(), allTransports);
|
||||
RegisteredMemory outputBufRegMem = comm->registerMemory(
|
||||
ctx->outputStaging->data(), ctx->outputStaging->bytes(), allTransports);
|
||||
|
||||
// Exchange output buffer registration with all peers (we write to peer's output buffer)
|
||||
std::vector<RegisteredMemory> remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem);
|
||||
std::vector<RegisteredMemory> remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem);
|
||||
|
||||
// Build peer locality map and channel index mappings
|
||||
int nPeers = ctx->worldSize - 1;
|
||||
std::vector<int> peerIsLocal(nPeers, 1);
|
||||
std::vector<int> peerToPortChannelIdx(nPeers, -1);
|
||||
int portChannelCount = 0;
|
||||
constexpr int nChannelsPerConnection = 1;
|
||||
ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection);
|
||||
ctx->memoryChannels = setupMemoryChannels(
|
||||
this->conns_, ctx->memorySemaphores, remoteOutputMemories, inputBufRegMem, nChannelsPerConnection);
|
||||
ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels);
|
||||
|
||||
for (size_t cid = 0; cid < this->conns_.size(); ++cid) {
|
||||
if (!isIntraNodeConnection(this->conns_[cid])) {
|
||||
peerIsLocal[cid] = 0;
|
||||
peerToPortChannelIdx[cid] = portChannelCount++;
|
||||
}
|
||||
}
|
||||
ctx->registeredMemories = std::move(remoteOutputMemories);
|
||||
ctx->registeredMemories.push_back(inputBufRegMem);
|
||||
ctx->registeredMemories.push_back(outputBufRegMem);
|
||||
|
||||
// Setup intra-node MemoryChannels (CudaIpc connections only)
|
||||
constexpr int nChannelsPerConnection = 1;
|
||||
ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection);
|
||||
ctx->memoryChannels = setupMemoryChannels(
|
||||
this->conns_,
|
||||
ctx->memorySemaphores,
|
||||
remoteOutputMemories, // remote output buffers (where we write)
|
||||
inputBufRegMem, // local input buffer (where we read from)
|
||||
nChannelsPerConnection);
|
||||
ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels);
|
||||
} else if (ctx->mode == MultiNodeMode::IB) {
|
||||
// ── IB: PortChannel for ALL peers (CudaIpc intra + IB inter connections)
|
||||
TransportFlags allTransports = Transport::CudaIpc | getIBTransportForGpu(localGpuIdx);
|
||||
RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports);
|
||||
RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports);
|
||||
|
||||
std::vector<RegisteredMemory> remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem);
|
||||
|
||||
// Setup inter-node PortChannels (IB connections only)
|
||||
if (ctx->hasRemotePeers) {
|
||||
ctx->proxyService = std::make_shared<ProxyService>();
|
||||
ctx->portChannels = setupPortChannels(
|
||||
ctx->portChannels = setupAllPortChannels(
|
||||
ctx->proxyService, *comm, this->conns_, remoteOutputMemories, inputBufRegMem);
|
||||
ctx->portChannelDeviceHandles = setupPortChannelDeviceHandles(ctx->portChannels);
|
||||
ctx->proxyService->startProxy(true);
|
||||
|
||||
ctx->registeredMemories = std::move(remoteOutputMemories);
|
||||
ctx->registeredMemories.push_back(inputBufRegMem);
|
||||
ctx->registeredMemories.push_back(outputBufRegMem);
|
||||
|
||||
} else {
|
||||
// ── SingleNode: CudaIpc MemoryChannel (direct PyTorch buffers)
|
||||
TransportFlags allTransports = Transport::CudaIpc;
|
||||
RegisteredMemory inputBufRegMem = comm->registerMemory((void*)input, inputSize, allTransports);
|
||||
RegisteredMemory outputBufRegMem = comm->registerMemory(output, outputSize, allTransports);
|
||||
|
||||
std::vector<RegisteredMemory> remoteOutputMemories = setupRemoteMemories(comm, rank, outputBufRegMem);
|
||||
|
||||
constexpr int nChannelsPerConnection = 1;
|
||||
ctx->memorySemaphores = setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection);
|
||||
ctx->memoryChannels = setupMemoryChannels(
|
||||
this->conns_, ctx->memorySemaphores, remoteOutputMemories, inputBufRegMem, nChannelsPerConnection);
|
||||
ctx->memoryChannelDeviceHandles = setupMemoryChannelDeviceHandles(ctx->memoryChannels);
|
||||
|
||||
ctx->registeredMemories = std::move(remoteOutputMemories);
|
||||
ctx->registeredMemories.push_back(inputBufRegMem);
|
||||
ctx->registeredMemories.push_back(outputBufRegMem);
|
||||
}
|
||||
|
||||
// Copy peer locality info to GPU
|
||||
ctx->d_peerIsLocal = mscclpp::detail::gpuCallocShared<int>(nPeers);
|
||||
mscclpp::gpuMemcpy<int>(ctx->d_peerIsLocal.get(), peerIsLocal.data(), nPeers, cudaMemcpyHostToDevice);
|
||||
ctx->d_peerToPortChannelIdx = mscclpp::detail::gpuCallocShared<int>(nPeers);
|
||||
mscclpp::gpuMemcpy<int>(ctx->d_peerToPortChannelIdx.get(), peerToPortChannelIdx.data(), nPeers, cudaMemcpyHostToDevice);
|
||||
|
||||
// Allocate GPU DeviceSyncer for multi-block grid-wide barrier (zero-initialized)
|
||||
// Allocate GPU DeviceSyncer for multi-block grid-wide barrier
|
||||
ctx->deviceSyncer = mscclpp::detail::gpuCallocShared<DeviceSyncer>();
|
||||
|
||||
// Keep registered memory references to prevent deallocation
|
||||
ctx->registeredMemories = std::move(remoteOutputMemories);
|
||||
ctx->registeredMemories.push_back(inputBufRegMem);
|
||||
ctx->registeredMemories.push_back(outputBufRegMem);
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
|
||||
@@ -124,6 +124,26 @@ std::vector<mscclpp::PortChannel> setupPortChannels(
|
||||
return channels;
|
||||
}
|
||||
|
||||
std::vector<mscclpp::PortChannel> setupAllPortChannels(
|
||||
std::shared_ptr<mscclpp::ProxyService> proxyService,
|
||||
mscclpp::Communicator& comm,
|
||||
const std::vector<mscclpp::Connection>& connections,
|
||||
const std::vector<mscclpp::RegisteredMemory>& remoteMemories,
|
||||
mscclpp::RegisteredMemory localMemory) {
|
||||
std::vector<mscclpp::PortChannel> channels;
|
||||
mscclpp::MemoryId srcMemId = proxyService->addMemory(localMemory);
|
||||
for (size_t cid = 0; cid < connections.size(); ++cid) {
|
||||
// Create PortChannel for EVERY connection (CudaIpc and IB alike).
|
||||
// The ProxyService proxy thread handles both connection types:
|
||||
// - CudaIpc: cudaMemcpyD2D via IPC-mapped pointer
|
||||
// - IB: RDMA write via ibv_post_send
|
||||
mscclpp::SemaphoreId semId = proxyService->buildAndAddSemaphore(comm, connections[cid]);
|
||||
mscclpp::MemoryId dstMemId = proxyService->addMemory(remoteMemories[cid]);
|
||||
channels.emplace_back(proxyService->portChannel(semId, dstMemId, srcMemId));
|
||||
}
|
||||
return channels;
|
||||
}
|
||||
|
||||
std::shared_ptr<mscclpp::PortChannelDeviceHandle> setupPortChannelDeviceHandles(
|
||||
const std::vector<mscclpp::PortChannel>& portChannels) {
|
||||
if (portChannels.empty()) return nullptr;
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
#include <mscclpp/algorithm.hpp>
|
||||
#include <mscclpp/core.hpp>
|
||||
#include <mscclpp/gpu_utils.hpp>
|
||||
#include <mscclpp/memory_channel.hpp>
|
||||
#include <mscclpp/port_channel.hpp>
|
||||
#include <mscclpp/semaphore.hpp>
|
||||
@@ -34,6 +35,9 @@ class AlltoallvFullmesh : public AlgorithmBuilder {
|
||||
|
||||
std::shared_ptr<Algorithm> build() override;
|
||||
|
||||
// Multi-node transport mode, decided at initialize() time
|
||||
enum class MultiNodeMode { SingleNode, NVSwitch, IB };
|
||||
|
||||
private:
|
||||
void initialize(std::shared_ptr<Communicator> comm);
|
||||
|
||||
@@ -51,7 +55,7 @@ class AlltoallvFullmesh : public AlgorithmBuilder {
|
||||
|
||||
std::vector<Connection> conns_;
|
||||
int worldSize_;
|
||||
bool hasRemotePeers_; // true if any inter-node connections
|
||||
MultiNodeMode multiNodeMode_ = MultiNodeMode::SingleNode;
|
||||
};
|
||||
|
||||
} // namespace collective
|
||||
|
||||
@@ -512,6 +512,79 @@ __global__ void __launch_bounds__(1024)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* PortChannel-only AllToAllV kernel for multi-node.
|
||||
*
|
||||
* Uses PortChannel (proxy-based) for ALL peers — both intra-node and inter-node.
|
||||
* This follows the proven pattern from allgather_test_cpp.cu which works reliably
|
||||
* on GB200 multi-node NVSwitch systems.
|
||||
*
|
||||
* For intra-node CudaIpc connections, the proxy performs cudaMemcpyD2D.
|
||||
* For inter-node IB connections, the proxy performs RDMA writes.
|
||||
*
|
||||
* Each block handles one peer. Thread 0 pushes a put descriptor to the FIFO
|
||||
* (single-threaded), which triggers the proxy to perform the data transfer.
|
||||
*
|
||||
* Launch config: <<<nPeers, 1024>>>
|
||||
*/
|
||||
__global__ void __launch_bounds__(1024)
|
||||
alltoallvPortChannelKernel(PortChannelDeviceHandle* portChannels,
|
||||
int rank,
|
||||
int worldSize,
|
||||
const void* sendBuff,
|
||||
void* recvBuff,
|
||||
const size_t* sendCounts,
|
||||
const size_t* sendDispls,
|
||||
const size_t* recvCounts,
|
||||
const size_t* recvDispls,
|
||||
const size_t* remoteRecvDispls) {
|
||||
const int nPeers = worldSize - 1;
|
||||
|
||||
// Handle trivial case (single rank)
|
||||
if (nPeers == 0) {
|
||||
const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
|
||||
const int nThreads = blockDim.x * gridDim.x;
|
||||
if (sendCounts[rank] > 0) {
|
||||
mscclpp::copy((char*)recvBuff + recvDispls[rank],
|
||||
(void*)((const char*)sendBuff + sendDispls[rank]),
|
||||
sendCounts[rank], gtid, nThreads);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Phase 1: Local copy — all blocks cooperate using global thread IDs
|
||||
const int gtid = threadIdx.x + blockIdx.x * blockDim.x;
|
||||
const int nThreads = blockDim.x * gridDim.x;
|
||||
if (sendCounts[rank] > 0) {
|
||||
mscclpp::copy((char*)recvBuff + recvDispls[rank],
|
||||
(void*)((const char*)sendBuff + sendDispls[rank]),
|
||||
sendCounts[rank], gtid, nThreads);
|
||||
}
|
||||
|
||||
// Phase 2: Per-peer data transfer via PortChannel (proxy-based).
|
||||
// Each block handles one peer: blockIdx.x == peerIdx.
|
||||
const int peerIdx = blockIdx.x;
|
||||
if (peerIdx >= nPeers) return;
|
||||
|
||||
const int peer = peerIdx < rank ? peerIdx : peerIdx + 1;
|
||||
|
||||
// Thread 0 pushes a put+signal+flush descriptor to the proxy FIFO.
|
||||
// The proxy thread performs the actual data transfer (cudaMemcpy or RDMA).
|
||||
if (threadIdx.x == 0 && sendCounts[peer] > 0) {
|
||||
portChannels[peerIdx].putWithSignalAndFlush(
|
||||
remoteRecvDispls[peer], // dst offset in peer's output buffer
|
||||
sendDispls[peer], // src offset in our input buffer
|
||||
sendCounts[peer] // bytes to transfer
|
||||
);
|
||||
}
|
||||
__syncthreads();
|
||||
|
||||
// Wait for incoming data from this peer
|
||||
if (threadIdx.x == 0 && recvCounts[peer] > 0) {
|
||||
portChannels[peerIdx].wait();
|
||||
}
|
||||
}
|
||||
|
||||
#undef ALLTOALLV_WARP_SIZE
|
||||
} // namespace collective
|
||||
} // namespace mscclpp
|
||||
@@ -78,6 +78,24 @@ std::vector<PortChannel> setupPortChannels(
|
||||
const std::vector<RegisteredMemory>& remoteMemories,
|
||||
RegisteredMemory localMemory);
|
||||
|
||||
/// Setup PortChannels for ALL connections (both CudaIpc and IB) via ProxyService.
|
||||
/// This follows the proven pattern from allgather_test_cpp.cu:
|
||||
/// - CudaIpc connections: proxy does cudaMemcpyD2D
|
||||
/// - IB connections: proxy does RDMA write
|
||||
/// Creates one PortChannel per peer (dense indexing by peerIdx).
|
||||
/// @param proxyService The ProxyService managing transfers
|
||||
/// @param comm The communicator
|
||||
/// @param connections All connections (mixed CudaIpc + IB)
|
||||
/// @param remoteMemories Remote registered memories (one per peer)
|
||||
/// @param localMemory Local registered memory
|
||||
/// @return Vector of PortChannels (one per peer, in connection order)
|
||||
std::vector<PortChannel> setupAllPortChannels(
|
||||
std::shared_ptr<ProxyService> proxyService,
|
||||
Communicator& comm,
|
||||
const std::vector<Connection>& connections,
|
||||
const std::vector<RegisteredMemory>& remoteMemories,
|
||||
RegisteredMemory localMemory);
|
||||
|
||||
/// Setup PortChannel device handles (GPU-allocated array).
|
||||
std::shared_ptr<PortChannelDeviceHandle> setupPortChannelDeviceHandles(
|
||||
const std::vector<PortChannel>& portChannels);
|
||||
|
||||
Reference in New Issue
Block a user