From ae59eab6a210525c3d9c534f6edd1ce013a4e626 Mon Sep 17 00:00:00 2001 From: Qinghua Zhou Date: Tue, 24 Feb 2026 07:17:17 +0000 Subject: [PATCH] Add unified benchmarking function to test all_to_all_single of mscclpp and torch --- python/test/test_alltoallv_mscclpp.py | 152 ++++++++++---------------- 1 file changed, 59 insertions(+), 93 deletions(-) diff --git a/python/test/test_alltoallv_mscclpp.py b/python/test/test_alltoallv_mscclpp.py index 955ce4b9..611726bb 100644 --- a/python/test/test_alltoallv_mscclpp.py +++ b/python/test/test_alltoallv_mscclpp.py @@ -13,6 +13,8 @@ import torch import torch.distributed as dist import os import time +import random +from typing import Callable, List, Optional # Must init torch.distributed before importing mscclpp modules # to set rank/world_size environment variables @@ -153,99 +155,75 @@ def main(): print(f" Input total: {total_input}, Output total: {total_output}") print(f" Local copy verified: {local_ok}") print(f" {'PASS' if local_ok else 'FAIL'}") - - # Test 3: Performance benchmark with variable sizes (1KB to 128MB avg per peer) - if rank == 0: - print("\n[Test 3] Variable-size performance benchmark (1KB to 128MB avg per peer)") - print(f" {'Avg Size':>10s} {'Iters':>5s} {'Total (ms)':>10s} {'Lat (us)':>10s} {'algBW(GB/s)':>12s}") - print(f" {'-'*10} {'-'*5} {'-'*10} {'-'*10} {'-'*12}") - # Message sizes: average bytes sent to each peer - msg_sizes = [1 << s for s in range(10, 28) if s % 2 == 0] # powers of 4 from 1KB to 64MB - msg_sizes.append(128 * 1024 * 1024) # add 128MB - - for avg_msg_size in msg_sizes: - # Build a variable send matrix: send_matrix[i][j] = bytes rank i sends to rank j. - # Use a deterministic seed so all ranks compute the same matrix. - # Sizes vary from 0.5× to 1.5× of avg_msg_size (in float32 elements). - import random + # ── Unified benchmark helper ────────────────────────────────────────── + def build_variable_send_matrix(avg_msg_size: int, world_size: int): + """Build a deterministic variable-size send matrix (0.5×–1.5× of avg).""" random.seed(12345) - avg_elems = avg_msg_size // 4 # float32 = 4 bytes + avg_elems = avg_msg_size // 4 # float32 send_matrix = [] for i in range(world_size): row = [] for j in range(world_size): - # Random factor between 0.5 and 1.5 factor = 0.5 + random.random() elems = max(1, int(avg_elems * factor)) row.append(elems) send_matrix.append(row) + return send_matrix - input_split_sizes = send_matrix[rank] - output_split_sizes = [send_matrix[j][rank] for j in range(world_size)] - - total_send = sum(input_split_sizes) - total_recv = sum(output_split_sizes) - - input_tensor = torch.randn(total_send, dtype=torch.float32, device='cuda') - output_tensor = torch.empty(total_recv, dtype=torch.float32, device='cuda') - - # Fewer warmup/iters for very large sizes - n_warmup = 3 if avg_msg_size >= 16 * 1024 * 1024 else 5 - n_iters = 5 if avg_msg_size >= 64 * 1024 * 1024 else (10 if avg_msg_size >= 4 * 1024 * 1024 else 20) - - # Warmup + def bench_alltoallv( + fn: Callable, + input_tensor: torch.Tensor, + output_tensor: torch.Tensor, + input_split_sizes: List[int], + output_split_sizes: List[int], + n_warmup: int, + n_iters: int, + ) -> tuple: + """Benchmark an all_to_all_single implementation. Returns (latency_us, algbw_gbps).""" for _ in range(n_warmup): - alltoallv.all_to_all_single( - input_tensor, output=output_tensor, - input_split_sizes=input_split_sizes, - output_split_sizes=output_split_sizes) + fn(input_tensor, output_tensor, input_split_sizes, output_split_sizes) torch.cuda.synchronize() - # Benchmark start = time.perf_counter() for _ in range(n_iters): - alltoallv.all_to_all_single( - input_tensor, output=output_tensor, - input_split_sizes=input_split_sizes, - output_split_sizes=output_split_sizes) + fn(input_tensor, output_tensor, input_split_sizes, output_split_sizes) torch.cuda.synchronize() elapsed = time.perf_counter() - start - # Algorithm bandwidth: total bytes received per rank / time (unidirectional) - total_recv_bytes = total_recv * 4 # float32 - total_bytes = total_recv_bytes * n_iters - bandwidth_gbps = total_bytes / elapsed / 1e9 - latency_us = elapsed / n_iters * 1e6 + total_recv_bytes = sum(output_split_sizes) * 4 # float32 + algbw = total_recv_bytes * n_iters / elapsed / 1e9 + lat = elapsed / n_iters * 1e6 + return lat, algbw - if rank == 0: - if avg_msg_size >= 1024 * 1024: - size_str = f"{avg_msg_size // (1024*1024)}MB" - elif avg_msg_size >= 1024: - size_str = f"{avg_msg_size // 1024}KB" - else: - size_str = f"{avg_msg_size}B" - print(f" {size_str:>10s} {n_iters:>5d} {elapsed*1000:>10.2f} {latency_us:>10.1f} {bandwidth_gbps:>12.2f}") - - # Test 4: torch.distributed.all_to_all_single baseline (same variable-size data) + # Wrap mscclpp and torch.dist into the same calling convention + def mscclpp_fn(inp, out, in_splits, out_splits): + alltoallv.all_to_all_single(inp, output=out, + input_split_sizes=in_splits, + output_split_sizes=out_splits) + + def torch_fn(inp, out, in_splits, out_splits): + dist.all_to_all_single(out, inp, + output_split_sizes=out_splits, + input_split_sizes=in_splits) + + # ── Test 3: Side-by-side comparison ─────────────────────────────────── if rank == 0: - print("\n[Test 4] torch.dist.all_to_all_single baseline (same variable sizes)") - print(f" {'Avg Size':>10s} {'Iters':>5s} {'Total (ms)':>10s} {'Lat (us)':>10s} {'algBW(GB/s)':>12s}") - print(f" {'-'*10} {'-'*5} {'-'*10} {'-'*10} {'-'*12}") + print("\n[Test 3] Variable-size benchmark: mscclpp vs torch.dist (1KB–128MB avg/peer)") + print(f" {'Avg Size':>10s} " + f"{'mscclpp Lat':>12s} {'mscclpp BW':>11s} " + f"{'torch Lat':>10s} {'torch BW':>9s} " + f"{'Speedup':>7s}") + print(f" {'-'*10} " + f"{'-'*12} {'-'*11} " + f"{'-'*10} {'-'*9} " + f"{'-'*7}") + + msg_sizes = [1 << s for s in range(10, 28) if s % 2 == 0] + msg_sizes.append(128 * 1024 * 1024) for avg_msg_size in msg_sizes: - # Rebuild the same send_matrix (same seed → same data) - import random - random.seed(12345) - avg_elems = avg_msg_size // 4 - send_matrix = [] - for i in range(world_size): - row = [] - for j in range(world_size): - factor = 0.5 + random.random() - elems = max(1, int(avg_elems * factor)) - row.append(elems) - send_matrix.append(row) + send_matrix = build_variable_send_matrix(avg_msg_size, world_size) input_split_sizes = send_matrix[rank] output_split_sizes = [send_matrix[j][rank] for j in range(world_size)] @@ -259,28 +237,12 @@ def main(): n_warmup = 3 if avg_msg_size >= 16 * 1024 * 1024 else 5 n_iters = 5 if avg_msg_size >= 64 * 1024 * 1024 else (10 if avg_msg_size >= 4 * 1024 * 1024 else 20) - # Warmup - for _ in range(n_warmup): - dist.all_to_all_single( - output_tensor, input_tensor, - output_split_sizes=output_split_sizes, - input_split_sizes=input_split_sizes) - torch.cuda.synchronize() - - # Benchmark - start = time.perf_counter() - for _ in range(n_iters): - dist.all_to_all_single( - output_tensor, input_tensor, - output_split_sizes=output_split_sizes, - input_split_sizes=input_split_sizes) - torch.cuda.synchronize() - elapsed = time.perf_counter() - start - - total_recv_bytes = total_recv * 4 - total_bytes = total_recv_bytes * n_iters - bandwidth_gbps = total_bytes / elapsed / 1e9 - latency_us = elapsed / n_iters * 1e6 + m_lat, m_bw = bench_alltoallv(mscclpp_fn, input_tensor, output_tensor, + input_split_sizes, output_split_sizes, + n_warmup, n_iters) + t_lat, t_bw = bench_alltoallv(torch_fn, input_tensor, output_tensor, + input_split_sizes, output_split_sizes, + n_warmup, n_iters) if rank == 0: if avg_msg_size >= 1024 * 1024: @@ -289,7 +251,11 @@ def main(): size_str = f"{avg_msg_size // 1024}KB" else: size_str = f"{avg_msg_size}B" - print(f" {size_str:>10s} {n_iters:>5d} {elapsed*1000:>10.2f} {latency_us:>10.1f} {bandwidth_gbps:>12.2f}") + speedup = m_bw / t_bw if t_bw > 0 else float('inf') + print(f" {size_str:>10s} " + f"{m_lat:>10.1f}us {m_bw:>9.2f}GB " + f"{t_lat:>8.1f}us {t_bw:>7.2f}GB " + f"{speedup:>6.2f}x") # Cleanup dist.barrier()