From f4fbd093dbbaa0fe514a45a21e5c97efcd7f8e92 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 16 Jun 2026 15:58:10 +0000 Subject: [PATCH] lint --- src/ext/ep/buffer.cc | 28 ++++---- src/ext/ep/kernels/internode.cu | 63 +++++++++-------- src/ext/ep/kernels/internode_ncclep.cuh | 68 +++++++++---------- src/ext/ep/kernels/intranode_kernel.cu | 7 +- .../python/ext/ep/test_internode_multirank.py | 2 +- 5 files changed, 86 insertions(+), 82 deletions(-) diff --git a/src/ext/ep/buffer.cc b/src/ext/ep/buffer.cc index e76bed9c..5e298073 100644 --- a/src/ext/ep/buffer.cc +++ b/src/ext/ep/buffer.cc @@ -836,8 +836,8 @@ void Buffer::sync(const std::vector& device_ids, recv_pool_global_ptrs_[r] = recv_pool_global_remote_mems_[r].data(); } CUDA_CHECK(cudaMalloc(&recv_pool_global_ptrs_gpu, sizeof(void*) * num_ranks)); - CUDA_CHECK(cudaMemcpy(recv_pool_global_ptrs_gpu, recv_pool_global_ptrs_.data(), - sizeof(void*) * num_ranks, cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(recv_pool_global_ptrs_gpu, recv_pool_global_ptrs_.data(), sizeof(void*) * num_ranks, + cudaMemcpyHostToDevice)); if (rank == 0) { printf("[mscclpp_ep] inc5 domain-wide recv-pool bases (rank 0):"); for (int r = 0; r < num_ranks; ++r) printf(" [%d]=%p", r, recv_pool_global_ptrs_[r]); @@ -1503,7 +1503,7 @@ Buffer::internode_dispatch( } // Allocate new tensors - void** ep_recv_pool_ptrs = nullptr; // non-null selects the increment-4 VMM direct-write path + void** ep_recv_pool_ptrs = nullptr; // non-null selects the increment-4 VMM direct-write path void** ep_recv_pool_global_ptrs = nullptr; // inc5: domain-wide pool bases (sender direct-write) #ifdef EP_DISPATCH_NCCLEP // Increment 4 (VMM pool): when num_recv_tokens fits the fixed pool, back recv_x @@ -1717,17 +1717,17 @@ std::tuple, std::optional(static_cast(nvls_ht_mc_ptr) + nvls_ht_off_tail) : nullptr; void* combine_nvls_tail_dev = nvls_ht_enabled ? static_cast(static_cast(nvls_ht_dev_ptr) + nvls_ht_off_tail) : nullptr; - internode::combine( - at::cuda::ScalarTypeToCudaDataType(x.scalar_type()), combined_x.data_ptr(), combined_topk_weights_ptr, - is_combined_token_in_rank.data_ptr(), x.data_ptr(), topk_weights_ptr, combined_rdma_head.data_ptr(), - combined_nvl_head.data_ptr(), src_meta.data_ptr(), rdma_channel_prefix_matrix.data_ptr(), - rdma_rank_prefix_sum.data_ptr(), gbl_channel_prefix_matrix.data_ptr(), num_tokens, num_combined_tokens, - hidden, num_topk, rdma_buffer_ptr, config.num_max_rdma_chunked_send_tokens, - config.num_max_rdma_chunked_recv_tokens, buffer_ptrs_gpu, config.num_max_nvl_chunked_send_tokens, - config.num_max_nvl_chunked_recv_tokens, rank, num_ranks, comm_stream, num_channels, low_latency_mode, - port_channel_handles_device_ptr.get(), memory_channel_handles_device_ptr.get(), combine_nvls_head_mc, - combine_nvls_head_dev, combine_nvls_tail_mc, combine_nvls_tail_dev, peer_rdma_bases_gpu, - recv_pool_global_ptrs_gpu, ep_combine_recv_idx_gpu); + internode::combine(at::cuda::ScalarTypeToCudaDataType(x.scalar_type()), combined_x.data_ptr(), + combined_topk_weights_ptr, is_combined_token_in_rank.data_ptr(), x.data_ptr(), + topk_weights_ptr, combined_rdma_head.data_ptr(), combined_nvl_head.data_ptr(), + src_meta.data_ptr(), rdma_channel_prefix_matrix.data_ptr(), + rdma_rank_prefix_sum.data_ptr(), gbl_channel_prefix_matrix.data_ptr(), num_tokens, + num_combined_tokens, hidden, num_topk, rdma_buffer_ptr, config.num_max_rdma_chunked_send_tokens, + config.num_max_rdma_chunked_recv_tokens, buffer_ptrs_gpu, config.num_max_nvl_chunked_send_tokens, + config.num_max_nvl_chunked_recv_tokens, rank, num_ranks, comm_stream, num_channels, + low_latency_mode, port_channel_handles_device_ptr.get(), memory_channel_handles_device_ptr.get(), + combine_nvls_head_mc, combine_nvls_head_dev, combine_nvls_tail_mc, combine_nvls_tail_dev, + peer_rdma_bases_gpu, recv_pool_global_ptrs_gpu, ep_combine_recv_idx_gpu); std::optional event; if (async) { diff --git a/src/ext/ep/kernels/internode.cu b/src/ext/ep/kernels/internode.cu index b541ae94..5c3e2225 100644 --- a/src/ext/ep/kernels/internode.cu +++ b/src/ext/ep/kernels/internode.cu @@ -635,9 +635,12 @@ void notify_dispatch(const int* num_tokens_per_rank, int* moe_recv_counter_mappe // Get clean meta. inc5: under MSCCLPP_EP_DIRECT the rdma ring slot excludes // hidden (kEpDirect), so the clean region must match the SMALL slot the kernel // uses, else the cleaned head/tail region != the one the kernel reads. - const bool ep_direct = []() { const char* e = std::getenv("MSCCLPP_EP_DIRECT"); return e && std::atoi(e) != 0; }(); - auto rdma_clean_meta = get_rdma_clean_meta(ep_direct ? 0 : hidden_int4, num_scales, num_topk, num_topk, num_rdma_ranks, - num_max_rdma_chunked_recv_tokens, num_channels); + const bool ep_direct = []() { + const char* e = std::getenv("MSCCLPP_EP_DIRECT"); + return e && std::atoi(e) != 0; + }(); + auto rdma_clean_meta = get_rdma_clean_meta(ep_direct ? 0 : hidden_int4, num_scales, num_topk, num_topk, + num_rdma_ranks, num_max_rdma_chunked_recv_tokens, num_channels); auto nvl_clean_meta = get_nvl_clean_meta(hidden_int4, num_scales, num_topk, num_topk, num_rdma_ranks, NUM_MAX_NVL_PEERS, num_max_nvl_chunked_recv_tokens, num_channels); EP_HOST_ASSERT((rdma_clean_meta.first + rdma_clean_meta.second) * sizeof(int) <= num_rdma_bytes); @@ -1552,23 +1555,24 @@ void dispatch(void* recv_x, float* recv_x_scales, int64_t* recv_topk_idx, float* } #endif -#define DISPATCH_LAUNCH_CASE(num_rdma_ranks) \ - { \ - auto dispatch_func = \ - low_latency_mode ? (is_cached_dispatch ? EP_DISPATCH_KERNEL \ - : EP_DISPATCH_KERNEL) \ - : (is_cached_dispatch ? EP_DISPATCH_KERNEL \ - : EP_DISPATCH_KERNEL); \ - LAUNCH_KERNEL(&cfg, dispatch_func, reinterpret_cast(recv_x), recv_x_scales, recv_topk_idx, \ - recv_topk_weights, reinterpret_cast(recv_src_meta), reinterpret_cast(x), \ - x_scales, topk_idx, topk_weights, send_rdma_head, send_nvl_head, recv_rdma_channel_prefix_matrix, \ - recv_gbl_channel_prefix_matrix, rdma_channel_prefix_matrix, recv_rdma_rank_prefix_sum, \ - gbl_channel_prefix_matrix, recv_gbl_rank_prefix_sum, num_tokens, hidden_int4, num_scales, num_topk, \ - num_experts, is_token_in_rank, rdma_buffer_ptr, num_max_rdma_chunked_send_tokens, \ - num_max_rdma_chunked_recv_tokens, buffer_ptrs, num_max_nvl_chunked_send_tokens, \ - num_max_nvl_chunked_recv_tokens, rank, num_ranks, port_channel_handles, memory_channel_handles, \ - nvls_head_mc, nvls_head_dev, nvls_tail_mc, nvls_tail_dev, peer_rdma_bases EP_DISPATCH_EXTRA_ARGS); \ - } \ +#define DISPATCH_LAUNCH_CASE(num_rdma_ranks) \ + { \ + auto dispatch_func = \ + low_latency_mode \ + ? (is_cached_dispatch ? EP_DISPATCH_KERNEL \ + : EP_DISPATCH_KERNEL) \ + : (is_cached_dispatch ? EP_DISPATCH_KERNEL \ + : EP_DISPATCH_KERNEL); \ + LAUNCH_KERNEL(&cfg, dispatch_func, reinterpret_cast(recv_x), recv_x_scales, recv_topk_idx, \ + recv_topk_weights, reinterpret_cast(recv_src_meta), reinterpret_cast(x), \ + x_scales, topk_idx, topk_weights, send_rdma_head, send_nvl_head, recv_rdma_channel_prefix_matrix, \ + recv_gbl_channel_prefix_matrix, rdma_channel_prefix_matrix, recv_rdma_rank_prefix_sum, \ + gbl_channel_prefix_matrix, recv_gbl_rank_prefix_sum, num_tokens, hidden_int4, num_scales, num_topk, \ + num_experts, is_token_in_rank, rdma_buffer_ptr, num_max_rdma_chunked_send_tokens, \ + num_max_rdma_chunked_recv_tokens, buffer_ptrs, num_max_nvl_chunked_send_tokens, \ + num_max_nvl_chunked_recv_tokens, rank, num_ranks, port_channel_handles, memory_channel_handles, \ + nvls_head_mc, nvls_head_dev, nvls_tail_mc, nvls_tail_dev, peer_rdma_bases EP_DISPATCH_EXTRA_ARGS); \ + } \ break EP_HOST_ASSERT((topk_idx == nullptr) == (topk_weights == nullptr)); @@ -1754,10 +1758,13 @@ void cached_notify(int hidden_int4, int num_scales, int num_topk_idx, int num_to // carries hidden through the rdma channel, so its clean must stay full-slot. // is_cached_dispatch distinguishes the two callers (true=cached dispatch, // false=combine). - const bool ep_direct = []() { const char* e = std::getenv("MSCCLPP_EP_DIRECT"); return e && std::atoi(e) != 0; }(); + const bool ep_direct = []() { + const char* e = std::getenv("MSCCLPP_EP_DIRECT"); + return e && std::atoi(e) != 0; + }(); const int clean_hidden_int4 = (ep_direct && is_cached_dispatch) ? 0 : hidden_int4; - auto rdma_clean_meta = get_rdma_clean_meta(clean_hidden_int4, num_scales, num_topk_idx, num_topk_weights, num_rdma_ranks, - num_max_rdma_chunked_recv_tokens, num_channels); + auto rdma_clean_meta = get_rdma_clean_meta(clean_hidden_int4, num_scales, num_topk_idx, num_topk_weights, + num_rdma_ranks, num_max_rdma_chunked_recv_tokens, num_channels); auto nvl_clean_meta = get_nvl_clean_meta(hidden_int4, num_scales, num_topk_idx, num_topk_weights, num_rdma_ranks, NUM_MAX_NVL_PEERS, num_max_nvl_chunked_recv_tokens, num_channels); EP_HOST_ASSERT((rdma_clean_meta.first + rdma_clean_meta.second) * sizeof(int) <= num_rdma_bytes); @@ -1913,10 +1920,9 @@ __global__ void __launch_bounds__((NUM_MAX_NVL_PEERS + 1 + kNumForwarders) * 32, const bool is_in = (lane_id < kEpNumRanks) and is_combined_token_in_rank[static_cast(t) * num_ranks + lane_id]; const int recv_idx = is_in ? ep_combine_recv_idx[static_cast(t) * num_ranks + lane_id] : 0; - combine_token(is_in, recv_idx, lane_id, hidden_int4, num_topk, - combined_x + static_cast(t) * hidden_int4, - combined_topk_weights + static_cast(t) * num_topk, 1 << 30, - recv_fn, recv_tw_fn); + combine_token( + is_in, recv_idx, lane_id, hidden_int4, num_topk, combined_x + static_cast(t) * hidden_int4, + combined_topk_weights + static_cast(t) * num_topk, 1 << 30, recv_fn, recv_tw_fn); } } else { // >= 16 nodes (kEpNumRanks > 32): chunked-ballot discovery (lane-per-rank aliases @@ -1929,8 +1935,7 @@ __global__ void __launch_bounds__((NUM_MAX_NVL_PEERS + 1 + kNumForwarders) * 32, int topk_ranks[kEpMaxContrib], slot_indices[kEpMaxContrib], num_topk_ranks = 0; for (int base = 0; base < kEpNumRanks; base += 32) { const int r = base + lane_id; - const bool is_in = - (r < kEpNumRanks) and is_combined_token_in_rank[static_cast(t) * num_ranks + r]; + const bool is_in = (r < kEpNumRanks) and is_combined_token_in_rank[static_cast(t) * num_ranks + r]; const int slot = is_in ? ep_combine_recv_idx[static_cast(t) * num_ranks + r] : 0; unsigned ballot = __ballot_sync(0xffffffffu, is_in); while (ballot != 0u) { diff --git a/src/ext/ep/kernels/internode_ncclep.cuh b/src/ext/ep/kernels/internode_ncclep.cuh index d5c395b2..6edc5fd4 100644 --- a/src/ext/ep/kernels/internode_ncclep.cuh +++ b/src/ext/ep/kernels/internode_ncclep.cuh @@ -39,32 +39,32 @@ template __global__ void __launch_bounds__(((kNumDispatchRDMASenderWarps + 1 + NUM_MAX_NVL_PEERS) * 32), 1) dispatch_ncclep(int4* recv_x, float* recv_x_scales, int64_t* recv_topk_idx, float* recv_topk_weights, - SourceMeta* recv_src_meta, const int4* x, const float* x_scales, const int64_t* topk_idx, - const float* topk_weights, int* send_rdma_head, int* send_nvl_head, int* recv_rdma_channel_prefix_matrix, - int* recv_gbl_channel_prefix_matrix, const int* rdma_channel_prefix_matrix, - const int* recv_rdma_rank_prefix_sum, const int* gbl_channel_prefix_matrix, - const int* recv_gbl_rank_prefix_sum, int num_tokens, int hidden_int4, int num_scales, int num_topk, - int num_experts, const bool* is_token_in_rank, void* rdma_buffer_ptr, int num_max_rdma_chunked_send_tokens, - int num_max_rdma_chunked_recv_tokens, void** buffer_ptrs, int num_max_nvl_chunked_send_tokens, - int num_max_nvl_chunked_recv_tokens, int rank, int num_ranks, - mscclpp::PortChannelDeviceHandle* port_channel_handles, - mscclpp::MemoryChannelDeviceHandle* memory_channel_handles, - // Phase 3 NVLS counter pointers (nullptr → fall back to PortChannel/atomicAdd path). - void* nvls_head_mc, void* nvls_head_dev, void* nvls_tail_mc, void* nvls_tail_dev, - // Phase 4: per-peer fabric-IPC base pointers; when non-null, cross-node data - // PUTs go directly through NVL72 fabric VA instead of `handle.put` over IB. - void* const* peer_rdma_bases, - // Increment 4: per-peer base pointers of the VMM-allocated (cuMem FABRIC/POSIX-FD) - // recv-output pool. recv_pool_ptrs[peer] points at peer's pool header; non-null - // enables the cross-GPU forwarder direct-write of hidden into the destination's - // final recv_x (TMA-eligible peer VA); nullptr = legacy receiver-drain path. - void* const* recv_pool_ptrs, - // Increment 5 (inc5): domain-wide recv-pool bases indexed by GLOBAL rank - // (all num_ranks). Non-null + kEpDirect => sender writes hidden direct here. - void* const* recv_pool_global_ptrs, - // Increment 5 combine-direct (Stage 1): [num_tokens * num_ranks] gather map; - // sender writes recv_idx per (token, dst global rank) for combine to gather. - int* ep_combine_recv_idx) { + SourceMeta* recv_src_meta, const int4* x, const float* x_scales, const int64_t* topk_idx, + const float* topk_weights, int* send_rdma_head, int* send_nvl_head, + int* recv_rdma_channel_prefix_matrix, int* recv_gbl_channel_prefix_matrix, + const int* rdma_channel_prefix_matrix, const int* recv_rdma_rank_prefix_sum, + const int* gbl_channel_prefix_matrix, const int* recv_gbl_rank_prefix_sum, int num_tokens, + int hidden_int4, int num_scales, int num_topk, int num_experts, const bool* is_token_in_rank, + void* rdma_buffer_ptr, int num_max_rdma_chunked_send_tokens, int num_max_rdma_chunked_recv_tokens, + void** buffer_ptrs, int num_max_nvl_chunked_send_tokens, int num_max_nvl_chunked_recv_tokens, + int rank, int num_ranks, mscclpp::PortChannelDeviceHandle* port_channel_handles, + mscclpp::MemoryChannelDeviceHandle* memory_channel_handles, + // Phase 3 NVLS counter pointers (nullptr → fall back to PortChannel/atomicAdd path). + void* nvls_head_mc, void* nvls_head_dev, void* nvls_tail_mc, void* nvls_tail_dev, + // Phase 4: per-peer fabric-IPC base pointers; when non-null, cross-node data + // PUTs go directly through NVL72 fabric VA instead of `handle.put` over IB. + void* const* peer_rdma_bases, + // Increment 4: per-peer base pointers of the VMM-allocated (cuMem FABRIC/POSIX-FD) + // recv-output pool. recv_pool_ptrs[peer] points at peer's pool header; non-null + // enables the cross-GPU forwarder direct-write of hidden into the destination's + // final recv_x (TMA-eligible peer VA); nullptr = legacy receiver-drain path. + void* const* recv_pool_ptrs, + // Increment 5 (inc5): domain-wide recv-pool bases indexed by GLOBAL rank + // (all num_ranks). Non-null + kEpDirect => sender writes hidden direct here. + void* const* recv_pool_global_ptrs, + // Increment 5 combine-direct (Stage 1): [num_tokens * num_ranks] gather map; + // sender writes recv_idx per (token, dst global rank) for combine to gather. + int* ep_combine_recv_idx) { enum class WarpRole { kRDMASender, kRDMASenderCoordinator, @@ -308,8 +308,7 @@ __global__ void __launch_bounds__(((kNumDispatchRDMASenderWarps + 1 + NUM_MAX_NV *reinterpret_cast(is_token_in_rank + token_idx * num_ranks + lane_id * NUM_MAX_NVL_PEERS); // Acquire sequential lock - while (lane_id == 0 and rdma_send_next_token_idx != token_idx) - ; + while (lane_id == 0 and rdma_send_next_token_idx != token_idx); __syncwarp(); // Acquire next tail @@ -401,9 +400,9 @@ __global__ void __launch_bounds__(((kNumDispatchRDMASenderWarps + 1 + NUM_MAX_NV if (not bvals_j[g]) continue; const int dg = node * NUM_MAX_NVL_PEERS + g; const int recv_idx = __shfl_sync(0xffffffff, ep_my_idx[g], node); - ep_dst_pools[ep_num_pools++] = reinterpret_cast( - reinterpret_cast(recv_pool_global_ptrs[dg]) + ep_pool_header_bytes + - static_cast(recv_idx) * hidden_bytes); + ep_dst_pools[ep_num_pools++] = + reinterpret_cast(reinterpret_cast(recv_pool_global_ptrs[dg]) + ep_pool_header_bytes + + static_cast(recv_idx) * hidden_bytes); } } auto st_pool_broadcast = [=](const int key, const int4& value) { @@ -453,8 +452,7 @@ __global__ void __launch_bounds__(((kNumDispatchRDMASenderWarps + 1 + NUM_MAX_NV // Epilogue // Acquire sequential lock - while (lane_id == 0 and rdma_send_next_token_idx != token_idx) - ; + while (lane_id == 0 and rdma_send_next_token_idx != token_idx); __syncwarp(); // Update last token tail (epilogue). See in-loop note on atomicMax. @@ -803,7 +801,8 @@ __global__ void __launch_bounds__(((kNumDispatchRDMASenderWarps + 1 + NUM_MAX_NV for (int i = src_rdma_head, num_tokens_sent = 0; i < src_rdma_tail; ++i) { auto rdma_slot_idx = i % num_max_rdma_chunked_recv_tokens; void* shifted = rdma_channel_data.recv_buffer(src_rdma_rank) + rdma_slot_idx * num_bytes_per_rdma_token; - auto src_meta = ld_nc_global(reinterpret_cast(reinterpret_cast(shifted) + ring_hidden_bytes)); + auto src_meta = + ld_nc_global(reinterpret_cast(reinterpret_cast(shifted) + ring_hidden_bytes)); lane_id == src_rdma_rank ? (num_tokens_to_recv_from_rdma -= 1) : 0; bool is_in_dst_nvl_rank = src_meta.is_token_in_nvl_rank(dst_nvl_rank); if (lane_id == src_rdma_rank) { @@ -1160,6 +1159,5 @@ __global__ void __launch_bounds__(((kNumDispatchRDMASenderWarps + 1 + NUM_MAX_NV } } - #endif // EP_DISPATCH_NCCLEP #endif // MSCCLPP_EP_INTERNODE_NCCLEP_CUH_ diff --git a/src/ext/ep/kernels/intranode_kernel.cu b/src/ext/ep/kernels/intranode_kernel.cu index 65379150..00e8ce2c 100644 --- a/src/ext/ep/kernels/intranode_kernel.cu +++ b/src/ext/ep/kernels/intranode_kernel.cu @@ -173,7 +173,8 @@ __global__ void __launch_bounds__(kNumThreads, 1) int* recv_channel_offset, int* send_head, const int4* x, const float* x_scales, const int64_t* topk_idx, const float* topk_weights, const bool* is_token_in_rank, const int* channel_prefix_matrix, int num_tokens, int hidden_int4, int num_topk, int num_experts, int num_scales, void** buffer_ptrs, int rank, - int num_max_send_tokens, int num_recv_buffer_tokens, void** recv_pool_ptrs, int64_t recv_pool_header_bytes) { + int num_max_send_tokens, int num_recv_buffer_tokens, void** recv_pool_ptrs, + int64_t recv_pool_header_bytes) { const auto num_sms = static_cast(gridDim.x), sm_id = static_cast(blockIdx.x); const auto thread_id = static_cast(threadIdx.x); const bool is_sender = sm_id % 2 == 0; @@ -266,8 +267,8 @@ __global__ void __launch_bounds__(kNumThreads, 1) ? channel_prefix_matrix[responsible_rank * num_channels + responsible_channel - 1] : 0; direct_base = static_cast(rank_off + ch_start); - direct_dst_pool = - reinterpret_cast(reinterpret_cast(recv_pool_ptrs[responsible_rank]) + recv_pool_header_bytes); + direct_dst_pool = reinterpret_cast(reinterpret_cast(recv_pool_ptrs[responsible_rank]) + + recv_pool_header_bytes); } // Get tasks diff --git a/test/python/ext/ep/test_internode_multirank.py b/test/python/ext/ep/test_internode_multirank.py index 3e0dc2f9..366bee4e 100644 --- a/test/python/ext/ep/test_internode_multirank.py +++ b/test/python/ext/ep/test_internode_multirank.py @@ -226,7 +226,7 @@ def main(): ) dist.barrier(group=group) - _skip_verify = os.environ.get("MSCCLPP_EP_SKIP_VERIFY","0") in ("1","true","True") + _skip_verify = os.environ.get("MSCCLPP_EP_SKIP_VERIFY", "0") in ("1", "true", "True") # Validate recv buffer: for each source rank i, the block carries value i. assert recv_x.dim() == 2 and recv_x.size(1) == hidden start = 0