mscclpp/src/ext/ep/buffer.hpp
Qinghua Zhou 4569c4e751 Phase 11: hybrid NVLink + RDMA LL dispatch (+70% throughput)
Inside the IBGDA template branch, runtime-check whether the host has
opened a CUDA IPC peer pointer for the destination rank. If so, send
via NVLink (warp copy / st_na_release on the peer-mapped pointer);
otherwise fall through to the existing port_put / rdma_write_inl8 path.
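
The device-side branch has roughly this shape (a sketch only:
nvlink_warp_copy and ibgda_put are hypothetical stand-ins for the real
copy/put helpers inside the dispatch/combine kernels):

    // Sketch, not the shipped kernel code.
    __device__ void send_to_peer(void* const* peer_bases, int dst_rank,
                                 const void* src, size_t dst_offset, size_t bytes) {
      void* peer_base = peer_bases[dst_rank];
      if (peer_base != nullptr) {
        // Same-node peer: the host opened a CUDA IPC mapping, so write
        // directly over NVLink (warp-cooperative copy, release-ordered
        // final store).
        nvlink_warp_copy(static_cast<char*>(peer_base) + dst_offset, src, bytes);
      } else {
        // Cross-node peer: keep the existing IBGDA path.
        ibgda_put(dst_rank, dst_offset, src, bytes);
      }
    }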

Host: in sync(), when low_latency_mode && num_rdma_ranks > 1 && IBGDA
is up, allgather the rdma_buffer_ptr IPC handles and call
cudaIpcOpenMemHandle only for same-node peers. The sparse pointer table
is mirrored to the GPU and threaded into the launchers as peer_bases.
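
As a sketch (the setup_hybrid_ll name and CUDA_CHECK macro are
illustrative; the real logic lives inline in sync(), and the allgather
uses the bootstrap's rank-indexed fixed-size slots):

    void Buffer::setup_hybrid_ll() {  // hypothetical helper
      hybrid_ipc_handles.resize(num_ranks);
      hybrid_peer_bases.assign(num_ranks, nullptr);
      CUDA_CHECK(cudaIpcGetMemHandle(&hybrid_ipc_handles[rank], rdma_buffer_ptr));
      bootstrap->allGather(hybrid_ipc_handles.data(), sizeof(cudaIpcMemHandle_t));
      for (int r = 0; r < num_ranks; ++r) {
        bool same_node = (r / NUM_MAX_NVL_PEERS) == rdma_rank;
        if (r == rank || !same_node) continue;  // sparse: same-node peers only
        CUDA_CHECK(cudaIpcOpenMemHandle(&hybrid_peer_bases[r], hybrid_ipc_handles[r],
                                        cudaIpcMemLazyEnablePeerAccess));
      }
      // Mirror the sparse table to the GPU; launchers receive it as peer_bases.
      CUDA_CHECK(cudaMalloc(&hybrid_peer_bases_gpu, num_ranks * sizeof(void*)));
      CUDA_CHECK(cudaMemcpy(hybrid_peer_bases_gpu, hybrid_peer_bases.data(),
                            num_ranks * sizeof(void*), cudaMemcpyHostToDevice));
      hybrid_ll_ready = true;
    }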

Kernel: per-peer branch added at all four RDMA send sites (dispatch
send-data, dispatch send-count, combine send-data, combine send-flag).
Recv-side polling is transport-agnostic and unchanged.

Result on 16-rank/2-node LL bench:
  baseline (IBGDA only):   38.7 / 39.4 GB/s
  Phase 11 hybrid:         65.9 / 67.0 GB/s   (+70%)
Now matches nccl-ep default-mode numbers (63-71 / 62-72 GB/s).
Validation max diff = 0.

Gated by the MSCCLPP_EP_HYBRID_LL env var (default on). Single-node LL
is untouched (the num_rdma_ranks > 1 gate).
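
The env gate reduces to something like this (sketch; the actual parsing
may differ):

    // Sketch: default-on boolean gate (assumes <cstdlib> and <cstring>).
    static bool hybrid_ll_enabled() {
      const char* v = std::getenv("MSCCLPP_EP_HYBRID_LL");
      return v == nullptr || std::strcmp(v, "0") != 0;  // only "0" disables
    }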
2026-05-09 23:04:15 +00:00

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <torch/types.h>
#include <ATen/cuda/CUDAContext.h>
#include <cuda_runtime.h>
#include <mscclpp/core.hpp>
#include <mscclpp/memory_channel.hpp>
#include <mscclpp/port_channel.hpp>
#include <functional>
#include <memory>
#include <optional>
#include <tuple>
#include <vector>
#include "config.hpp"
#include "event.hpp"
#include "kernels/configs.cuh"
#include "kernels/exception.cuh"
#if defined(USE_IBVERBS) && defined(MSCCLPP_USE_MLX5DV) && !defined(MSCCLPP_USE_ROCM)
#define MSCCLPP_EP_HAVE_IBGDA 1
namespace mscclpp { namespace ep { struct IbgdaSetup; } }
#endif
#ifndef TORCH_EXTENSION_NAME
#define TORCH_EXTENSION_NAME mscclpp_ep_cpp
#endif
namespace mscclpp {
namespace ep {
struct Buffer {
  EP_STATIC_ASSERT(NUM_MAX_NVL_PEERS == 8, "The number of maximum NVLink peers must be 8");

 private:
  // Low-latency mode buffer
  int low_latency_buffer_idx = 0;
  bool low_latency_mode = false;
  // NVLink Buffer
  int64_t num_nvl_bytes;
  void* buffer_ptrs[NUM_MAX_NVL_PEERS] = {nullptr};
  void** buffer_ptrs_gpu = nullptr;
  // NVSHMEM Buffer
  int64_t num_rdma_bytes;
  void* rdma_buffer_ptr = nullptr;
  // Device info and communication
  int device_id;
  int rank, rdma_rank, nvl_rank;
  int num_ranks, num_rdma_ranks, num_nvl_ranks;
  cudaIpcMemHandle_t ipc_handles[NUM_MAX_NVL_PEERS];
  // Stream for communication
  at::cuda::CUDAStream comm_stream;
  // After IPC/NVSHMEM synchronization, this flag will be true
  bool available = false;
  // Task fifo
  int head = 0;
  int* task_fifo_ptrs[NUM_MAX_NVL_PEERS] = {nullptr};
  int** task_fifo_ptrs_gpu = nullptr;
  // Workspace
  void* workspace = nullptr;
  // Host-side MoE info
  volatile int* moe_recv_counter = nullptr;
  int* moe_recv_counter_mapped = nullptr;
  // Host-side expert-level MoE info
  volatile int* moe_recv_expert_counter = nullptr;
  int* moe_recv_expert_counter_mapped = nullptr;
  // Host-side RDMA-level MoE info
  volatile int* moe_recv_rdma_counter = nullptr;
  int* moe_recv_rdma_counter_mapped = nullptr;
  std::shared_ptr<mscclpp::TcpBootstrap> bootstrap;
  // One ProxyService spawns a single proxy thread that drains every PortChannel
  // FIFO it owns. With LL combine pushing thousands of triggers per iteration,
  // the single thread becomes the wall-clock bottleneck on cross-node runs. We
  // shard channels across `proxy_services` so each gets its own thread/FIFO,
  // increasing host-side dispatch parallelism (no kernel changes required).
  // The count is resolved at construction (env `MSCCLPP_EP_NUM_PROXIES` or an
  // arch-aware default).
  int num_proxy_services = 1;
  std::vector<std::shared_ptr<mscclpp::ProxyService>> proxy_services;
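  // Illustrative sharding only (not the actual constructor code): with
  // `n = num_proxy_services`, channel `i` is registered with
  //   proxy_services[i % n]
  // so its trigger FIFO drains on one of `n` independent host proxy
  // threads instead of a single shared one.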
  std::shared_ptr<mscclpp::Communicator> communicator;
  std::vector<mscclpp::PortChannel> port_channels;
  std::vector<mscclpp::MemoryChannel> memory_channels;
  std::shared_ptr<mscclpp::PortChannelDeviceHandle> port_channel_handles_device_ptr;
  std::shared_ptr<mscclpp::MemoryChannelDeviceHandle> memory_channel_handles_device_ptr;
  // Intra-node LL only: peer-mapped RDMA buffer pointers (CUDA IPC).
  // ``peer_rdma_bases[r]`` aliases rank ``r``'s ``rdma_buffer_ptr`` via
  // ``cudaIpcOpenMemHandle`` (lazy peer access). Populated in ``sync()`` when
  // ``low_latency_mode && num_rdma_ranks == 1``; null otherwise.
  cudaIpcMemHandle_t rdma_ipc_handles[NUM_MAX_NVL_PEERS];
  void* peer_rdma_bases[NUM_MAX_NVL_PEERS] = {nullptr};
  void** peer_rdma_bases_gpu = nullptr;
  // MemoryChannels over CUDA IPC used only for the LL barrier ring.
  std::vector<mscclpp::MemoryChannel> ll_memory_channels;
  std::shared_ptr<mscclpp::MemoryChannelDeviceHandle> ll_memory_channel_handles_device_ptr;
  bool ll_ipc_ready = false;
  // ------------------------------------------------------------------
  // Phase 11 — Hybrid LL fast path.
  //
  // In multi-node LL with IBGDA, also open CUDA IPC peer pointers for
  // same-node neighbors so the kernel can prefer NVLink for intranode
  // peers and IBGDA for internode peers (matching nccl-ep's behavior).
  //
  // `hybrid_peer_bases` is sparse: indexed by global rank, populated
  // only for same-node peers (rank' / NUM_MAX_NVL_PEERS == rdma_rank
  // && rank' != rank). Cross-node and self entries are nullptr; the
  // kernel checks for nullptr to decide IPC vs IBGDA per peer.
  //
  // Built lazily in `sync()` when:
  // - low_latency_mode && num_rdma_ranks > 1
  // - env MSCCLPP_EP_USE_IBGDA=1 && IBGDA setup succeeds
  // - env MSCCLPP_EP_HYBRID_LL is not set to "0"
  // ------------------------------------------------------------------
  std::vector<cudaIpcMemHandle_t> hybrid_ipc_handles;
  std::vector<void*> hybrid_peer_bases;    // size num_ranks; same-node entries non-null
  void** hybrid_peer_bases_gpu = nullptr;  // GPU array of size num_ranks
  bool hybrid_ll_ready = false;
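  // Worked example of the sparse table (16 ranks, 8 GPUs per node, this
  // rank = 3, hence rdma_rank = 0): entries 0..7 except 3 hold IPC-mapped
  // pointers; entry 3 and entries 8..15 stay nullptr, steering the kernel
  // to IBGDA for those peers.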
  // ------------------------------------------------------------------
  // Native IBGDA path (Stage 4b). Built lazily in `sync()` when env
  // `MSCCLPP_EP_USE_IBGDA=1` is set AND the run is cross-node.
  // The kernels do NOT consume `ibgda_setup_` until 4b.2 lands; for now
  // it is constructed-but-unused, so existing tests are unaffected.
  // ------------------------------------------------------------------
  bool use_ibgda_path_ = false;
#ifdef MSCCLPP_EP_HAVE_IBGDA
  std::unique_ptr<mscclpp::ep::IbgdaSetup> ibgda_setup_;
#endif

 private:
  void move_fifo_slots(int num_slots = 1);
 public:
  Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode);
  ~Buffer() noexcept(false);

  bool is_available() const;
  bool is_internode_available() const;
  int get_num_rdma_ranks() const;
  int get_rdma_rank() const;
  int get_root_rdma_rank(bool global) const;
  int get_local_device_id() const;
  pybind11::bytearray get_local_ipc_handle() const;
  pybind11::bytearray get_local_nvshmem_unique_id() const;
  torch::Tensor get_local_buffer_tensor(const pybind11::object& dtype, int64_t offset, bool use_rdma_buffer) const;
  mscclpp::UniqueId create_unique_id() const;
  void connect(mscclpp::UniqueId root_id);
  void sync(const std::vector<int>& device_ids,
            const std::vector<std::optional<pybind11::bytearray>>& all_gathered_handles,
            const std::optional<pybind11::bytearray>& root_unique_id_opt);

  std::tuple<torch::Tensor, std::optional<torch::Tensor>, torch::Tensor, torch::Tensor, std::optional<EventHandle>>
  get_dispatch_layout(const torch::Tensor& topk_idx, int num_experts, std::optional<EventHandle>& previous_event,
                      bool async, bool allocate_on_comm_stream);

  std::tuple<torch::Tensor, std::optional<torch::Tensor>, std::optional<torch::Tensor>, std::optional<torch::Tensor>,
             std::vector<int>, torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor,
             std::optional<EventHandle>>
  intranode_dispatch(const torch::Tensor& x, const std::optional<torch::Tensor>& x_scales,
                     const std::optional<torch::Tensor>& topk_idx, const std::optional<torch::Tensor>& topk_weights,
                     const std::optional<torch::Tensor>& num_tokens_per_rank, const torch::Tensor& is_token_in_rank,
                     const std::optional<torch::Tensor>& num_tokens_per_expert, int cached_num_recv_tokens,
                     const std::optional<torch::Tensor>& cached_rank_prefix_matrix,
                     const std::optional<torch::Tensor>& cached_channel_prefix_matrix, int expert_alignment,
                     const Config& config, std::optional<EventHandle>& previous_event, bool async,
                     bool allocate_on_comm_stream);

  std::tuple<torch::Tensor, std::optional<torch::Tensor>, std::optional<EventHandle>> intranode_combine(
      const torch::Tensor& x, const std::optional<torch::Tensor>& topk_weights, const torch::Tensor& src_idx,
      const torch::Tensor& rank_prefix_matrix, const torch::Tensor& channel_prefix_matrix,
      const torch::Tensor& send_head, const Config& config, std::optional<EventHandle>& previous_event, bool async,
      bool allocate_on_comm_stream);

  std::tuple<torch::Tensor, std::optional<torch::Tensor>, std::optional<torch::Tensor>, std::optional<torch::Tensor>,
             std::vector<int>, torch::Tensor, torch::Tensor, std::optional<torch::Tensor>, torch::Tensor,
             std::optional<torch::Tensor>, torch::Tensor, std::optional<torch::Tensor>, std::optional<torch::Tensor>,
             std::optional<torch::Tensor>, std::optional<EventHandle>>
  internode_dispatch(const torch::Tensor& x, const std::optional<torch::Tensor>& x_scales,
                     const std::optional<torch::Tensor>& topk_idx, const std::optional<torch::Tensor>& topk_weights,
                     const std::optional<torch::Tensor>& num_tokens_per_rank,
                     const std::optional<torch::Tensor>& num_tokens_per_rdma_rank,
                     const torch::Tensor& is_token_in_rank, const std::optional<torch::Tensor>& num_tokens_per_expert,
                     int cached_num_recv_tokens, int cached_num_rdma_recv_tokens,
                     const std::optional<torch::Tensor>& cached_rdma_channel_prefix_matrix,
                     const std::optional<torch::Tensor>& cached_recv_rdma_rank_prefix_sum,
                     const std::optional<torch::Tensor>& cached_gbl_channel_prefix_matrix,
                     const std::optional<torch::Tensor>& cached_recv_gbl_rank_prefix_sum, int expert_alignment,
                     const Config& config, std::optional<EventHandle>& previous_event, bool async,
                     bool allocate_on_comm_stream);

  std::tuple<torch::Tensor, std::optional<torch::Tensor>, std::optional<EventHandle>> internode_combine(
      const torch::Tensor& x, const std::optional<torch::Tensor>& topk_weights, const torch::Tensor& src_meta,
      const torch::Tensor& is_combined_token_in_rank, const torch::Tensor& rdma_channel_prefix_matrix,
      const torch::Tensor& rdma_rank_prefix_sum, const torch::Tensor& gbl_channel_prefix_matrix,
      const torch::Tensor& combined_rdma_head, const torch::Tensor& combined_nvl_head, const Config& config,
      std::optional<EventHandle>& previous_event, bool async, bool allocate_on_comm_stream);

  void clean_low_latency_buffer(int num_max_dispatch_tokens_per_rank, int hidden, int num_experts);

  std::tuple<torch::Tensor, std::optional<torch::Tensor>, torch::Tensor, torch::Tensor, torch::Tensor,
             std::optional<EventHandle>, std::optional<std::function<void()>>>
  low_latency_dispatch(const torch::Tensor& x, const torch::Tensor& topk_idx, int num_max_dispatch_tokens_per_rank,
                       int num_experts, bool use_fp8, bool async, bool return_recv_hook,
                       const std::optional<torch::Tensor>& out_packed_recv_x = std::nullopt,
                       const std::optional<torch::Tensor>& out_packed_recv_x_scales = std::nullopt,
                       const std::optional<torch::Tensor>& out_packed_recv_src_info = std::nullopt,
                       const std::optional<torch::Tensor>& out_packed_recv_layout_range = std::nullopt,
                       const std::optional<torch::Tensor>& out_packed_recv_count = std::nullopt);

  std::tuple<torch::Tensor, std::optional<EventHandle>, std::optional<std::function<void()>>> low_latency_combine(
      const torch::Tensor& x, const torch::Tensor& topk_idx, const torch::Tensor& topk_weights,
      const torch::Tensor& src_info, const torch::Tensor& layout_range, int num_max_dispatch_tokens_per_rank,
      int num_experts, bool zero_copy, bool async, bool return_recv_hook,
      const std::optional<torch::Tensor>& out = std::nullopt);

  torch::Tensor get_next_low_latency_combine_buffer(int num_max_dispatch_tokens_per_rank, int hidden, int num_experts);
};
} // namespace ep
} // namespace mscclpp