From 27fbddb707902210927b54b15a9bdad331689498 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 17 Mar 2026 21:00:48 +0000 Subject: [PATCH] update the executor so we have message size range --- python/test/executor_test.py | 165 +++++++++++++++++++++++++++++------ 1 file changed, 138 insertions(+), 27 deletions(-) diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 250409d9..9649da7b 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -20,7 +20,7 @@ from mpi4py import MPI def parse_dtype(dtype_str): - """Convert a human-readable data type string to a numpy data type.""" + """Convert a human-readable data type string to a CuPy data type.""" dtype_str = dtype_str.strip().lower() if dtype_str == "float16": return cp.float16 @@ -33,18 +33,18 @@ def parse_dtype(dtype_str): def bench_time(n_iters: int, n_graph_iters: int, func): - # capture cuda graph for n_iters of the kernel launch + # Capture CUDA graph for n_iters of the kernel launch stream = cp.cuda.Stream(non_blocking=True) with stream: stream.begin_capture() - for i in range(n_iters): + for _ in range(n_iters): func(stream) graph = stream.end_capture() - # now run a warm up round + # Warm-up round graph.launch(stream) - # now run the benchmark and measure time + # Benchmark and measure time start = cp.cuda.Event() end = cp.cuda.Event() @@ -54,6 +54,7 @@ def bench_time(n_iters: int, n_graph_iters: int, func): end.record(stream) end.synchronize() + # Return average execution time in microseconds return cp.cuda.get_elapsed_time(start, end) / n_iters * 1000.0 / n_graph_iters @@ -84,11 +85,16 @@ def bench_correctness( file_dir = os.path.dirname(os.path.abspath(__file__)) fill_data_kernel = KernelBuilder( - file="executor_test_verifier.cu", kernel_name=fill_data_kernel_name, file_dir=file_dir + file="executor_test_verifier.cu", + kernel_name=fill_data_kernel_name, + file_dir=file_dir, ).get_compiled_kernel() test_data_kernel = KernelBuilder( - file="executor_test_verifier.cu", kernel_name=test_data_kernel_name, file_dir=file_dir + file="executor_test_verifier.cu", + kernel_name=test_data_kernel_name, + file_dir=file_dir, ).get_compiled_kernel() + nblocks = 64 nthreads = 1024 @@ -98,27 +104,72 @@ def bench_correctness( for i in range(n_iters): fill_data_params = pack(input_buf) + struct.pack("Q", input_buf.nbytes // type_size) + pack(rank, i) fill_data_kernel.launch_kernel(fill_data_params, nblocks, nthreads, 0, stream) + func(stream) + test_data_params = ( - pack(result_buf, test_buf) + struct.pack("Q", input_buf.nbytes // type_size) + pack(num_ranks, rank, i) + pack(result_buf, test_buf) + + struct.pack("Q", input_buf.nbytes // type_size) + + pack(num_ranks, rank, i) ) test_data_kernel.launch_kernel(test_data_params, nblocks, nthreads, 0, stream) + graph = stream.end_capture() + graph.launch(stream) stream.synchronize() def parse_size(size_str): - """Convert a human-readable buffer size string to an integer.""" + """Convert a human-readable buffer size string to an integer (bytes).""" size_str = size_str.strip() if not size_str: - raise ValueError("Size string can not be empty") + raise ValueError("Size string cannot be empty") + units = {"K": 1024, "M": 1024**2, "G": 1024**3} if size_str[-1].upper() in units: return int(size_str[:-1]) * units[size_str[-1].upper()] - else: - return int(size_str) + return int(size_str) +def parse_size_list(size_arg): + """ + Accept: + - single size: '1M' + - comma-separated list: '1K,2K,4K' + - geometric range: '1K:64K:2' -> start:end:factor + + Returns a list of integer sizes in bytes. + """ + size_arg = size_arg.strip() + + if "," in size_arg: + return [parse_size(x) for x in size_arg.split(",")] + + if ":" in size_arg: + parts = size_arg.split(":") + if len(parts) != 3: + raise ValueError("Range format must be start:end:factor, e.g. 1K:64K:2") + + start = parse_size(parts[0]) + end = parse_size(parts[1]) + factor = int(parts[2]) + + if start <= 0: + raise ValueError("Start must be positive") + if end < start: + raise ValueError("End must be >= start") + if factor <= 1: + raise ValueError("Factor must be greater than 1") + + sizes = [] + current = start + while current <= end: + sizes.append(current) + current *= factor + + return sizes + + return [parse_size(size_arg)] def dtype_to_mscclpp_dtype(dtype): if dtype == cp.float16: @@ -140,22 +191,23 @@ def build_bufs( num_ranks: int, ): type_size = cp.dtype(dtype).itemsize - assert (size % type_size) == 0, "size %d not multiple of type size %d" % (size, type_size) + assert (size % type_size) == 0, f"size {size} not multiple of type size {type_size}" nelems = size // type_size if "allgather" in collective: - assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks) + assert (nelems % num_ranks) == 0, f"nelems {nelems} not multiple of num_ranks {num_ranks}" nelems_input = nelems if in_place else nelems // num_ranks else: nelems_input = nelems if "reducescatter" in collective: - assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks) + assert (nelems % num_ranks) == 0, f"nelems {nelems} not multiple of num_ranks {num_ranks}" nelems_output = nelems // num_ranks else: nelems_output = nelems result_buf = GpuBuffer(nelems_output, dtype=dtype) + if in_place: if "allgather" in collective: input_buf = cp.split(result_buf, num_ranks)[rank] @@ -176,7 +228,7 @@ def build_bufs( def main( execution_plan_path: str, - size: int, + sizes: list[int], in_place: bool = True, dtype_str: str = "float16", packet_type: PacketType = PacketType.LL16, @@ -184,14 +236,18 @@ def main( n_graph_iters: int = 10, ): mscclpp_group = CommGroup(MPI.COMM_WORLD) - cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use() + nranks = mscclpp_group.nranks + my_rank = mscclpp_group.my_rank + + cp.cuda.Device(my_rank % mscclpp_group.nranks_per_node).use() + executor = Executor(mscclpp_group.communicator) npkit_dump_dir = env().npkit_dump_dir if npkit_dump_dir != "": - npkit.init(mscclpp_group.my_rank) - execution_plan = ExecutionPlan(execution_plan_path, mscclpp_group.my_rank) - collective = execution_plan.collective + npkit.init(my_rank) + execution_plan = ExecutionPlan(execution_plan_path, my_rank) + collective = execution_plan.collective dtype = parse_dtype(dtype_str) input_buf, result_buf, test_buf, nelem = build_bufs( collective, @@ -300,9 +356,55 @@ def main( executor_func, ) - mscclpp_group.barrier() - execution_time = bench_time(n_iters, n_graph_iters, executor_func) - if npkit_dump_dir is not None: + executor_func = lambda stream, in_buf=input_buf, out_buf=result_buf: executor.execute( + my_rank, + in_buf.data.ptr, + out_buf.data.ptr, + in_buf.nbytes, + out_buf.nbytes, + dtype_to_mscclpp_dtype(dtype), + execution_plan, + stream.ptr, + packet_type, + ) + + mscclpp_group.barrier() + + # Optional correctness check + # bench_correctness( + # collective, + # input_buf, + # result_buf, + # test_buf, + # dtype_str, + # my_rank, + # nranks, + # n_iters, + # executor_func, + # ) + + mscclpp_group.barrier() + execution_time = bench_time(n_iters, n_graph_iters, executor_func) + mscclpp_group.barrier() + + if my_rank == 0: + msg_size = size + bw = result_buf.nbytes / execution_time / 1e3 # GB/s + latency = execution_time # us + + print( + f"{nranks:8d} {msg_size:18d} {bw:12.2f} " + f"{latency:14.2f} {str(packet_type):>12}" + ) + + # Release buffers for this size + input_buf = None + result_buf = None + test_buf = None + + mscclpp_group.barrier() + + if npkit_dump_dir != "": npkit.dump(npkit_dump_dir) npkit.shutdown() print( @@ -317,8 +419,16 @@ def main( if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-path", "--execution_plan_path", type=str, required=True) - parser.add_argument("--size", type=str, required=True) - parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") + parser.add_argument( + "--size", + type=str, + required=True, + help=( + "Single size (e.g. 1M), comma-separated list (e.g. 1K,2K,4K), " + "or range start:end:factor (e.g. 1K:64K:2)" + ), + ) + parser.add_argument("--in_place", action="store_true", help="Flag to define an in-place operation") parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, float32, int32") parser.add_argument("--packet_type", type=str, default="LL16", help="Choose from LL8, LL16") parser.add_argument("--n_iters", type=int, default=10) @@ -329,10 +439,11 @@ if __name__ == "__main__": if args.packet_type == "LL8": packet_type = PacketType.LL8 - buffer_size = parse_size(args.size) + buffer_sizes = parse_size_list(args.size) + main( args.execution_plan_path, - buffer_size, + buffer_sizes, args.in_place, args.dtype, packet_type,