Files
mscclpp/tools/npkit/npkit_trace_generator.py
Binyang Li fa95e82e18 Fix CI/CD pipeline issues (#773)
This pull request updates the deployment pipeline to allow custom CMake
arguments to be passed to the pip install process on remote VMs.

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-07 08:41:51 -07:00

278 lines
12 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
import os
import json
from queue import Queue
def parse_npkit_event_header(npkit_event_header_path):
npkit_event_def = {"id_to_type": {}, "type_to_id": {}}
executor_ops = [
"NOP",
"BARRIER",
"PUT",
"PUT_PACKETS",
"READ_PUT_PACKETS",
"PUT_WITH_SIGNAL",
"PUT_WITH_SIGNAL_AND_FLUSH",
"GET",
"COPY",
"COPY_PACKETS",
"UNPACK_PACKETS",
"SIGNAL",
"WAIT",
"FLUSH",
"REDUCE",
"REDUCE_PACKETS",
"REDUCE_COPY_PACKETS",
"REDUCE_SEND",
"REDUCE_SEND_PACKETS",
"REDUCE_COPY_SEND_PACKETS",
"READ_REDUCE",
"READ_REDUCE_SEND",
"MULTI_LOAD_REDUCE_STORE",
"RELAXED_SIGNAL",
"RELAXED_WAIT",
"PIPELINE",
"SEM_RELEASE",
"SEM_ACQUIRE",
]
executor_op_to_offset = {}
for executor_op in executor_ops:
executor_op_to_offset[executor_op] = len(executor_op_to_offset)
with open(npkit_event_header_path, "r") as f:
lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0]
line_idx = 0
while line_idx < len(lines):
if lines[line_idx].startswith("#define NPKIT_EVENT_"):
fields = lines[line_idx].split()
if len(fields) == 3:
event_type = fields[1]
event_id = int(fields[2], 0)
if lines[line_idx].startswith("#define NPKIT_EVENT_EXECUTOR_OP_BASE"):
for executor_op in executor_op_to_offset:
real_event_id = event_id + executor_op_to_offset[executor_op]
if "ENTRY" in lines[line_idx]:
event_type = "NPKIT_EVENT_EXECUTOR_%s_ENTRY" % executor_op
elif "EXIT" in lines[line_idx]:
event_type = "NPKIT_EVENT_EXECUTOR_%s_EXIT" % executor_op
npkit_event_def["type_to_id"][event_type] = real_event_id
npkit_event_def["id_to_type"][real_event_id] = event_type
else:
npkit_event_def["type_to_id"][event_type] = event_id
npkit_event_def["id_to_type"][event_id] = event_type
line_idx += 1
return npkit_event_def
def parse_gpu_clock_scale(gpu_clock_file_path):
with open(gpu_clock_file_path, "r") as f:
freq_in_khz = f.read()
return float(freq_in_khz) * 1e3 / 1e6
def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path):
with open(cpu_clock_num_file_path, "r") as f:
num = float(f.read())
with open(cpu_clock_den_file_path, "r") as f:
den = float(f.read())
return den / num / 1e6
def parse_gpu_event(event_bytes):
return {
"id": int.from_bytes(event_bytes[0:1], byteorder="little", signed=False),
"size": int.from_bytes(event_bytes[1:5], byteorder="little", signed=False),
"rsvd": int.from_bytes(event_bytes[5:8], byteorder="little", signed=False),
"timestamp": int.from_bytes(event_bytes[8:16], byteorder="little", signed=False),
}
def parse_cpu_event(event_bytes):
return {
"id": int.from_bytes(event_bytes[0:1], byteorder="little", signed=False),
"size": int.from_bytes(event_bytes[1:5], byteorder="little", signed=False),
"slot": int.from_bytes(event_bytes[5:8], byteorder="little", signed=False),
"timestamp": int.from_bytes(event_bytes[8:16], byteorder="little", signed=False),
}
def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale):
gpu_event_file_path = os.path.join(npkit_dump_dir, "gpu_events_rank_%d_buf_%d" % (rank, buf_idx))
raw_event_size = 16
curr_cpu_base_time = None
curr_gpu_base_time = None
gpu_events = []
event_type_to_seq = {}
with open(gpu_event_file_path, "rb") as f:
raw_content = f.read()
raw_content_size = len(raw_content)
raw_content_idx = 0
while raw_content_idx < raw_content_size:
parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
if npkit_event_def["id_to_type"][parsed_gpu_event["id"]] == "NPKIT_EVENT_TIME_SYNC_CPU":
curr_cpu_base_time = parsed_gpu_event["timestamp"] / cpu_clock_scale
curr_gpu_base_time = None
elif npkit_event_def["id_to_type"][parsed_gpu_event["id"]] == "NPKIT_EVENT_TIME_SYNC_GPU":
if curr_gpu_base_time is None:
curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale
else:
if curr_gpu_base_time is None:
curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale
event_type = npkit_event_def["id_to_type"][parsed_gpu_event["id"]]
phase = "B" if event_type.endswith("_ENTRY") else "E"
gpu_events.append(
{
"ph": phase,
"ts": curr_cpu_base_time + parsed_gpu_event["timestamp"] / gpu_clock_scale - curr_gpu_base_time,
"pid": rank,
"tid": buf_idx + 1,
}
)
if phase == "B":
if event_type not in event_type_to_seq:
event_type_to_seq[event_type] = 0
gpu_events[-1].update(
{
"name": event_type,
"cat": "GPU",
"args": {
"rank": rank,
"buf_idx": buf_idx,
"seq": event_type_to_seq[event_type],
"rsvd_0": parsed_gpu_event["rsvd"],
"size_0": parsed_gpu_event["size"],
},
}
)
event_type_to_seq[event_type] += 1
else:
gpu_events[-1]["args"] = {"size": parsed_gpu_event["size"], "rsvd": parsed_gpu_event["rsvd"]}
delta_time = gpu_events[-1]["ts"] - gpu_events[-2]["ts"]
gpu_events[-1]["args"]["bw (GB/s)"] = (
0.0 if delta_time == 0.0 else gpu_events[-1]["args"]["size"] / delta_time / 1e3
)
raw_content_idx += raw_event_size
return gpu_events
def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale):
cpu_event_file_path = os.path.join(npkit_dump_dir, "cpu_events_rank_%d_channel_%d" % (rank, channel))
raw_event_size = 16
cpu_events = []
event_type_to_seq = {}
fiber_is_usable = []
fiber_open_ts = []
slot_to_fiber_id = {}
channel_shift = 1000
with open(cpu_event_file_path, "rb") as f:
raw_content = f.read()
raw_content_size = len(raw_content)
raw_content_idx = 0
while raw_content_idx < raw_content_size:
parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
event_type = npkit_event_def["id_to_type"][parsed_cpu_event["id"]]
phase = "B" if event_type.endswith("_ENTRY") else "E"
cpu_events.append({"ph": phase, "ts": parsed_cpu_event["timestamp"] / cpu_clock_scale, "pid": rank})
slot = parsed_cpu_event["slot"]
if phase == "B":
# Open fiber event
fiber_id = 0
while fiber_id < len(fiber_is_usable):
if fiber_is_usable[fiber_id]:
break
fiber_id += 1
if fiber_id == len(fiber_is_usable):
fiber_is_usable.append(True)
fiber_open_ts.append(0.0)
slot_to_fiber_id[slot] = fiber_id
fiber_open_ts[fiber_id] = cpu_events[-1]["ts"]
fiber_is_usable[fiber_id] = False
if event_type not in event_type_to_seq:
event_type_to_seq[event_type] = 0
cpu_events[-1].update(
{
"name": event_type,
"cat": "CPU",
"args": {
"rank": rank,
"channel": channel,
"slot": parsed_cpu_event["slot"],
"seq": event_type_to_seq[event_type],
"size_0": parsed_cpu_event["size"],
},
}
)
event_type_to_seq[event_type] += 1
else:
# Close fiber event
fiber_id = slot_to_fiber_id[slot]
slot_to_fiber_id.pop(slot)
last_ts = fiber_open_ts[fiber_id]
fiber_is_usable[fiber_id] = True
delta_time = max(0.001, cpu_events[-1]["ts"] - last_ts)
cpu_events[-1]["args"] = {"size": parsed_cpu_event["size"]}
cpu_events[-1]["args"]["bw (GB/s)"] = (
0.0 if delta_time == 0.0 else cpu_events[-1]["args"]["size"] / delta_time / 1e3
)
cpu_events[-1]["tid"] = fiber_id + (channel + 1) * channel_shift
raw_content_idx += raw_event_size
return cpu_events
def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def):
files_in_dump_dir = next(os.walk(npkit_dump_dir))[2]
gpu_event_files = [x for x in files_in_dump_dir if x.startswith("gpu_events_rank_")]
cpu_event_files = [x for x in files_in_dump_dir if x.startswith("cpu_events_rank_")]
ranks = list(set([int(x.split("_rank_")[1].split("_")[0]) for x in gpu_event_files]))
buf_indices = list(set([int(x.split("_buf_")[1].split("_")[0]) for x in gpu_event_files]))
channels = list(set([int(x.split("_channel_")[1].split("_")[0]) for x in cpu_event_files]))
trace = {"traceEvents": []}
for rank in ranks:
cpu_clock_den_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_den_rank_%d" % rank)
cpu_clock_num_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_num_rank_%d" % rank)
cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path)
gpu_clock_file_path = os.path.join(npkit_dump_dir, "gpu_clock_rate_rank_%d" % rank)
gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path)
for buf_idx in buf_indices:
gpu_events = parse_gpu_event_file(
npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale
)
trace["traceEvents"].extend(gpu_events)
for channel in channels:
cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale)
trace["traceEvents"].extend(cpu_events)
trace["traceEvents"].sort(key=lambda x: x["ts"])
trace["displayTimeUnit"] = "ns"
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "npkit_event_trace.json"), "w") as f:
json.dump(trace, f)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--npkit_dump_dir", type=str, required=True, help="NPKit dump directory.")
parser.add_argument("--npkit_event_header_path", type=str, required=True, help="Path to npkit_event.h.")
parser.add_argument("--output_dir", type=str, required=True, help="Path to output directory.")
args = parser.parse_args()
npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path)
convert_npkit_dump_to_trace(args.npkit_dump_dir, args.output_dir, npkit_event_def)