revert dsl

This commit is contained in:
Ubuntu
2026-04-10 17:21:50 +00:00
parent 96defbd8a8
commit 68690ecdcd
2 changed files with 57 additions and 488 deletions

View File

@@ -1,323 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
from mscclpp import (
DataType,
Executor,
ExecutionPlan,
PacketType,
npkit,
env,
)
from mscclpp import CommGroup, GpuBuffer
from mscclpp.utils import KernelBuilder, pack
import os
import struct
import cupy as cp
from mpi4py import MPI
def parse_dtype(dtype_str):
dtype_str = dtype_str.strip().lower()
if dtype_str == "float16":
return cp.float16
elif dtype_str == "float32":
return cp.float32
elif dtype_str == "int32":
return cp.int32
else:
raise ValueError(f"Unknown data type: {dtype_str}")
def parse_size(size_str):
size_str = size_str.strip()
if not size_str:
raise ValueError("Size string can not be empty")
units = {"K": 1024, "M": 1024**2, "G": 1024**3}
if size_str[-1].upper() in units:
return int(size_str[:-1]) * units[size_str[-1].upper()]
else:
return int(size_str)
def dtype_to_mscclpp_dtype(dtype):
if dtype == cp.float16:
return DataType.float16
elif dtype == cp.float32:
return DataType.float32
elif dtype == cp.int32:
return DataType.int32
else:
raise ValueError(f"Unknown data type: {dtype}")
def bench_time(n_iters: int, n_graph_iters: int, func_iter):
"""
Capture CUDA graph for n_iters launches. func_iter(stream, i) must vary slot by i.
"""
stream = cp.cuda.Stream(non_blocking=True)
with stream:
stream.begin_capture()
for i in range(n_iters):
func_iter(stream, i)
graph = stream.end_capture()
# warmup
graph.launch(stream)
start = cp.cuda.Event()
end = cp.cuda.Event()
start.record(stream)
for _ in range(n_graph_iters):
graph.launch(stream)
end.record(stream)
end.synchronize()
# us per iteration
return cp.cuda.get_elapsed_time(start, end) / n_iters * 1000.0 / n_graph_iters
def bench_correctness(
collective: str,
input_slot: cp.ndarray,
result_slot: cp.ndarray,
test_buf: cp.ndarray,
dtype_str: str,
rank: int,
num_ranks: int,
n_iters: int,
func_iter,
):
"""
Correctness check on ONE per-iteration slot view (input_slot/result_slot change per i via func_iter).
We pass the per-iteration element count to verifier kernels.
"""
type_size = cp.dtype(parse_dtype(dtype_str)).itemsize
nelems_per_iter = input_slot.nbytes // type_size
print("collective: ", collective)
fill_data_kernel_name = "fill_data_%s" % dtype_str
if "allgather" in collective:
coll = "all_gather"
elif "reducescatter" in collective:
coll = "reduce_scatter"
elif "allreduce" in collective:
coll = "all_reduce"
else:
coll = "sendrecv"
test_data_kernel_name = "test_data_%s_%s" % (coll, dtype_str)
file_dir = os.path.dirname(os.path.abspath(__file__))
fill_data_kernel = KernelBuilder(
file="executor_test_verifier.cu", kernel_name=fill_data_kernel_name, file_dir=file_dir
).get_compiled_kernel()
test_data_kernel = KernelBuilder(
file="executor_test_verifier.cu", kernel_name=test_data_kernel_name, file_dir=file_dir
).get_compiled_kernel()
nblocks = 64
nthreads = 1024
stream = cp.cuda.Stream(non_blocking=True)
with stream:
stream.begin_capture()
for i in range(n_iters):
# WARNING: input_slot/result_slot variables are placeholders; actual slot views are chosen inside func_iter.
# We only use these kernels with the CURRENT slot views computed below for this iteration.
func_iter(stream, i, do_verify=True, fill_kernel=fill_data_kernel, test_kernel=test_data_kernel,
nblocks=nblocks, nthreads=nthreads, nelems_per_iter=nelems_per_iter,
test_buf=test_buf, rank=rank, num_ranks=num_ranks)
graph = stream.end_capture()
graph.launch(stream)
stream.synchronize()
def build_bufs_sendrecv_ring(size_bytes: int, slots: int, dtype: cp.dtype):
"""
Build ring buffers for sendrecv:
- per-iteration message bytes = size_bytes
- total allocated bytes per buffer = slots * size_bytes
"""
type_size = cp.dtype(dtype).itemsize
assert (size_bytes % type_size) == 0, "size not multiple of dtype size"
nelems_per_iter = size_bytes // type_size
total_nelems = nelems_per_iter * slots
input_buf = GpuBuffer(total_nelems, dtype=dtype)
result_buf = GpuBuffer(total_nelems, dtype=dtype)
test_buf = cp.zeros(nelems_per_iter, dtype=dtype) # expected for one iteration
return input_buf, result_buf, test_buf, nelems_per_iter
def main(
execution_plan_path: str,
size: int, # per-iteration bytes
in_place: bool = True,
dtype_str: str = "float16",
packet_type: PacketType = PacketType.LL16,
n_iters: int = 10,
n_graph_iters: int = 10,
slots: int = 4, # ring buffer depth
):
mscclpp_group = CommGroup(MPI.COMM_WORLD)
cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use()
executor = Executor(mscclpp_group.communicator)
npkit_dump_dir = env().npkit_dump_dir
if npkit_dump_dir != "":
npkit.init(mscclpp_group.my_rank)
execution_plan = ExecutionPlan(execution_plan_path, mscclpp_group.my_rank)
collective = execution_plan.collective
dtype = parse_dtype(dtype_str)
# We only change allocation/behavior for sendrecv
if "sendrecv" in collective.lower():
input_buf, result_buf, test_buf, nelems_per_iter = build_bufs_sendrecv_ring(size, slots, dtype)
type_size = cp.dtype(dtype).itemsize
bytes_per_iter = nelems_per_iter * type_size
def slot_view(buf, slot_idx):
start = slot_idx * nelems_per_iter
end = start + nelems_per_iter
return buf[start:end]
# Iteration-aware executor call (rotates slot each iteration)
def executor_func_iter(stream, i, do_verify=False, **vk):
slot = i % slots
in_slot = slot_view(input_buf, slot)
out_slot = slot_view(result_buf, slot)
if do_verify:
# Fill per-iteration input slot with unique (rank, i) pattern
fill_data_kernel = vk["fill_kernel"]
test_data_kernel = vk["test_kernel"]
nblocks = vk["nblocks"]
nthreads = vk["nthreads"]
nelems = vk["nelems_per_iter"]
test_buf_local = vk["test_buf"]
rank = vk["rank"]
num_ranks = vk["num_ranks"]
fill_params = pack(in_slot) + struct.pack("Q", nelems) + pack(rank, i)
fill_data_kernel.launch_kernel(fill_params, nblocks, nthreads, 0, stream)
# Execute exactly one per-iteration message: bytes_per_iter == user --size
executor.execute(
mscclpp_group.my_rank,
in_slot.data.ptr,
out_slot.data.ptr,
in_slot.nbytes,
out_slot.nbytes,
dtype_to_mscclpp_dtype(dtype),
execution_plan,
stream.ptr,
packet_type,
)
if do_verify:
# Validate the output slot for this iteration i
test_params = (
pack(out_slot, test_buf_local)
+ struct.pack("Q", nelems)
+ pack(num_ranks, rank, i)
)
test_data_kernel.launch_kernel(test_params, nblocks, nthreads, 0, stream)
# One-shot sentinel check (slot 0)
mscclpp_group.barrier()
print("per-iter size= ", bytes_per_iter, "bytes, slots=", slots, "total buffer bytes=", input_buf.nbytes)
# Fill whole result with sentinel then run ONE iter (i=0)
result_buf.fill(cp.asarray(123.0, dtype=dtype))
cp.cuda.runtime.deviceSynchronize()
stream = cp.cuda.Stream(non_blocking=True)
with stream:
executor_func_iter(stream, 0)
stream.synchronize()
# Count changes only in slot 0 region
out0 = slot_view(result_buf, 0)
changed = cp.count_nonzero(out0 != cp.asarray(123.0, dtype=dtype)).item()
print("changed elements in slot0:", changed, "out of", out0.size)
cp.cuda.runtime.deviceSynchronize()
mscclpp_group.barrier()
# Correctness: fills + executes + tests with unique i and rotating slots
bench_correctness(
collective,
slot_view(input_buf, 0), # placeholder; real slot chosen per i
slot_view(result_buf, 0), # placeholder; real slot chosen per i
test_buf,
dtype_str,
mscclpp_group.my_rank,
mscclpp_group.nranks,
n_iters,
executor_func_iter,
)
mscclpp_group.barrier()
# Timing (CUDA graph captures n_iters launches with varying slot pointers)
execution_time = bench_time(n_iters, n_graph_iters, executor_func_iter)
if npkit_dump_dir is not None:
npkit.dump(npkit_dump_dir)
npkit.shutdown()
print(
f"Rank: {mscclpp_group.my_rank} Execution time: {execution_time} us, "
f"per-iter data size: {bytes_per_iter} bytes dtype: {dtype().dtype.name} "
f"bandwidth: {bytes_per_iter / (execution_time * 1e-6) / (1024**3):.2f} GB/s, "
f"packet type: {packet_type}, slots: {slots}"
)
else:
raise RuntimeError(
f"This rewritten executor_test.py currently specializes sendrecv. "
f"Plan collective was: {collective}"
)
executor = None
mscclpp_group = None
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-path", "--execution_plan_path", type=str, required=True)
parser.add_argument("--size", type=str, required=True, help="PER-ITERATION bytes (e.g., 1K, 4M, 1G)")
parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation")
parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, float32, int32")
parser.add_argument("--packet_type", type=str, default="LL16", help="Choose from LL8, LL16")
parser.add_argument("--n_iters", type=int, default=10)
parser.add_argument("--n_graph_iters", type=int, default=10)
parser.add_argument("--slots", type=int, default=4, help="ring buffer depth; rotates slot = iter % slots")
args = parser.parse_args()
packet_type = PacketType.LL16
if args.packet_type == "LL8":
packet_type = PacketType.LL8
per_iter_size = parse_size(args.size)
main(
args.execution_plan_path,
per_iter_size,
args.in_place,
args.dtype,
packet_type,
args.n_iters,
args.n_graph_iters,
args.slots,
)

View File

@@ -9,190 +9,82 @@ from mscclpp.language.program import *
from mscclpp.language.collectives import *
def send_recv_test_ring_even_ranks(name, nnodes, gpus_per_node):
nranks = nnodes * gpus_per_node
if nranks < 2:
raise ValueError("This test requires at least 2 ranks")
if nranks % 2 != 0:
raise ValueError(
f"This odd/even ring schedule requires an even number of ranks, got {nranks}"
)
collective = TestCollective(nranks, 1, 1)
def send_recv_test(name, nnodes, gpus_per_node, split_mask):
gpu_size = nnodes * gpus_per_node
collective = TestCollective(gpu_size, 1, 1)
with CollectiveProgram(
name,
collective,
nranks,
gpu_size,
protocol="Simple",
num_threads_per_block=1024,
use_double_scratch_buffer=False,
min_message_size=0,
max_message_size=2**64 - 1,
instances=2,
instances=4
):
next_channels = {}
prev_channels = {}
# Creating separate port channels for next and prev directions.
# When prev and next are the same peer (e.g., 2-node ring), both channels go to the same peer
# and get distinct tags. To ensure cross-rank tag matching (rank A's prev_channel signal
# arrives at rank B's next_channel wait), we create channels in opposite order for the
# "higher" rank so that tags cross-match:
# Lower rank: [next(tag0), prev(tag1)]
# Higher rank: [prev(tag0), next(tag1)]
# Then lower.prev(tag1) == higher.next(tag1) ✓ and higher.prev(tag0) == lower.next(tag0) ✓
# When prev != next (3+ nodes), each channel targets a different peer so each gets tag 0
# and this ordering doesn't matter.
group_size = split_mask + 1
num_groups = gpu_size // group_size
next_channels = {} # channel for sending to next rank
prev_channels = {} # channel for receiving from prev rank
prev_next_ids = {}
for node in range(nnodes):
for gpu in range(gpus_per_node):
global_rank_id = gpu + gpus_per_node * node
position_in_group = global_rank_id & split_mask
group_id = global_rank_id // group_size
next_group_id = (group_id + 1) % num_groups
next_global_rank_id = next_group_id * group_size + position_in_group
prev_group_id = (group_id - 1 + num_groups) % num_groups
prev_global_rank_id = prev_group_id * group_size + position_in_group
if prev_global_rank_id == next_global_rank_id and global_rank_id > prev_global_rank_id:
# Higher rank: create prev first, then next (swapped order)
prev_channels[global_rank_id] = PortChannel(prev_global_rank_id, global_rank_id)
next_channels[global_rank_id] = PortChannel(next_global_rank_id, global_rank_id)
else:
# Lower rank or different peers: create next first, then prev
next_channels[global_rank_id] = PortChannel(next_global_rank_id, global_rank_id)
prev_channels[global_rank_id] = PortChannel(prev_global_rank_id, global_rank_id)
prev_next_ids[global_rank_id] = (prev_global_rank_id, next_global_rank_id)
# --------------------------------------------------------------
# Classic ring across all ranks:
# prev = (rank - 1 + nranks) % nranks
# next = (rank + 1) % nranks
# --------------------------------------------------------------
for rank in range(nranks):
prev_rank = (rank - 1 + nranks) % nranks
next_rank = (rank + 1) % nranks
# sync with the next rank and the previous rank in the group
for node in range(nnodes):
for gpu in range(gpus_per_node):
global_rank_id = gpu + gpus_per_node * node
prev_global_rank_id, next_global_rank_id = prev_next_ids[global_rank_id]
prev_channels[global_rank_id].signal(tb=0, data_sync=SyncType.none)
next_channels[global_rank_id].wait(tb=0, data_sync=SyncType.after)
# Deterministic channel creation order
if (rank & 1) == 0:
next_channels[rank] = PortChannel(next_rank, rank)
prev_channels[rank] = PortChannel(prev_rank, rank)
else:
prev_channels[rank] = PortChannel(prev_rank, rank)
next_channels[rank] = PortChannel(next_rank, rank)
src_rank = Rank(global_rank_id)
src_buffer = src_rank.get_input_buffer()
dst_rank = Rank(next_global_rank_id)
dst_buffer = dst_rank.get_output_buffer()
# --------------------------------------------------------------
# --------------------------------------------------------------
# Ring send/recv with explicit ACK
#
# Data path:
# sender: put_with_signal() to next
# receiver: wait() from prev
#
# ACK path:
# receiver: signal() back to prev after data is available
# sender: wait() for ACK from next before proceeding
#
# Even ranks: send first, then recv, then ACK prev, then wait ACK
# Odd ranks : recv first, then ACK prev, then send, then wait ACK
# --------------------------------------------------------------
for rank in range(nranks):
prev_rank = (rank - 1 + nranks) % nranks
next_rank = (rank + 1) % nranks
next_channels[global_rank_id].put_with_signal(dst_buffer[:], src_buffer[:], tb=0)
prev_channels[global_rank_id].wait(tb=0, data_sync=SyncType.none)
src_rank = Rank(rank)
next_rank_obj = Rank(next_rank)
src_buf = src_rank.get_input_buffer()
next_out_buf = next_rank_obj.get_output_buffer()
src_chunk = src_buf[0:src_buf.size]
dst_chunk = next_out_buf[0:next_out_buf.size]
ch_to_next = next_channels[rank]
ch_from_prev = prev_channels[rank]
if (rank & 1) == 0:
# Send data to next and signal arrival
ch_to_next.put_with_signal(
dst_chunk,
src_chunk,
tb=0,
)
# Wait for data from prev to become visible locally
ch_from_prev.wait(
tb=0,
data_sync=SyncType.after,
)
# Ack back to prev that this rank has observed/consumed input
ch_from_prev.signal(
tb=0,
)
# Wait for next rank to ack our outgoing transfer
ch_to_next.wait(
tb=0,
)
else:
# Wait for data from prev first
ch_from_prev.wait(
tb=0,
data_sync=SyncType.after,
)
# Ack back to prev that this rank has observed/consumed input
ch_from_prev.signal(
tb=0,
)
# Then send data to next
ch_to_next.put_with_signal(
dst_chunk,
src_chunk,
tb=0,
)
# Wait for next rank to ack our outgoing transfer
ch_to_next.wait(
tb=0,
)
# --------------------------------------------------------------
# Ring send/recv
#
# Even ranks: send first, then wait
# Odd ranks : wait first, then send
#
# This is safe for an even-sized ring and avoids the
# single-rank-starter wave.
# --------------------------------------------------------------
'''
for rank in range(nranks):
prev_rank = (rank - 1 + nranks) % nranks
next_rank = (rank + 1) % nranks
src_rank = Rank(rank)
next_rank_obj = Rank(next_rank)
src_buf = src_rank.get_input_buffer()
next_out_buf = next_rank_obj.get_output_buffer()
src_chunk = src_buf[0:src_buf.size]
dst_chunk = next_out_buf[0:next_out_buf.size]
ch_to_next = next_channels[rank]
ch_from_prev = prev_channels[rank]
if (rank & 1) == 0:
ch_to_next.put_with_signal_and_flush(
dst_chunk,
src_chunk,
tb=0,
)
ch_from_prev.wait(
tb=0,
data_sync=SyncType.after,
)
else:
ch_from_prev.wait(
tb=0,
data_sync=SyncType.after,
)
ch_to_next.put_with_signal_and_flush(
dst_chunk,
src_chunk,
tb=0,
)
'''
print(JSON())
# ----------------------------------------------------------------------
# CLI
# ----------------------------------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument("--name", type=str, required=True, help="name of the program")
parser.add_argument("--name", type=str, help="name of the program")
parser.add_argument("--nnodes", type=int, default=1, help="number of nodes")
parser.add_argument("--gpus_per_node", type=int, required=True, help="number of GPUs per node")
parser.add_argument("--gpus_per_node", type=int, help="number of gpus per node")
parser.add_argument("--split_mask", type=lambda x: int(x, 0), default=0x3, help="split mask (e.g. 0x3)")
args = parser.parse_args()
send_recv_test_ring_even_ranks(
args.name,
args.nnodes,
args.gpus_per_node,
send_recv_test(
args.name, args.nnodes, args.gpus_per_node, args.split_mask
)