From 1a065dd6ada25cc337135ba2a0f75d1e36122dff Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 6 Apr 2026 20:06:21 +0000 Subject: [PATCH] add help scripts --- copyjson.sh | 17 +++ python/test/executor_test.py | 265 ++++++----------------------------- run-sendrecv2.sh | 12 ++ 3 files changed, 75 insertions(+), 219 deletions(-) create mode 100755 copyjson.sh create mode 100755 run-sendrecv2.sh diff --git a/copyjson.sh b/copyjson.sh new file mode 100755 index 00000000..9e0771e1 --- /dev/null +++ b/copyjson.sh @@ -0,0 +1,17 @@ +#!/bin/bash +set -ex + +# Check if the number of arguments is exactly 1 +if [ "$#" -ne 1 ]; then + echo "Usage: $0 " + exit 1 +fi +export MSCCLPPHOME=/home/azhpcuser/mahdieh/mscclpp-unittest/mscclpp/ + +HOSTFILE=$1 + +parallel-scp -h "$HOSTFILE" -p128 -t1800 -r ./*.json $MSCCLPPHOME + +parallel-scp -h "$HOSTFILE" -p128 -t1800 -r ./python/test/executor_test.py $MSCCLPPHOME/python/test/ + +parallel-scp -h "$HOSTFILE" -p128 -t1800 -r ./python/test/executor_test_verifier.cu $MSCCLPPHOME/python/test/ diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 9773be5b..eeace1a1 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -20,7 +20,7 @@ from mpi4py import MPI def parse_dtype(dtype_str): - """Convert a human-readable data type string to a CuPy data type.""" + """Convert a human-readable data type string to a numpy data type.""" dtype_str = dtype_str.strip().lower() if dtype_str == "float16": return cp.float16 @@ -33,18 +33,18 @@ def parse_dtype(dtype_str): def bench_time(n_iters: int, n_graph_iters: int, func): - # Capture CUDA graph for n_iters of the kernel launch + # capture cuda graph for n_iters of the kernel launch stream = cp.cuda.Stream(non_blocking=True) with stream: stream.begin_capture() - for _ in range(n_iters): + for i in range(n_iters): func(stream) graph = stream.end_capture() - # Warm-up round + # now run a warm up round graph.launch(stream) - # Benchmark and measure time + # now run the benchmark and measure time start = cp.cuda.Event() end = cp.cuda.Event() @@ -54,7 +54,6 @@ def bench_time(n_iters: int, n_graph_iters: int, func): end.record(stream) end.synchronize() - # Return average execution time in microseconds return cp.cuda.get_elapsed_time(start, end) / n_iters * 1000.0 / n_graph_iters @@ -85,16 +84,11 @@ def bench_correctness( file_dir = os.path.dirname(os.path.abspath(__file__)) fill_data_kernel = KernelBuilder( - file="executor_test_verifier.cu", - kernel_name=fill_data_kernel_name, - file_dir=file_dir, + file="executor_test_verifier.cu", kernel_name=fill_data_kernel_name, file_dir=file_dir ).get_compiled_kernel() test_data_kernel = KernelBuilder( - file="executor_test_verifier.cu", - kernel_name=test_data_kernel_name, - file_dir=file_dir, + file="executor_test_verifier.cu", kernel_name=test_data_kernel_name, file_dir=file_dir ).get_compiled_kernel() - nblocks = 64 nthreads = 1024 @@ -104,72 +98,27 @@ def bench_correctness( for i in range(n_iters): fill_data_params = pack(input_buf) + struct.pack("Q", input_buf.nbytes // type_size) + pack(rank, i) fill_data_kernel.launch_kernel(fill_data_params, nblocks, nthreads, 0, stream) - func(stream) - test_data_params = ( - pack(result_buf, test_buf) - + struct.pack("Q", input_buf.nbytes // type_size) - + pack(num_ranks, rank, i) + pack(result_buf, test_buf) + struct.pack("Q", input_buf.nbytes // type_size) + pack(num_ranks, rank, i) ) test_data_kernel.launch_kernel(test_data_params, nblocks, nthreads, 0, stream) - graph = stream.end_capture() - graph.launch(stream) stream.synchronize() def parse_size(size_str): - """Convert a human-readable buffer size string to an integer (bytes).""" + """Convert a human-readable buffer size string to an integer.""" size_str = size_str.strip() if not size_str: - raise ValueError("Size string cannot be empty") - + raise ValueError("Size string can not be empty") units = {"K": 1024, "M": 1024**2, "G": 1024**3} if size_str[-1].upper() in units: return int(size_str[:-1]) * units[size_str[-1].upper()] - return int(size_str) + else: + return int(size_str) -def parse_size_list(size_arg): - """ - Accept: - - single size: '1M' - - comma-separated list: '1K,2K,4K' - - geometric range: '1K:64K:2' -> start:end:factor - - Returns a list of integer sizes in bytes. - """ - size_arg = size_arg.strip() - - if "," in size_arg: - return [parse_size(x) for x in size_arg.split(",")] - - if ":" in size_arg: - parts = size_arg.split(":") - if len(parts) != 3: - raise ValueError("Range format must be start:end:factor, e.g. 1K:64K:2") - - start = parse_size(parts[0]) - end = parse_size(parts[1]) - factor = int(parts[2]) - - if start <= 0: - raise ValueError("Start must be positive") - if end < start: - raise ValueError("End must be >= start") - if factor <= 1: - raise ValueError("Factor must be greater than 1") - - sizes = [] - current = start - while current <= end: - sizes.append(current) - current *= factor - - return sizes - - return [parse_size(size_arg)] def dtype_to_mscclpp_dtype(dtype): if dtype == cp.float16: @@ -191,23 +140,22 @@ def build_bufs( num_ranks: int, ): type_size = cp.dtype(dtype).itemsize - assert (size % type_size) == 0, f"size {size} not multiple of type size {type_size}" + assert (size % type_size) == 0, "size %d not multiple of type size %d" % (size, type_size) nelems = size // type_size if "allgather" in collective: - assert (nelems % num_ranks) == 0, f"nelems {nelems} not multiple of num_ranks {num_ranks}" + assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks) nelems_input = nelems if in_place else nelems // num_ranks else: nelems_input = nelems if "reducescatter" in collective: - assert (nelems % num_ranks) == 0, f"nelems {nelems} not multiple of num_ranks {num_ranks}" + assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks) nelems_output = nelems // num_ranks else: nelems_output = nelems result_buf = GpuBuffer(nelems_output, dtype=dtype) - if in_place: if "allgather" in collective: input_buf = cp.split(result_buf, num_ranks)[rank] @@ -228,7 +176,7 @@ def build_bufs( def main( execution_plan_path: str, - sizes: list[int], + size: int, in_place: bool = True, dtype_str: str = "float16", packet_type: PacketType = PacketType.LL16, @@ -236,18 +184,14 @@ def main( n_graph_iters: int = 10, ): mscclpp_group = CommGroup(MPI.COMM_WORLD) - nranks = mscclpp_group.nranks - my_rank = mscclpp_group.my_rank - - cp.cuda.Device(my_rank % mscclpp_group.nranks_per_node).use() - + cp.cuda.Device(mscclpp_group.my_rank % mscclpp_group.nranks_per_node).use() executor = Executor(mscclpp_group.communicator) npkit_dump_dir = env().npkit_dump_dir if npkit_dump_dir != "": - npkit.init(my_rank) - - execution_plan = ExecutionPlan(execution_plan_path, my_rank) + npkit.init(mscclpp_group.my_rank) + execution_plan = ExecutionPlan(execution_plan_path, mscclpp_group.my_rank) collective = execution_plan.collective + dtype = parse_dtype(dtype_str) input_buf, result_buf, test_buf, nelem = build_bufs( collective, @@ -258,78 +202,20 @@ def main( mscclpp_group.nranks, ) - # Print header once - if my_rank == 0: - print( - f"{'NRanks':>8} {'Message Size (B)':>18} {'BW (GB/s)':>12} " - f"{'Latency (us)':>14} {'Packet Type':>12}" - ) + executor_func = lambda stream: executor.execute( + mscclpp_group.my_rank, + input_buf.data.ptr, + result_buf.data.ptr, + input_buf.nbytes, + result_buf.nbytes, + dtype_to_mscclpp_dtype(dtype), + execution_plan, + stream.ptr, + packet_type, + ) - for size in sizes: - input_buf, result_buf, test_buf = build_bufs( - collective, - size, - in_place, - dtype, - my_rank, - nranks, - ) - - executor_func = lambda stream, in_buf=input_buf, out_buf=result_buf: executor.execute( - my_rank, - in_buf.data.ptr, - out_buf.data.ptr, - in_buf.nbytes, - out_buf.nbytes, - dtype_to_mscclpp_dtype(dtype), - execution_plan, - stream.ptr, - packet_type, - ) - - #mscclpp_group.barrier() - - # Optional correctness check - # bench_correctness( - # collective, - # input_buf, - # result_buf, - # test_buf, - # dtype_str, - # my_rank, - # nranks, - # n_iters, - # executor_func, - # ) - - mscclpp_group.barrier() - execution_time = bench_time(n_iters, n_graph_iters, executor_func) - #mscclpp_group.barrier() - - if my_rank == 0: - msg_size = size - bw = result_buf.nbytes / execution_time / 1e3 # GB/s - latency = execution_time # us - - print( - f"{nranks:8d} {msg_size:18d} {bw:12.2f} " - f"{latency:14.2f} {str(packet_type):>12}" - ) - - # Release buffers for this size - input_buf = None - result_buf = None - test_buf = None - - #mscclpp_group.barrier() - - if npkit_dump_dir != "": - npkit.dump(npkit_dump_dir) - npkit.shutdown() - - # Print header once - print(f"{'NRanks':>8} {'Message Size (B)':>18} {'BW (GB/s)':>12} {'Latency (us)':>14} {'Packet Type':>12}") - print(f"{nranks:8d} {msg_size:18d} {bw:12.2f} {latency:14.2f} {str(packet_type):>12}") + mscclpp_group.barrier() + print("size= ", size, "nelem= ", nelem) # Sentinel fill: choose something unlikely in your pattern result_buf.fill(cp.float16(123.0)) @@ -357,67 +243,17 @@ def main( executor_func, ) - executor_func = lambda stream, in_buf=input_buf, out_buf=result_buf: executor.execute( - my_rank, - in_buf.data.ptr, - out_buf.data.ptr, - in_buf.nbytes, - out_buf.nbytes, - dtype_to_mscclpp_dtype(dtype), - execution_plan, - stream.ptr, - packet_type, - ) - - mscclpp_group.barrier() - - # Optional correctness check - # bench_correctness( - # collective, - # input_buf, - # result_buf, - # test_buf, - # dtype_str, - # my_rank, - # nranks, - # n_iters, - # executor_func, - # ) - - mscclpp_group.barrier() - execution_time = bench_time(n_iters, n_graph_iters, executor_func) - mscclpp_group.barrier() - - if my_rank == 0: - msg_size = size - bw = result_buf.nbytes / execution_time / 1e3 # GB/s - latency = execution_time # us - - print( - f"{nranks:8d} {msg_size:18d} {bw:12.2f} " - f"{latency:14.2f} {str(packet_type):>12}" - ) - - # Release buffers for this size - input_buf = None - result_buf = None - test_buf = None - - mscclpp_group.barrier() - - if npkit_dump_dir != "": + mscclpp_group.barrier() + execution_time = bench_time(n_iters, n_graph_iters, executor_func) + if npkit_dump_dir is not None: npkit.dump(npkit_dump_dir) npkit.shutdown() - # Only rank 0 reports output - if mscclpp_group.my_rank == 0: - msg_size = result_buf.nbytes - bw = result_buf.nbytes / execution_time / 1e3 # GB/s - latency = execution_time # us - - # Print header once - print(f"{'Message Size (B)':>18} {'BW (GB/s)':>12} {'Latency (us)':>14} {'Packet Type':>12}") - print(f"{msg_size:18d} {bw:12.2f} {latency:14.2f} {str(packet_type):>12}") - + print( + f"Rank: {mscclpp_group.my_rank} Execution time: {execution_time} us, " + f"data size: {result_buf.nbytes} bytes data type: {dtype().dtype.name} " + f"bandwidth: {result_buf.nbytes / (execution_time * 1e-6) / (1024**3):.2f} GB/s, " + f"packet type: {packet_type}" + ) executor = None mscclpp_group = None @@ -425,16 +261,8 @@ def main( if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-path", "--execution_plan_path", type=str, required=True) - parser.add_argument( - "--size", - type=str, - required=True, - help=( - "Single size (e.g. 1M), comma-separated list (e.g. 1K,2K,4K), " - "or range start:end:factor (e.g. 1K:64K:2)" - ), - ) - parser.add_argument("--in_place", action="store_true", help="Flag to define an in-place operation") + parser.add_argument("--size", type=str, required=True) + parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, float32, int32") parser.add_argument("--packet_type", type=str, default="LL16", help="Choose from LL8, LL16") parser.add_argument("--n_iters", type=int, default=10) @@ -445,11 +273,10 @@ if __name__ == "__main__": if args.packet_type == "LL8": packet_type = PacketType.LL8 - buffer_sizes = parse_size_list(args.size) - + buffer_size = parse_size(args.size) main( args.execution_plan_path, - buffer_sizes, + buffer_size, args.in_place, args.dtype, packet_type, diff --git a/run-sendrecv2.sh b/run-sendrecv2.sh new file mode 100755 index 00000000..556cc09d --- /dev/null +++ b/run-sendrecv2.sh @@ -0,0 +1,12 @@ +module load mpi/hpcx #mpi/hpcx-mrc #mpi/hpcx-mrc-2.23.1 + +MPI_ARGS="" +MPI_ARGS+=" -x CUDA_VISIBLE_DEVICES=1 -mca coll_hcoll_enable 0 --mca coll ^ucc,hcoll --mca btl tcp,vader,self --mca pml ob1 --mca oob_tcp_if_include enP22p1s0f1 --mca btl_tcp_if_include enP22p1s0f1" +MPI_ARGS+=" -x MSCCLPP_IBV_SO=/opt/microsoft/mrc/Azure-Compute-AI-HPC-Perf-verbs-mrc/libibverbs.so -x UCX_NET_DEVICES=enP22p1s0f1 -x LD_LIBRARY_PATH=/opt/microsoft/mrc/Azure-Compute-AI-HPC-Perf-verbs-mrc/mrc-header-lib:$LD_LIBRARY_PATH" +MPI_ARGS+=" -x MSCCLPP_SOCKET_IFNAME=enP22p1s0f1 -x MSCCLPP_IBV_MODE=host-no-atomic -x VMRC_LIBMRC_SO=/opt/mellanox/doca/lib/aarch64-linux-gnu/libnv_mrc.so" +MPI_ARGS+=" -x VMRC_LIBIBVERBS_SO=/lib/aarch64-linux-gnu/libibverbs.so.1 -x PATH=/home/azhpcuser/mahdieh/mscclpp-unittest/mscclpp/mscclpp/bin/:$PATH " +MPI_ARGS+=" -x MSCCLPP_LOG_LEVEL=ERROR -x MSCCLPP_DEBUG=ERROR -x MSCCLPP_IB_GID_INDEX=3 -x MSCCLPP_HCA_DEVICES=mlx5_0" +MPI_ARGS+=" /home/azhpcuser/mahdieh/mscclpp-unittest/mscclpp/mscclpp/bin/python3 /home/azhpcuser/mahdieh/mscclpp-unittest/mscclpp/python/test/executor_test.py -path /home/azhpcuser/mahdieh/mscclpp-unittest/mscclpp/sendrecv.json" + + +mpirun -np 2 --hostfile ./hosts --map-by ppr:1:node $MPI_ARGS --size 1K