ep: fix internode combine in multirank test

Two issues prevented internode HT combine from completing on 2x8 H100:

1. Wrong prefix matrices passed to internode_combine. Combine runs in the
   reverse direction of dispatch, so it must consume the receiver-side
   matrices returned by dispatch (recv_rdma_channel_prefix_matrix,
   recv_rdma_rank_prefix_sum, recv_gbl_channel_prefix_matrix), not the
   sender-side rdma_channel_prefix_matrix / gbl_channel_prefix_matrix.
   This matches DeepEP's deep_ep/buffer.py::internode_combine handle
   unpacking. Without the fix the NVL forwarder's 'NVL check' timed out
   because token_start_idx/token_end_idx were computed against the wrong
   per-channel layout.

2. Cross-rank race between dispatch and combine. Even with the correct
   matrices, launching combine immediately after dispatch deadlocked the
   forwarder NVL check (tail stuck one short of expected_head) because
   peers still had in-flight dispatch proxy traffic while fast ranks had
   already started combine. A torch.cuda.synchronize() + dist.barrier()
   between the two calls makes the test pass deterministically on 16
   ranks (combine diff == 0, max|expected| up to 60.0).

The barrier in the test is a workaround; the real fix belongs in
Buffer::internode_dispatch / Buffer::internode_combine so the
dispatch->combine handoff fully fences outstanding proxy work across
ranks. Marked with an XXX comment in the test.
This commit is contained in:
Qinghua Zhou
2026-04-22 02:21:29 +00:00
parent 393d6e2673
commit c351b871a1

View File

@@ -164,15 +164,28 @@ def main():
if rank == 0:
print(f"[dispatch] OK (recv {recv_x.size(0)} tokens)", flush=True)
# XXX: forcing a device+group sync here is currently required for combine
# to see consistent dispatch outputs. Without this both send_nvl_head and
# the various *_channel_prefix_matrix tensors can still be in flight on
# the comm stream when combine launches, producing a deadlock inside the
# combine forwarder (NVL check never advances). Investigate proper
# stream-dependency hand-off in Buffer::internode_dispatch.
torch.cuda.synchronize()
dist.barrier(group=group)
# internode_combine signature:
# (x, topk_weights,
# src_meta, is_combined_token_in_rank,
# rdma_channel_prefix_matrix, rdma_rank_prefix_sum, gbl_channel_prefix_matrix,
# combined_rdma_head, combined_nvl_head, config, previous_event, async, allocate_on_comm_stream)
# NOTE: combine goes in the reverse direction of dispatch, so the prefix
# matrices passed here must be the RECEIVER-side ones returned by dispatch
# (`recv_rdma_channel_prefix_matrix`, `recv_rdma_rank_prefix_sum`,
# `recv_gbl_channel_prefix_matrix`) — not the sender-side ones.
combined_x, combined_topk_weights, _ = buf.runtime.internode_combine(
recv_x, recv_topk_weights,
recv_src_meta, is_token_in_rank,
rdma_channel_prefix_matrix, recv_rdma_rank_prefix_sum, gbl_channel_prefix_matrix,
recv_rdma_channel_prefix_matrix, recv_rdma_rank_prefix_sum, recv_gbl_channel_prefix_matrix,
send_rdma_head, send_nvl_head,
cfg, None, False, False,
)
@@ -182,8 +195,7 @@ def main():
got = combined_x.float().mean(dim=1)
diff = (got - expected).abs().max().item()
max_exp = expected.abs().max().item()
if rank == 0:
print(f"[combine] max|got-expected|={diff:.4e} max|expected|={max_exp:.4e}", flush=True)
print(f"[combine r{rank}] max|got-expected|={diff:.4e} max|expected|={max_exp:.4e}", flush=True)
assert diff < 1e-2, f"rank{rank}: combine mismatch max diff {diff}"
dist.barrier(group=group)