Files
mscclpp/tools/npkit/npkit_trace_generator.py
Caio Rocha 7eb3ff701a Supporting New Packet Kernel Operation at Executor (#677)
This PR introduces three new operations to improve the flexibility and
performance of the executor.

One operation can be invoked directly via the DSL API, and two are created
by fusing existing operations, which reduces overhead and improves
efficiency.

1. Port Channel Put Packet (Direct DSL API Call): Sends data from pkt
format to the remote side in pkt format via the port channel. Both
source and destination buffers must be scratch.

2. Reduce Copy Packet (Fusion):
Reduce Packet+Copy Packet=Reduce Copy Packet
Triggered when the destination buffer of Reduce Packet matches the
source buffer of Copy Packet.
Purpose: Combine reduction and copy into a single step for better
performance.

3. Reduce Copy Send Packet (Fusion):
Reduce Copy Packet+Put Packet=Reduce Copy Send Packet (when dst buffer
of Reduce Copy Packet matches src buffer of Put Packet)
Reduce Copy Packet+Read Put Packet=Reduce Copy Send Packet (when dst pkt
buffer of Reduce Copy Packet matches src buffer of Read Put Packet)
Purpose: Combine reduction, copy, and send operations into one optimized
pipeline.


Fusion Diagram
Reduce Packet + Copy Packet → Reduce Copy Packet
Reduce Copy Packet + Put Packet → Reduce Copy Send Packet
Reduce Copy Packet + Read Put Packet → Reduce Copy Send Packet

In addition, this PR adjusts the two-node AllReduce algorithm, with the following measured latencies:

| Message Size | Latency (µs) |
|--------------|--------------|
| 1K           | 15.34        |
| 2K           | 15.88        |
| 4K           | 15.71        |
| 8K           | 16.01        |
| 16K          | 15.88        |
| 32K          | 16.21        |
| 64K          | 16.90        |
| 128K         | 18.24        |
| 256K         | 20.39        |
| 512K         | 25.26        |
| 1M           | 32.74        |
| 2M           | 53.64        |
2025-11-13 14:08:44 -08:00

278 lines
12 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
import os
import json
from queue import Queue
def parse_npkit_event_header(npkit_event_header_path):
    """Build bidirectional maps between NPKit event type names and numeric ids.

    Only non-empty lines of the exact form ``#define NPKIT_EVENT_<NAME> <value>``
    (three whitespace-separated tokens) are considered. ``<value>`` is parsed with
    ``int(value, 0)``, so both hex and decimal literals are accepted.

    A define starting with ``#define NPKIT_EVENT_EXECUTOR_OP_BASE`` is expanded
    into one event per executor operation listed below. Each operation's id is
    the define's value plus the operation's position in the list. Its name is
    ``NPKIT_EVENT_EXECUTOR_<OP>_ENTRY`` or ``..._EXIT``, depending on whether the
    define line mentions ENTRY or EXIT.

    Args:
        npkit_event_header_path: Path to the ``npkit_event.h`` header.

    Returns:
        ``{"id_to_type": {id: name}, "type_to_id": {name: id}}``.
    """
    # NOTE(review): the order of this list presumably has to mirror the C++
    # executor's operation-type enum, since offsets are list positions; keep in sync.
    executor_ops = [
        "NOP",
        "BARRIER",
        "PUT",
        "PUT_PACKET",
        "READ_PUT_PACKET",
        "PUT_WITH_SIGNAL",
        "PUT_WITH_SIGNAL_AND_FLUSH",
        "GET",
        "COPY",
        "COPY_PACKET",
        "TRANSFORM_TO_PACKET",
        "SIGNAL",
        "WAIT",
        "FLUSH",
        "REDUCE",
        "REDUCE_PACKET",
        "REDUCE_COPY_PACKETS",
        "REDUCE_SEND",
        "REDUCE_SEND_PACKET",
        "REDUCE_COPY_SEND_PACKETS",
        "READ_REDUCE_COPY",
        "READ_REDUCE_COPY_SEND",
        "MULTI_LOAD_REDUCE_STORE",
        "RELAXED_SIGNAL",
        "RELAXED_WAIT",
        "PIPELINE",
        "SEM_RELEASE",
        "SEM_ACQUIRE",
    ]
    op_offsets = {op: offset for offset, op in enumerate(executor_ops)}
    event_def = {"id_to_type": {}, "type_to_id": {}}

    def _register(name, ident):
        # Record the mapping in both directions; later defines overwrite earlier ones.
        event_def["type_to_id"][name] = ident
        event_def["id_to_type"][ident] = name

    with open(npkit_event_header_path, "r") as header:
        header_lines = [raw.strip() for raw in header if raw.strip()]

    for line in header_lines:
        if not line.startswith("#define NPKIT_EVENT_"):
            continue
        tokens = line.split()
        if len(tokens) != 3:
            # Skip defines whose value is an expression rather than a single literal.
            continue
        base_id = int(tokens[2], 0)
        if not line.startswith("#define NPKIT_EVENT_EXECUTOR_OP_BASE"):
            _register(tokens[1], base_id)
            continue
        # Expand the executor base id into one event per operation.
        name = tokens[1]
        for op, offset in op_offsets.items():
            if "ENTRY" in line:
                name = "NPKIT_EVENT_EXECUTOR_%s_ENTRY" % op
            elif "EXIT" in line:
                name = "NPKIT_EVENT_EXECUTOR_%s_EXIT" % op
            _register(name, base_id + offset)
    return event_def
def parse_gpu_clock_scale(gpu_clock_file_path):
    """Return the divisor that converts GPU timestamp ticks to microseconds.

    The file's contents are parsed as a float and treated as a clock rate in kHz.
    Multiplying by 1e3 gives Hz; dividing by 1e6 gives ticks per microsecond.
    """
    with open(gpu_clock_file_path, "r") as clock_file:
        rate_khz = float(clock_file.read())
    return rate_khz * 1e3 / 1e6
def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path):
    """Return the divisor that converts CPU timestamp ticks to microseconds.

    The two files hold the numerator and denominator of the CPU clock period
    ratio (period = num / den seconds). The tick rate in Hz is therefore
    den / num, and dividing by 1e6 yields ticks per microsecond. The numerator
    file is read first.
    """

    def _read_number(path):
        with open(path, "r") as handle:
            return float(handle.read())

    period_num = _read_number(cpu_clock_num_file_path)
    period_den = _read_number(cpu_clock_den_file_path)
    return period_den / period_num / 1e6
def parse_gpu_event(event_bytes):
    """Decode one 16-byte little-endian GPU event record.

    Layout: byte 0 is the event id, bytes 1-4 the size, bytes 5-7 a reserved
    field, and bytes 8-15 the timestamp. All fields are unsigned. Bytes missing
    from a short input decode as 0.
    """

    def _unsigned(start, stop):
        return int.from_bytes(event_bytes[start:stop], byteorder="little", signed=False)

    event_id = _unsigned(0, 1)
    size = _unsigned(1, 5)
    reserved = _unsigned(5, 8)
    timestamp = _unsigned(8, 16)
    return {"id": event_id, "size": size, "rsvd": reserved, "timestamp": timestamp}
def parse_cpu_event(event_bytes):
    """Decode one 16-byte little-endian CPU event record.

    Layout: byte 0 is the event id, bytes 1-4 the size, bytes 5-7 the slot, and
    bytes 8-15 the timestamp. All fields are unsigned. Bytes missing from a
    short input decode as 0.
    """
    layout = (("id", 0, 1), ("size", 1, 5), ("slot", 5, 8), ("timestamp", 8, 16))
    return {
        field: int.from_bytes(event_bytes[start:stop], byteorder="little", signed=False)
        for field, start, stop in layout
    }
def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale):
    """Convert one GPU event dump file into Chrome trace events.

    Reads ``gpu_events_rank_<rank>_buf_<buf_idx>`` from *npkit_dump_dir* as a
    sequence of 16-byte records decoded by ``parse_gpu_event``. A trailing
    partial record, as left by a truncated dump, is ignored.

    ``NPKIT_EVENT_TIME_SYNC_CPU`` and ``NPKIT_EVENT_TIME_SYNC_GPU`` records are
    not emitted. They set the CPU and GPU base times used to place GPU
    timestamps on the CPU timeline. Every other record becomes a ``"B"`` event
    if its type name ends in ``_ENTRY``, and an ``"E"`` event otherwise.

    An ``"E"`` event's ``bw (GB/s)`` is its size divided by the time since the
    innermost still-open entry in this file. It is 0.0 when no entry is open or
    the span is zero.

    Args:
        npkit_dump_dir: Directory holding the NPKit dump files.
        npkit_event_def: Mapping returned by ``parse_npkit_event_header``.
        rank: Rank whose file is read; emitted as the trace ``pid``.
        buf_idx: GPU event buffer index; the trace ``tid`` is ``buf_idx + 1``.
        gpu_clock_scale: Divisor turning GPU timestamps into trace time units.
        cpu_clock_scale: Divisor turning CPU timestamps into trace time units.

    Returns:
        List of trace-event dicts in file order.

    Raises:
        KeyError: A record's id is missing from ``npkit_event_def["id_to_type"]``.
        ValueError: A non-sync record appears before any CPU time-sync record.
    """
    gpu_event_file_path = os.path.join(npkit_dump_dir, "gpu_events_rank_%d_buf_%d" % (rank, buf_idx))
    raw_event_size = 16
    id_to_type = npkit_event_def["id_to_type"]
    curr_cpu_base_time = None
    curr_gpu_base_time = None
    gpu_events = []
    event_type_to_seq = {}
    # Trace timestamps of entry events still awaiting their exit, innermost last.
    open_entry_ts = []
    with open(gpu_event_file_path, "rb") as f:
        raw_content = f.read()
    # Drop a trailing partial record: slicing past the end would otherwise decode
    # the missing bytes as zeros and emit a bogus event.
    usable_size = len(raw_content) - len(raw_content) % raw_event_size
    for raw_content_idx in range(0, usable_size, raw_event_size):
        parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
        event_type = id_to_type[parsed_gpu_event["id"]]
        if event_type == "NPKIT_EVENT_TIME_SYNC_CPU":
            # A new CPU reference point invalidates the previous GPU reference.
            curr_cpu_base_time = parsed_gpu_event["timestamp"] / cpu_clock_scale
            curr_gpu_base_time = None
            continue
        if event_type == "NPKIT_EVENT_TIME_SYNC_GPU":
            if curr_gpu_base_time is None:
                curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale
            continue
        if curr_cpu_base_time is None:
            raise ValueError(
                "%s: event %s precedes any NPKIT_EVENT_TIME_SYNC_CPU record" % (gpu_event_file_path, event_type)
            )
        if curr_gpu_base_time is None:
            # No GPU sync record since the last CPU sync: anchor on this event.
            curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale
        ts = curr_cpu_base_time + parsed_gpu_event["timestamp"] / gpu_clock_scale - curr_gpu_base_time
        phase = "B" if event_type.endswith("_ENTRY") else "E"
        trace_event = {"ph": phase, "ts": ts, "pid": rank, "tid": buf_idx + 1}
        if phase == "B":
            seq = event_type_to_seq.get(event_type, 0)
            event_type_to_seq[event_type] = seq + 1
            trace_event.update(
                {
                    "name": event_type,
                    "cat": "GPU",
                    "args": {
                        "rank": rank,
                        "buf_idx": buf_idx,
                        "seq": seq,
                        "rsvd_0": parsed_gpu_event["rsvd"],
                        "size_0": parsed_gpu_event["size"],
                    },
                }
            )
            open_entry_ts.append(ts)
        else:
            # Measure against the matching (innermost open) entry rather than the
            # previous record. An exit with no open entry reports 0 bandwidth.
            delta_time = (ts - open_entry_ts.pop()) if open_entry_ts else 0.0
            trace_event["args"] = {
                "size": parsed_gpu_event["size"],
                "rsvd": parsed_gpu_event["rsvd"],
                "bw (GB/s)": 0.0 if delta_time == 0.0 else parsed_gpu_event["size"] / delta_time / 1e3,
            }
        gpu_events.append(trace_event)
    return gpu_events
def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale):
    """Convert one CPU event dump file into Chrome trace events.

    Reads ``cpu_events_rank_<rank>_channel_<channel>`` from *npkit_dump_dir* as a
    sequence of 16-byte records decoded by ``parse_cpu_event``. A trailing
    partial record, as left by a truncated dump, is ignored. Timestamps are
    divided by *cpu_clock_scale*.

    Records whose type name ends in ``_ENTRY`` open a ``"B"`` event; all others
    close the ``"E"`` event previously opened for the same ``slot``. Concurrent
    open events are spread over "fibers" (trace threads). Each entry takes the
    lowest-numbered idle fiber, and the matching exit frees it. The trace
    ``tid`` is ``fiber_id + (channel + 1) * 1000``.

    Args:
        npkit_dump_dir: Directory holding the NPKit dump files.
        npkit_event_def: Mapping returned by ``parse_npkit_event_header``.
        rank: Rank whose file is read; emitted as the trace ``pid``.
        channel: Channel index of the file.
        cpu_clock_scale: Divisor turning CPU timestamps into trace time units.

    Returns:
        List of trace-event dicts in file order.

    Raises:
        KeyError: A record's id is unknown, or an exit has no open entry for its slot.
    """
    cpu_event_file_path = os.path.join(npkit_dump_dir, "cpu_events_rank_%d_channel_%d" % (rank, channel))
    raw_event_size = 16
    # Each channel gets its own tid range starting at (channel + 1) * channel_shift.
    channel_shift = 1000
    id_to_type = npkit_event_def["id_to_type"]
    cpu_events = []
    event_type_to_seq = {}
    fiber_is_usable = []  # fiber_is_usable[i]: fiber i has no open event
    fiber_open_ts = []  # fiber_open_ts[i]: ts of the event currently open on fiber i
    slot_to_fiber_id = {}
    with open(cpu_event_file_path, "rb") as f:
        raw_content = f.read()
    # Drop a trailing partial record: slicing past the end would otherwise decode
    # the missing bytes as zeros and emit a bogus event.
    usable_size = len(raw_content) - len(raw_content) % raw_event_size
    for raw_content_idx in range(0, usable_size, raw_event_size):
        parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
        event_type = id_to_type[parsed_cpu_event["id"]]
        phase = "B" if event_type.endswith("_ENTRY") else "E"
        ts = parsed_cpu_event["timestamp"] / cpu_clock_scale
        trace_event = {"ph": phase, "ts": ts, "pid": rank}
        slot = parsed_cpu_event["slot"]
        if phase == "B":
            # Open on the lowest-numbered idle fiber, allocating a new one if none is idle.
            fiber_id = next(
                (idx for idx, usable in enumerate(fiber_is_usable) if usable),
                len(fiber_is_usable),
            )
            if fiber_id == len(fiber_is_usable):
                fiber_is_usable.append(False)
                fiber_open_ts.append(ts)
            else:
                fiber_is_usable[fiber_id] = False
                fiber_open_ts[fiber_id] = ts
            slot_to_fiber_id[slot] = fiber_id
            seq = event_type_to_seq.get(event_type, 0)
            event_type_to_seq[event_type] = seq + 1
            trace_event.update(
                {
                    "name": event_type,
                    "cat": "CPU",
                    "args": {
                        "rank": rank,
                        "channel": channel,
                        "slot": slot,
                        "seq": seq,
                        "size_0": parsed_cpu_event["size"],
                    },
                }
            )
        else:
            # Close the fiber opened for this slot and make it reusable.
            fiber_id = slot_to_fiber_id.pop(slot)
            fiber_is_usable[fiber_id] = True
            # Clamp to 0.001 time units so a zero-length span never divides by zero.
            delta_time = max(0.001, ts - fiber_open_ts[fiber_id])
            trace_event["args"] = {
                "size": parsed_cpu_event["size"],
                "bw (GB/s)": parsed_cpu_event["size"] / delta_time / 1e3,
            }
        trace_event["tid"] = fiber_id + (channel + 1) * channel_shift
        cpu_events.append(trace_event)
    return cpu_events
def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def):
    """Merge every NPKit dump file in *npkit_dump_dir* into one Chrome trace JSON.

    Event files are discovered by name: ``gpu_events_rank_<r>_buf_<b>`` and
    ``cpu_events_rank_<r>_channel_<c>``. They are indexed per rank, so each rank
    only converts the buffers and channels it actually dumped. For each rank:

    * the CPU clock period files are always read;
    * the GPU clock rate file is read only when the rank has GPU event files.

    All events are sorted by timestamp and written to
    ``<output_dir>/npkit_event_trace.json``. *output_dir* is created if needed.

    Args:
        npkit_dump_dir: Directory holding the NPKit dump files.
        output_dir: Directory to write ``npkit_event_trace.json`` into.
        npkit_event_def: Mapping returned by ``parse_npkit_event_header``.

    Raises:
        FileNotFoundError: *npkit_dump_dir* does not exist, or a needed clock file is missing.
    """
    # os.listdir raises a clear FileNotFoundError for a missing directory, where
    # next(os.walk(...)) would raise a bare StopIteration. Sorting keeps runs reproducible.
    files_in_dump_dir = sorted(
        name for name in os.listdir(npkit_dump_dir) if os.path.isfile(os.path.join(npkit_dump_dir, name))
    )
    # Index per rank: ranks may differ in their number of GPU buffers or CPU
    # channels, so a global union would request files a rank never wrote.
    gpu_bufs_by_rank = {}
    cpu_channels_by_rank = {}
    for name in files_in_dump_dir:
        if name.startswith("gpu_events_rank_"):
            rank = int(name.split("_rank_")[1].split("_")[0])
            buf_idx = int(name.split("_buf_")[1].split("_")[0])
            gpu_bufs_by_rank.setdefault(rank, set()).add(buf_idx)
        elif name.startswith("cpu_events_rank_"):
            rank = int(name.split("_rank_")[1].split("_")[0])
            channel = int(name.split("_channel_")[1].split("_")[0])
            cpu_channels_by_rank.setdefault(rank, set()).add(channel)

    trace = {"traceEvents": []}
    for rank in sorted(gpu_bufs_by_rank.keys() | cpu_channels_by_rank.keys()):
        cpu_clock_den_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_den_rank_%d" % rank)
        cpu_clock_num_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_num_rank_%d" % rank)
        cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path)
        buf_indices = sorted(gpu_bufs_by_rank.get(rank, ()))
        if buf_indices:
            gpu_clock_file_path = os.path.join(npkit_dump_dir, "gpu_clock_rate_rank_%d" % rank)
            gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path)
            for buf_idx in buf_indices:
                gpu_events = parse_gpu_event_file(
                    npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale
                )
                trace["traceEvents"].extend(gpu_events)
        for channel in sorted(cpu_channels_by_rank.get(rank, ())):
            cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale)
            trace["traceEvents"].extend(cpu_events)

    # Stable sort: events with equal timestamps keep their per-file order.
    trace["traceEvents"].sort(key=lambda x: x["ts"])
    trace["displayTimeUnit"] = "ns"
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "npkit_event_trace.json"), "w") as f:
        json.dump(trace, f)
if __name__ == "__main__":
    # Command-line entry point; every option is a required string.
    cli = argparse.ArgumentParser()
    for flag, description in (
        ("--npkit_dump_dir", "NPKit dump directory."),
        ("--npkit_event_header_path", "Path to npkit_event.h."),
        ("--output_dir", "Path to output directory."),
    ):
        cli.add_argument(flag, type=str, required=True, help=description)
    cli_args = cli.parse_args()
    # Parse the header first: it supplies the id -> event-type mapping for the dump.
    event_def = parse_npkit_event_header(cli_args.npkit_event_header_path)
    convert_npkit_dump_to_trace(cli_args.npkit_dump_dir, cli_args.output_dir, event_def)