From 68690ecdcd5c8e5a9184463b54434157c4efc8dc Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 10 Apr 2026 17:21:50 +0000 Subject: [PATCH] revert dsl --- executor_test.py | 323 ------------------ .../default_algos/mscclpp_send_recv.py | 222 ++++-------- 2 files changed, 57 insertions(+), 488 deletions(-) delete mode 100644 executor_test.py diff --git a/executor_test.py b/executor_test.py deleted file mode 100644 index 232f2f8b..00000000 --- a/executor_test.py +++ /dev/null @@ -1,323 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import argparse -from mscclpp import ( - DataType, - Executor, - ExecutionPlan, - PacketType, - npkit, - env, -) -from mscclpp import CommGroup, GpuBuffer -from mscclpp.utils import KernelBuilder, pack -import os -import struct - -import cupy as cp -from mpi4py import MPI - - -def parse_dtype(dtype_str): - dtype_str = dtype_str.strip().lower() - if dtype_str == "float16": - return cp.float16 - elif dtype_str == "float32": - return cp.float32 - elif dtype_str == "int32": - return cp.int32 - else: - raise ValueError(f"Unknown data type: {dtype_str}") - - -def parse_size(size_str): - size_str = size_str.strip() - if not size_str: - raise ValueError("Size string can not be empty") - units = {"K": 1024, "M": 1024**2, "G": 1024**3} - if size_str[-1].upper() in units: - return int(size_str[:-1]) * units[size_str[-1].upper()] - else: - return int(size_str) - - -def dtype_to_mscclpp_dtype(dtype): - if dtype == cp.float16: - return DataType.float16 - elif dtype == cp.float32: - return DataType.float32 - elif dtype == cp.int32: - return DataType.int32 - else: - raise ValueError(f"Unknown data type: {dtype}") - - -def bench_time(n_iters: int, n_graph_iters: int, func_iter): - """ - Capture CUDA graph for n_iters launches. func_iter(stream, i) must vary slot by i. - """ - stream = cp.cuda.Stream(non_blocking=True) - with stream: - stream.begin_capture() - for i in range(n_iters): - func_iter(stream, i) - graph = stream.end_capture() - - # warmup - graph.launch(stream) - - start = cp.cuda.Event() - end = cp.cuda.Event() - - start.record(stream) - for _ in range(n_graph_iters): - graph.launch(stream) - end.record(stream) - end.synchronize() - - # us per iteration - return cp.cuda.get_elapsed_time(start, end) / n_iters * 1000.0 / n_graph_iters - - -def bench_correctness( - collective: str, - input_slot: cp.ndarray, - result_slot: cp.ndarray, - test_buf: cp.ndarray, - dtype_str: str, - rank: int, - num_ranks: int, - n_iters: int, - func_iter, -): - """ - Correctness check on ONE per-iteration slot view (input_slot/result_slot change per i via func_iter). - We pass the per-iteration element count to verifier kernels. - """ - type_size = cp.dtype(parse_dtype(dtype_str)).itemsize - nelems_per_iter = input_slot.nbytes // type_size - - print("collective: ", collective) - - fill_data_kernel_name = "fill_data_%s" % dtype_str - if "allgather" in collective: - coll = "all_gather" - elif "reducescatter" in collective: - coll = "reduce_scatter" - elif "allreduce" in collective: - coll = "all_reduce" - else: - coll = "sendrecv" - test_data_kernel_name = "test_data_%s_%s" % (coll, dtype_str) - - file_dir = os.path.dirname(os.path.abspath(__file__)) - fill_data_kernel = KernelBuilder( - file="executor_test_verifier.cu", kernel_name=fill_data_kernel_name, file_dir=file_dir - ).get_compiled_kernel() - test_data_kernel = KernelBuilder( - file="executor_test_verifier.cu", kernel_name=test_data_kernel_name, file_dir=file_dir - ).get_compiled_kernel() - - nblocks = 64 - nthreads = 1024 - - stream = cp.cuda.Stream(non_blocking=True) - with stream: - stream.begin_capture() - for i in range(n_iters): - # WARNING: input_slot/result_slot variables are placeholders; actual slot views are chosen inside func_iter. - # We only use these kernels with the CURRENT slot views computed below for this iteration. - func_iter(stream, i, do_verify=True, fill_kernel=fill_data_kernel, test_kernel=test_data_kernel, - nblocks=nblocks, nthreads=nthreads, nelems_per_iter=nelems_per_iter, - test_buf=test_buf, rank=rank, num_ranks=num_ranks) - graph = stream.end_capture() - - graph.launch(stream) - stream.synchronize() - - -def build_bufs_sendrecv_ring(size_bytes: int, slots: int, dtype: cp.dtype): - """ - Build ring buffers for sendrecv: - - per-iteration message bytes = size_bytes - - total allocated bytes per buffer = slots * size_bytes - """ - type_size = cp.dtype(dtype).itemsize - assert (size_bytes % type_size) == 0, "size not multiple of dtype size" - - nelems_per_iter = size_bytes // type_size - total_nelems = nelems_per_iter * slots - - input_buf = GpuBuffer(total_nelems, dtype=dtype) - result_buf = GpuBuffer(total_nelems, dtype=dtype) - test_buf = cp.zeros(nelems_per_iter, dtype=dtype) # expected for one iteration - - return input_buf, result_buf, test_buf, nelems_per_iter - - -def main( - execution_plan_path: str, - size: int, # per-iteration bytes - in_place: bool = True, - dtype_str: str = "float16", - packet_type: PacketType = PacketType.LL16, - n_iters: int = 10, - n_graph_iters: int = 10, - slots: int = 4, # ring buffer depth -): - mscclpp_group = CommGroup(MPI.COMM_WORLD) - cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use() - - executor = Executor(mscclpp_group.communicator) - - npkit_dump_dir = env().npkit_dump_dir - if npkit_dump_dir != "": - npkit.init(mscclpp_group.my_rank) - - execution_plan = ExecutionPlan(execution_plan_path, mscclpp_group.my_rank) - collective = execution_plan.collective - - dtype = parse_dtype(dtype_str) - - # We only change allocation/behavior for sendrecv - if "sendrecv" in collective.lower(): - input_buf, result_buf, test_buf, nelems_per_iter = build_bufs_sendrecv_ring(size, slots, dtype) - type_size = cp.dtype(dtype).itemsize - bytes_per_iter = nelems_per_iter * type_size - - def slot_view(buf, slot_idx): - start = slot_idx * nelems_per_iter - end = start + nelems_per_iter - return buf[start:end] - - # Iteration-aware executor call (rotates slot each iteration) - def executor_func_iter(stream, i, do_verify=False, **vk): - slot = i % slots - in_slot = slot_view(input_buf, slot) - out_slot = slot_view(result_buf, slot) - - if do_verify: - # Fill per-iteration input slot with unique (rank, i) pattern - fill_data_kernel = vk["fill_kernel"] - test_data_kernel = vk["test_kernel"] - nblocks = vk["nblocks"] - nthreads = vk["nthreads"] - nelems = vk["nelems_per_iter"] - test_buf_local = vk["test_buf"] - rank = vk["rank"] - num_ranks = vk["num_ranks"] - - fill_params = pack(in_slot) + struct.pack("Q", nelems) + pack(rank, i) - fill_data_kernel.launch_kernel(fill_params, nblocks, nthreads, 0, stream) - - # Execute exactly one per-iteration message: bytes_per_iter == user --size - executor.execute( - mscclpp_group.my_rank, - in_slot.data.ptr, - out_slot.data.ptr, - in_slot.nbytes, - out_slot.nbytes, - dtype_to_mscclpp_dtype(dtype), - execution_plan, - stream.ptr, - packet_type, - ) - - if do_verify: - # Validate the output slot for this iteration i - test_params = ( - pack(out_slot, test_buf_local) - + struct.pack("Q", nelems) - + pack(num_ranks, rank, i) - ) - test_data_kernel.launch_kernel(test_params, nblocks, nthreads, 0, stream) - - # One-shot sentinel check (slot 0) - mscclpp_group.barrier() - print("per-iter size= ", bytes_per_iter, "bytes, slots=", slots, "total buffer bytes=", input_buf.nbytes) - - # Fill whole result with sentinel then run ONE iter (i=0) - result_buf.fill(cp.asarray(123.0, dtype=dtype)) - cp.cuda.runtime.deviceSynchronize() - - stream = cp.cuda.Stream(non_blocking=True) - with stream: - executor_func_iter(stream, 0) - stream.synchronize() - - # Count changes only in slot 0 region - out0 = slot_view(result_buf, 0) - changed = cp.count_nonzero(out0 != cp.asarray(123.0, dtype=dtype)).item() - print("changed elements in slot0:", changed, "out of", out0.size) - - cp.cuda.runtime.deviceSynchronize() - mscclpp_group.barrier() - - # Correctness: fills + executes + tests with unique i and rotating slots - bench_correctness( - collective, - slot_view(input_buf, 0), # placeholder; real slot chosen per i - slot_view(result_buf, 0), # placeholder; real slot chosen per i - test_buf, - dtype_str, - mscclpp_group.my_rank, - mscclpp_group.nranks, - n_iters, - executor_func_iter, - ) - - mscclpp_group.barrier() - - # Timing (CUDA graph captures n_iters launches with varying slot pointers) - execution_time = bench_time(n_iters, n_graph_iters, executor_func_iter) - - if npkit_dump_dir is not None: - npkit.dump(npkit_dump_dir) - npkit.shutdown() - - print( - f"Rank: {mscclpp_group.my_rank} Execution time: {execution_time} us, " - f"per-iter data size: {bytes_per_iter} bytes dtype: {dtype().dtype.name} " - f"bandwidth: {bytes_per_iter / (execution_time * 1e-6) / (1024**3):.2f} GB/s, " - f"packet type: {packet_type}, slots: {slots}" - ) - - else: - raise RuntimeError( - f"This rewritten executor_test.py currently specializes sendrecv. " - f"Plan collective was: {collective}" - ) - - executor = None - mscclpp_group = None - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("-path", "--execution_plan_path", type=str, required=True) - parser.add_argument("--size", type=str, required=True, help="PER-ITERATION bytes (e.g., 1K, 4M, 1G)") - parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") - parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, float32, int32") - parser.add_argument("--packet_type", type=str, default="LL16", help="Choose from LL8, LL16") - parser.add_argument("--n_iters", type=int, default=10) - parser.add_argument("--n_graph_iters", type=int, default=10) - parser.add_argument("--slots", type=int, default=4, help="ring buffer depth; rotates slot = iter % slots") - args = parser.parse_args() - - packet_type = PacketType.LL16 - if args.packet_type == "LL8": - packet_type = PacketType.LL8 - - per_iter_size = parse_size(args.size) - - main( - args.execution_plan_path, - per_iter_size, - args.in_place, - args.dtype, - packet_type, - args.n_iters, - args.n_graph_iters, - args.slots, - ) diff --git a/python/mscclpp/default_algos/mscclpp_send_recv.py b/python/mscclpp/default_algos/mscclpp_send_recv.py index d4ce0004..caa0575d 100644 --- a/python/mscclpp/default_algos/mscclpp_send_recv.py +++ b/python/mscclpp/default_algos/mscclpp_send_recv.py @@ -9,190 +9,82 @@ from mscclpp.language.program import * from mscclpp.language.collectives import * -def send_recv_test_ring_even_ranks(name, nnodes, gpus_per_node): - nranks = nnodes * gpus_per_node - - if nranks < 2: - raise ValueError("This test requires at least 2 ranks") - if nranks % 2 != 0: - raise ValueError( - f"This odd/even ring schedule requires an even number of ranks, got {nranks}" - ) - - collective = TestCollective(nranks, 1, 1) - +def send_recv_test(name, nnodes, gpus_per_node, split_mask): + gpu_size = nnodes * gpus_per_node + collective = TestCollective(gpu_size, 1, 1) with CollectiveProgram( name, collective, - nranks, + gpu_size, protocol="Simple", num_threads_per_block=1024, use_double_scratch_buffer=False, min_message_size=0, max_message_size=2**64 - 1, - instances=2, + instances=4 ): - next_channels = {} - prev_channels = {} + # Creating separate port channels for next and prev directions. + # When prev and next are the same peer (e.g., 2-node ring), both channels go to the same peer + # and get distinct tags. To ensure cross-rank tag matching (rank A's prev_channel signal + # arrives at rank B's next_channel wait), we create channels in opposite order for the + # "higher" rank so that tags cross-match: + # Lower rank: [next(tag0), prev(tag1)] + # Higher rank: [prev(tag0), next(tag1)] + # Then lower.prev(tag1) == higher.next(tag1) ✓ and higher.prev(tag0) == lower.next(tag0) ✓ + # When prev != next (3+ nodes), each channel targets a different peer so each gets tag 0 + # and this ordering doesn't matter. + group_size = split_mask + 1 + num_groups = gpu_size // group_size + next_channels = {} # channel for sending to next rank + prev_channels = {} # channel for receiving from prev rank + prev_next_ids = {} + for node in range(nnodes): + for gpu in range(gpus_per_node): + global_rank_id = gpu + gpus_per_node * node + position_in_group = global_rank_id & split_mask + group_id = global_rank_id // group_size + next_group_id = (group_id + 1) % num_groups + next_global_rank_id = next_group_id * group_size + position_in_group + prev_group_id = (group_id - 1 + num_groups) % num_groups + prev_global_rank_id = prev_group_id * group_size + position_in_group + if prev_global_rank_id == next_global_rank_id and global_rank_id > prev_global_rank_id: + # Higher rank: create prev first, then next (swapped order) + prev_channels[global_rank_id] = PortChannel(prev_global_rank_id, global_rank_id) + next_channels[global_rank_id] = PortChannel(next_global_rank_id, global_rank_id) + else: + # Lower rank or different peers: create next first, then prev + next_channels[global_rank_id] = PortChannel(next_global_rank_id, global_rank_id) + prev_channels[global_rank_id] = PortChannel(prev_global_rank_id, global_rank_id) + prev_next_ids[global_rank_id] = (prev_global_rank_id, next_global_rank_id) - # -------------------------------------------------------------- - # Classic ring across all ranks: - # prev = (rank - 1 + nranks) % nranks - # next = (rank + 1) % nranks - # -------------------------------------------------------------- - for rank in range(nranks): - prev_rank = (rank - 1 + nranks) % nranks - next_rank = (rank + 1) % nranks + # sync with the next rank and the previous rank in the group + for node in range(nnodes): + for gpu in range(gpus_per_node): + global_rank_id = gpu + gpus_per_node * node + prev_global_rank_id, next_global_rank_id = prev_next_ids[global_rank_id] + prev_channels[global_rank_id].signal(tb=0, data_sync=SyncType.none) + next_channels[global_rank_id].wait(tb=0, data_sync=SyncType.after) - # Deterministic channel creation order - if (rank & 1) == 0: - next_channels[rank] = PortChannel(next_rank, rank) - prev_channels[rank] = PortChannel(prev_rank, rank) - else: - prev_channels[rank] = PortChannel(prev_rank, rank) - next_channels[rank] = PortChannel(next_rank, rank) + src_rank = Rank(global_rank_id) + src_buffer = src_rank.get_input_buffer() + dst_rank = Rank(next_global_rank_id) + dst_buffer = dst_rank.get_output_buffer() - # -------------------------------------------------------------- - # -------------------------------------------------------------- - # Ring send/recv with explicit ACK - # - # Data path: - # sender: put_with_signal() to next - # receiver: wait() from prev - # - # ACK path: - # receiver: signal() back to prev after data is available - # sender: wait() for ACK from next before proceeding - # - # Even ranks: send first, then recv, then ACK prev, then wait ACK - # Odd ranks : recv first, then ACK prev, then send, then wait ACK - # -------------------------------------------------------------- - for rank in range(nranks): - prev_rank = (rank - 1 + nranks) % nranks - next_rank = (rank + 1) % nranks + next_channels[global_rank_id].put_with_signal(dst_buffer[:], src_buffer[:], tb=0) + prev_channels[global_rank_id].wait(tb=0, data_sync=SyncType.none) - src_rank = Rank(rank) - next_rank_obj = Rank(next_rank) - - src_buf = src_rank.get_input_buffer() - next_out_buf = next_rank_obj.get_output_buffer() - - src_chunk = src_buf[0:src_buf.size] - dst_chunk = next_out_buf[0:next_out_buf.size] - - ch_to_next = next_channels[rank] - ch_from_prev = prev_channels[rank] - - if (rank & 1) == 0: - # Send data to next and signal arrival - ch_to_next.put_with_signal( - dst_chunk, - src_chunk, - tb=0, - ) - - # Wait for data from prev to become visible locally - ch_from_prev.wait( - tb=0, - data_sync=SyncType.after, - ) - - # Ack back to prev that this rank has observed/consumed input - ch_from_prev.signal( - tb=0, - ) - - # Wait for next rank to ack our outgoing transfer - ch_to_next.wait( - tb=0, - ) - - else: - # Wait for data from prev first - ch_from_prev.wait( - tb=0, - data_sync=SyncType.after, - ) - - # Ack back to prev that this rank has observed/consumed input - ch_from_prev.signal( - tb=0, - ) - - # Then send data to next - ch_to_next.put_with_signal( - dst_chunk, - src_chunk, - tb=0, - ) - - # Wait for next rank to ack our outgoing transfer - ch_to_next.wait( - tb=0, - ) - # -------------------------------------------------------------- - # Ring send/recv - # - # Even ranks: send first, then wait - # Odd ranks : wait first, then send - # - # This is safe for an even-sized ring and avoids the - # single-rank-starter wave. - # -------------------------------------------------------------- - ''' - for rank in range(nranks): - prev_rank = (rank - 1 + nranks) % nranks - next_rank = (rank + 1) % nranks - - src_rank = Rank(rank) - next_rank_obj = Rank(next_rank) - - src_buf = src_rank.get_input_buffer() - next_out_buf = next_rank_obj.get_output_buffer() - - src_chunk = src_buf[0:src_buf.size] - dst_chunk = next_out_buf[0:next_out_buf.size] - - ch_to_next = next_channels[rank] - ch_from_prev = prev_channels[rank] - - if (rank & 1) == 0: - ch_to_next.put_with_signal_and_flush( - dst_chunk, - src_chunk, - tb=0, - ) - ch_from_prev.wait( - tb=0, - data_sync=SyncType.after, - ) - else: - ch_from_prev.wait( - tb=0, - data_sync=SyncType.after, - ) - ch_to_next.put_with_signal_and_flush( - dst_chunk, - src_chunk, - tb=0, - ) - - ''' print(JSON()) -# ---------------------------------------------------------------------- -# CLI -# ---------------------------------------------------------------------- parser = argparse.ArgumentParser() -parser.add_argument("--name", type=str, required=True, help="name of the program") + +parser.add_argument("--name", type=str, help="name of the program") parser.add_argument("--nnodes", type=int, default=1, help="number of nodes") -parser.add_argument("--gpus_per_node", type=int, required=True, help="number of GPUs per node") +parser.add_argument("--gpus_per_node", type=int, help="number of gpus per node") +parser.add_argument("--split_mask", type=lambda x: int(x, 0), default=0x3, help="split mask (e.g. 0x3)") args = parser.parse_args() -send_recv_test_ring_even_ranks( - args.name, - args.nnodes, - args.gpus_per_node, +send_recv_test( + args.name, args.nnodes, args.gpus_per_node, args.split_mask )