Add unified benchmarking function to test all_to_all_single of mscclpp and torch

Author: Qinghua Zhou
Date:   2026-02-24 07:17:17 +00:00
parent 715ecd91cf
commit ae59eab6a2
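
Why the deterministic seed matters: every rank seeds random with the same constant, so all ranks derive an identical send matrix without exchanging any metadata; rank r then sends row r (input_split_sizes) and receives column r (output_split_sizes). A standalone, CPU-only sketch of that invariant; only the seed and the 0.5×–1.5× formula are taken from the diff below, the rest is illustrative:

    import random

    def build_variable_send_matrix(avg_msg_size: int, world_size: int):
        # Mirrors the helper added in this commit: fixed seed, per-pair
        # jitter of 0.5x to 1.5x around the average, in float32 elements.
        random.seed(12345)
        avg_elems = avg_msg_size // 4
        return [[max(1, int(avg_elems * (0.5 + random.random())))
                 for _ in range(world_size)]
                for _ in range(world_size)]

    # Two independent calls stand in for two ranks computing the matrix:
    a = build_variable_send_matrix(1 << 20, 8)
    b = build_variable_send_matrix(1 << 20, 8)
    assert a == b  # deterministic, so peers' split sizes always agree

    rank = 3
    input_split_sizes = a[rank]                          # row r: what rank r sends
    output_split_sizes = [a[j][rank] for j in range(8)]  # column r: what rank r receives
    print(sum(input_split_sizes), sum(output_split_sizes))

Because the jitter is drawn per (sender, receiver) pair, send and receive totals differ from rank to rank, which is exactly what exercises the variable-split path.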


@@ -13,6 +13,8 @@ import torch
 import torch.distributed as dist
 import os
 import time
+import random
+from typing import Callable, List, Optional
 
 # Must init torch.distributed before importing mscclpp modules
 # to set rank/world_size environment variables
@@ -153,99 +155,75 @@ def main():
         print(f" Input total: {total_input}, Output total: {total_output}")
         print(f" Local copy verified: {local_ok}")
         print(f" {'PASS' if local_ok else 'FAIL'}")
 
-    # Test 3: Performance benchmark with variable sizes (1KB to 128MB avg per peer)
-    if rank == 0:
-        print("\n[Test 3] Variable-size performance benchmark (1KB to 128MB avg per peer)")
-        print(f" {'Avg Size':>10s} {'Iters':>5s} {'Total (ms)':>10s} {'Lat (us)':>10s} {'algBW(GB/s)':>12s}")
-        print(f" {'-'*10} {'-'*5} {'-'*10} {'-'*10} {'-'*12}")
-    # Message sizes: average bytes sent to each peer
-    msg_sizes = [1 << s for s in range(10, 28) if s % 2 == 0]  # powers of 4 from 1KB to 64MB
-    msg_sizes.append(128 * 1024 * 1024)  # add 128MB
-    for avg_msg_size in msg_sizes:
-        # Build a variable send matrix: send_matrix[i][j] = bytes rank i sends to rank j.
-        # Use a deterministic seed so all ranks compute the same matrix.
-        # Sizes vary from 0.5× to 1.5× of avg_msg_size (in float32 elements).
-        import random
+    # ── Unified benchmark helper ──────────────────────────────────────────
+    def build_variable_send_matrix(avg_msg_size: int, world_size: int):
+        """Build a deterministic variable-size send matrix (0.5×–1.5× of avg)."""
         random.seed(12345)
-        avg_elems = avg_msg_size // 4  # float32 = 4 bytes
+        avg_elems = avg_msg_size // 4  # float32
         send_matrix = []
         for i in range(world_size):
             row = []
             for j in range(world_size):
                 # Random factor between 0.5 and 1.5
                 factor = 0.5 + random.random()
                 elems = max(1, int(avg_elems * factor))
                 row.append(elems)
             send_matrix.append(row)
+        return send_matrix
 
-        input_split_sizes = send_matrix[rank]
-        output_split_sizes = [send_matrix[j][rank] for j in range(world_size)]
-        total_send = sum(input_split_sizes)
-        total_recv = sum(output_split_sizes)
-        input_tensor = torch.randn(total_send, dtype=torch.float32, device='cuda')
-        output_tensor = torch.empty(total_recv, dtype=torch.float32, device='cuda')
-        # Fewer warmup/iters for very large sizes
-        n_warmup = 3 if avg_msg_size >= 16 * 1024 * 1024 else 5
-        n_iters = 5 if avg_msg_size >= 64 * 1024 * 1024 else (10 if avg_msg_size >= 4 * 1024 * 1024 else 20)
-        # Warmup
+    def bench_alltoallv(
+        fn: Callable,
+        input_tensor: torch.Tensor,
+        output_tensor: torch.Tensor,
+        input_split_sizes: List[int],
+        output_split_sizes: List[int],
+        n_warmup: int,
+        n_iters: int,
+    ) -> tuple:
+        """Benchmark an all_to_all_single implementation. Returns (latency_us, algbw_gbps)."""
         for _ in range(n_warmup):
-            alltoallv.all_to_all_single(
-                input_tensor, output=output_tensor,
-                input_split_sizes=input_split_sizes,
-                output_split_sizes=output_split_sizes)
+            fn(input_tensor, output_tensor, input_split_sizes, output_split_sizes)
         torch.cuda.synchronize()
         # Benchmark
         start = time.perf_counter()
         for _ in range(n_iters):
-            alltoallv.all_to_all_single(
-                input_tensor, output=output_tensor,
-                input_split_sizes=input_split_sizes,
-                output_split_sizes=output_split_sizes)
+            fn(input_tensor, output_tensor, input_split_sizes, output_split_sizes)
         torch.cuda.synchronize()
         elapsed = time.perf_counter() - start
         # Algorithm bandwidth: total bytes received per rank / time (unidirectional)
-        total_recv_bytes = total_recv * 4  # float32
-        total_bytes = total_recv_bytes * n_iters
-        bandwidth_gbps = total_bytes / elapsed / 1e9
-        latency_us = elapsed / n_iters * 1e6
+        total_recv_bytes = sum(output_split_sizes) * 4  # float32
+        algbw = total_recv_bytes * n_iters / elapsed / 1e9
+        lat = elapsed / n_iters * 1e6
+        return lat, algbw
 
-        if rank == 0:
-            if avg_msg_size >= 1024 * 1024:
-                size_str = f"{avg_msg_size // (1024*1024)}MB"
-            elif avg_msg_size >= 1024:
-                size_str = f"{avg_msg_size // 1024}KB"
-            else:
-                size_str = f"{avg_msg_size}B"
-            print(f" {size_str:>10s} {n_iters:>5d} {elapsed*1000:>10.2f} {latency_us:>10.1f} {bandwidth_gbps:>12.2f}")
 
-    # Test 4: torch.distributed.all_to_all_single baseline (same variable-size data)
+    # Wrap mscclpp and torch.dist into the same calling convention
+    def mscclpp_fn(inp, out, in_splits, out_splits):
+        alltoallv.all_to_all_single(inp, output=out,
+                                    input_split_sizes=in_splits,
+                                    output_split_sizes=out_splits)
+
+    def torch_fn(inp, out, in_splits, out_splits):
+        dist.all_to_all_single(out, inp,
+                               output_split_sizes=out_splits,
+                               input_split_sizes=in_splits)
+
+    # ── Test 3: Side-by-side comparison ───────────────────────────────────
     if rank == 0:
-        print("\n[Test 4] torch.dist.all_to_all_single baseline (same variable sizes)")
-        print(f" {'Avg Size':>10s} {'Iters':>5s} {'Total (ms)':>10s} {'Lat (us)':>10s} {'algBW(GB/s)':>12s}")
-        print(f" {'-'*10} {'-'*5} {'-'*10} {'-'*10} {'-'*12}")
+        print("\n[Test 3] Variable-size benchmark: mscclpp vs torch.dist (1KB–128MB avg/peer)")
+        print(f" {'Avg Size':>10s} "
+              f"{'mscclpp Lat':>12s} {'mscclpp BW':>11s} "
+              f"{'torch Lat':>10s} {'torch BW':>9s} "
+              f"{'Speedup':>7s}")
+        print(f" {'-'*10} "
+              f"{'-'*12} {'-'*11} "
+              f"{'-'*10} {'-'*9} "
+              f"{'-'*7}")
     msg_sizes = [1 << s for s in range(10, 28) if s % 2 == 0]
     msg_sizes.append(128 * 1024 * 1024)
     for avg_msg_size in msg_sizes:
-        # Rebuild the same send_matrix (same seed → same data)
-        import random
-        random.seed(12345)
-        avg_elems = avg_msg_size // 4
-        send_matrix = []
-        for i in range(world_size):
-            row = []
-            for j in range(world_size):
-                factor = 0.5 + random.random()
-                elems = max(1, int(avg_elems * factor))
-                row.append(elems)
-            send_matrix.append(row)
+        send_matrix = build_variable_send_matrix(avg_msg_size, world_size)
         input_split_sizes = send_matrix[rank]
         output_split_sizes = [send_matrix[j][rank] for j in range(world_size)]
@@ -259,28 +237,12 @@ def main():
         n_warmup = 3 if avg_msg_size >= 16 * 1024 * 1024 else 5
         n_iters = 5 if avg_msg_size >= 64 * 1024 * 1024 else (10 if avg_msg_size >= 4 * 1024 * 1024 else 20)
-        # Warmup
-        for _ in range(n_warmup):
-            dist.all_to_all_single(
-                output_tensor, input_tensor,
-                output_split_sizes=output_split_sizes,
-                input_split_sizes=input_split_sizes)
-        torch.cuda.synchronize()
-        # Benchmark
-        start = time.perf_counter()
-        for _ in range(n_iters):
-            dist.all_to_all_single(
-                output_tensor, input_tensor,
-                output_split_sizes=output_split_sizes,
-                input_split_sizes=input_split_sizes)
-        torch.cuda.synchronize()
-        elapsed = time.perf_counter() - start
-        total_recv_bytes = total_recv * 4
-        total_bytes = total_recv_bytes * n_iters
-        bandwidth_gbps = total_bytes / elapsed / 1e9
-        latency_us = elapsed / n_iters * 1e6
+        m_lat, m_bw = bench_alltoallv(mscclpp_fn, input_tensor, output_tensor,
+                                      input_split_sizes, output_split_sizes,
+                                      n_warmup, n_iters)
+        t_lat, t_bw = bench_alltoallv(torch_fn, input_tensor, output_tensor,
+                                      input_split_sizes, output_split_sizes,
+                                      n_warmup, n_iters)
 
         if rank == 0:
             if avg_msg_size >= 1024 * 1024:
@@ -289,7 +251,11 @@ def main():
                 size_str = f"{avg_msg_size // 1024}KB"
             else:
                 size_str = f"{avg_msg_size}B"
-            print(f" {size_str:>10s} {n_iters:>5d} {elapsed*1000:>10.2f} {latency_us:>10.1f} {bandwidth_gbps:>12.2f}")
+            speedup = m_bw / t_bw if t_bw > 0 else float('inf')
+            print(f" {size_str:>10s} "
+                  f"{m_lat:>10.1f}us {m_bw:>9.2f}GB "
+                  f"{t_lat:>8.1f}us {t_bw:>7.2f}GB "
+                  f"{speedup:>6.2f}x")
 
     # Cleanup
     dist.barrier()
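
A note on reading the comparison table: algBW counts only the bytes a rank receives per iteration (unidirectional), and the send matrix includes the self-copy (the inner loop runs over all ranks, self included), so each rank receives roughly world_size × avg_msg_size bytes per iteration. A worked example of the formula with assumed numbers (8 ranks, 128MB average per peer, 10 ms per iteration; illustrative, not measured):

    # Illustrative arithmetic for the algBW/latency formulas; the inputs
    # here are assumptions, not measured results.
    world_size = 8
    avg_msg_size = 128 * 1024 * 1024               # avg bytes sent to each peer
    total_recv_bytes = world_size * avg_msg_size   # ~1 GiB received per rank/iter
    elapsed_per_iter = 10e-3                       # seconds per iteration (assumed)
    algbw = total_recv_bytes / elapsed_per_iter / 1e9
    lat_us = elapsed_per_iter * 1e6
    print(f"{lat_us:.0f} us, {algbw:.1f} GB/s")    # 10000 us, 107.4 GB/s

The Speedup column is then just mscclpp algBW divided by torch algBW over identical buffers and splits, so values above 1.00x mean mscclpp moved the same traffic faster.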