mirror of
https://github.com/microsoft/mscclpp.git
synced 2026-05-11 17:00:22 +00:00
This PR introduces three new operations to enhance flexibility and
performance in the executor.
One operation can be invoked directly via the DSL API and two operations
are created through fusion of existing operations, reducing overhead and
improving efficiency.
1. Port Channel Put Packet (Direct DSL API Call): Sends data from pkt
format to the remote side in pkt format via the port channel. Both
source and destination buffers must be scratch.
2. Reduce Copy Packet (Fusion):
Reduce Packet+Copy Packet=Reduce Copy Packet
Triggered when the destination buffer of Reduce Packet matches the
source buffer of Copy Packet.
Purpose: Combine reduction and copy into a single step for better
performance.
3. Reduce Copy Send Packet (Fusion):
Reduce Copy Packet+Put Packet=Reduce Copy Send Packet (when dst buffer
of Reduce Copy Packet matches src buffer of Put Packet)
Reduce Copy Packet+Read Put Packet=Reduce Copy Send Packet (when dst pkt
buffer of Reduce Copy Packet matches src buffer of Read Put Packet)
Purpose: Combine reduction, copy, and send operations into one optimized
pipeline.
Fusion Diagram
Reduce Packet + Copy Packet → Reduce Copy Packet
Reduce Copy Packet + Put Packet → Reduce Copy Send Packet
Reduce Copy Packet + Read Put Packet → Reduce Copy Send Packet
Beyond this, this PR adjusts the AllReduce 2-node algorithm:
Message Size | Latency (µs)
1K | 15.34
2K | 15.88
4K | 15.71
8K | 16.01
16K | 15.88
32K | 16.21
64K | 16.90
128K | 18.24
256K | 20.39
512K | 25.26
1M | 32.74
2M | 53.64
278 lines
12 KiB
Python
278 lines
12 KiB
Python
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
|
|
|
|
import argparse
|
|
import os
|
|
import json
|
|
|
|
from queue import Queue
|
|
|
|
|
|
def parse_npkit_event_header(npkit_event_header_path):
    """Build the NPKit event lookup tables from an ``npkit_event.h`` header.

    Only lines of the exact form ``#define NPKIT_EVENT_<NAME> <value>`` (three
    whitespace-separated fields) are used; ``<value>`` is parsed with
    ``int(value, 0)`` so decimal, hex, octal and binary literals all work.
    Defines with trailing comments or no value are ignored.

    A define whose name starts with ``NPKIT_EVENT_EXECUTOR_OP_BASE`` is not
    registered under its own name. Instead it is expanded into one event per
    executor operation, named ``NPKIT_EVENT_EXECUTOR_<OP>_ENTRY`` or
    ``NPKIT_EVENT_EXECUTOR_<OP>_EXIT`` (depending on whether the line contains
    ``ENTRY`` or ``EXIT``), with id ``base + offset`` where ``offset`` is the
    operation's position in ``executor_ops`` below.

    Args:
        npkit_event_header_path: Path to the header file to read.

    Returns:
        A dict with two sub-dicts: ``"id_to_type"`` (int id -> event name) and
        ``"type_to_id"`` (event name -> int id).

    Raises:
        OSError: If the header cannot be opened.
        ValueError: If a matching define has a value that is not an integer
            literal.
    """
    npkit_event_def = {"id_to_type": {}, "type_to_id": {}}
    id_to_type = npkit_event_def["id_to_type"]
    type_to_id = npkit_event_def["type_to_id"]

    # Order matters: an operation's index in this list is the offset added to
    # the NPKIT_EVENT_EXECUTOR_OP_BASE_* id read from the header.
    # NOTE(review): presumably this mirrors the executor's operation enum on
    # the C++ side -- confirm whenever operations are added or reordered.
    executor_ops = [
        "NOP",
        "BARRIER",
        "PUT",
        "PUT_PACKET",
        "READ_PUT_PACKET",
        "PUT_WITH_SIGNAL",
        "PUT_WITH_SIGNAL_AND_FLUSH",
        "GET",
        "COPY",
        "COPY_PACKET",
        "TRANSFORM_TO_PACKET",
        "SIGNAL",
        "WAIT",
        "FLUSH",
        "REDUCE",
        "REDUCE_PACKET",
        "REDUCE_COPY_PACKETS",
        "REDUCE_SEND",
        "REDUCE_SEND_PACKET",
        "REDUCE_COPY_SEND_PACKETS",
        "READ_REDUCE_COPY",
        "READ_REDUCE_COPY_SEND",
        "MULTI_LOAD_REDUCE_STORE",
        "RELAXED_SIGNAL",
        "RELAXED_WAIT",
        "PIPELINE",
        "SEM_RELEASE",
        "SEM_ACQUIRE",
    ]
    executor_op_to_offset = {op: offset for offset, op in enumerate(executor_ops)}

    with open(npkit_event_header_path, "r") as f:
        lines = [line.strip() for line in f if line.strip()]

    for line in lines:
        if not line.startswith("#define NPKIT_EVENT_"):
            continue
        fields = line.split()
        if len(fields) != 3:
            continue

        event_type = fields[1]
        event_id = int(fields[2], 0)

        if not line.startswith("#define NPKIT_EVENT_EXECUTOR_OP_BASE"):
            type_to_id[event_type] = event_id
            id_to_type[event_id] = event_type
            continue

        for executor_op, offset in executor_op_to_offset.items():
            real_event_id = event_id + offset
            # A base define naming neither ENTRY nor EXIT leaves event_type
            # at whatever it was last set to.
            if "ENTRY" in line:
                event_type = "NPKIT_EVENT_EXECUTOR_%s_ENTRY" % executor_op
            elif "EXIT" in line:
                event_type = "NPKIT_EVENT_EXECUTOR_%s_EXIT" % executor_op
            type_to_id[event_type] = real_event_id
            id_to_type[real_event_id] = event_type

    return npkit_event_def
|
|
|
|
|
|
def parse_gpu_clock_scale(gpu_clock_file_path):
    """Read the GPU clock rate file and return the GPU timestamp divisor.

    The whole file is parsed as a single number and the result is
    ``value * 1e3 / 1e6``.
    NOTE(review): the value is presumably a clock rate in kHz, which makes the
    result MHz (ticks per microsecond) -- confirm against the dump writer.

    Args:
        gpu_clock_file_path: Path to the text file holding the clock rate.

    Returns:
        The scale as a float; raw GPU timestamps are divided by it.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the content is not a valid float.
    """
    with open(gpu_clock_file_path, "r") as f:
        freq_in_khz = f.read()
    # Keep this exact operation order so float results stay bit-identical.
    return float(freq_in_khz) * 1e3 / 1e6
|
|
|
|
|
|
def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path):
    """Read the CPU clock period files and return the CPU timestamp divisor.

    Each file is parsed as a single float and the result is
    ``den / num / 1e6``.
    NOTE(review): presumably num/den is the clock tick period in seconds, so
    the result is ticks per microsecond -- confirm against the dump writer.

    Args:
        cpu_clock_den_file_path: Path to the file holding the denominator.
        cpu_clock_num_file_path: Path to the file holding the numerator.

    Returns:
        The scale as a float; raw CPU timestamps are divided by it.

    Raises:
        OSError: If either file cannot be opened.
        ValueError: If either file does not contain a valid float.
        ZeroDivisionError: If the numerator is zero.
    """
    # The numerator file is read first, matching the original open order.
    with open(cpu_clock_num_file_path, "r") as f:
        num = float(f.read())
    with open(cpu_clock_den_file_path, "r") as f:
        den = float(f.read())
    return den / num / 1e6
|
|
|
|
|
|
def parse_gpu_event(event_bytes):
    """Decode one raw GPU event record into a dict of unsigned integers.

    All fields are little-endian and unsigned:

    * byte 0       -> ``"id"``
    * bytes 1..4   -> ``"size"``
    * bytes 5..7   -> ``"rsvd"``
    * bytes 8..15  -> ``"timestamp"``

    No length check is done: a field whose slice is empty decodes to 0, and a
    short slice decodes from the bytes that are present.

    Args:
        event_bytes: Bytes-like object holding the record.

    Returns:
        Dict with keys ``"id"``, ``"size"``, ``"rsvd"`` and ``"timestamp"``.
    """

    def field(start, stop):
        return int.from_bytes(event_bytes[start:stop], byteorder="little", signed=False)

    return {
        "id": field(0, 1),
        "size": field(1, 5),
        "rsvd": field(5, 8),
        "timestamp": field(8, 16),
    }
|
|
|
|
|
|
def parse_cpu_event(event_bytes):
    """Decode one raw CPU event record into a dict of unsigned integers.

    All fields are little-endian and unsigned:

    * byte 0       -> ``"id"``
    * bytes 1..4   -> ``"size"``
    * bytes 5..7   -> ``"slot"``
    * bytes 8..15  -> ``"timestamp"``

    No length check is done: a field whose slice is empty decodes to 0, and a
    short slice decodes from the bytes that are present.

    Args:
        event_bytes: Bytes-like object holding the record.

    Returns:
        Dict with keys ``"id"``, ``"size"``, ``"slot"`` and ``"timestamp"``.
    """

    def field(start, stop):
        return int.from_bytes(event_bytes[start:stop], byteorder="little", signed=False)

    return {
        "id": field(0, 1),
        "size": field(1, 5),
        "slot": field(5, 8),
        "timestamp": field(8, 16),
    }
|
|
|
|
|
|
def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale):
    """Convert one GPU event dump file into a list of trace event dicts.

    The file ``gpu_events_rank_<rank>_buf_<buf_idx>`` in ``npkit_dump_dir`` is
    read as a sequence of 16-byte records decoded by ``parse_gpu_event``.

    Time handling:

    * ``NPKIT_EVENT_TIME_SYNC_CPU`` sets the CPU base time
      (``timestamp / cpu_clock_scale``) and clears the GPU base time.
    * The next record of any other type sets the GPU base time
      (``timestamp / gpu_clock_scale``) if it is still unset.
    * Every record that is neither sync type is emitted with
      ``ts = cpu_base + timestamp / gpu_clock_scale - gpu_base``.

    Events whose name ends in ``_ENTRY`` get phase ``"B"`` plus name, category
    and a per-type sequence number. All others get phase ``"E"`` and a
    bandwidth figure computed from the time since the previously emitted event
    in this file.

    Args:
        npkit_dump_dir: Directory containing the dump file.
        npkit_event_def: Tables from ``parse_npkit_event_header``.
        rank: Rank number; used in the file name and as the trace ``pid``.
        buf_idx: Buffer index; used in the file name, trace ``tid`` is
            ``buf_idx + 1``.
        gpu_clock_scale: Divisor applied to raw GPU timestamps.
        cpu_clock_scale: Divisor applied to the CPU sync timestamp.

    Returns:
        List of trace event dicts in file order.

    Raises:
        OSError: If the dump file cannot be opened.
        KeyError: If a record's id is missing from ``npkit_event_def``.
        TypeError: If a non-sync record appears before any
            ``NPKIT_EVENT_TIME_SYNC_CPU`` record (no CPU base time yet).
    """
    gpu_event_file_path = os.path.join(npkit_dump_dir, "gpu_events_rank_%d_buf_%d" % (rank, buf_idx))
    raw_event_size = 16
    id_to_type = npkit_event_def["id_to_type"]

    curr_cpu_base_time = None
    curr_gpu_base_time = None
    gpu_events = []
    event_type_to_seq = {}

    with open(gpu_event_file_path, "rb") as f:
        raw_content = f.read()

    for offset in range(0, len(raw_content), raw_event_size):
        parsed_gpu_event = parse_gpu_event(raw_content[offset : offset + raw_event_size])
        event_type = id_to_type[parsed_gpu_event["id"]]

        if event_type == "NPKIT_EVENT_TIME_SYNC_CPU":
            curr_cpu_base_time = parsed_gpu_event["timestamp"] / cpu_clock_scale
            # Force the next record to re-anchor the GPU clock.
            curr_gpu_base_time = None
            continue

        # The first record after a CPU sync anchors the GPU clock, whether it
        # is an explicit GPU sync record or an ordinary event.
        if curr_gpu_base_time is None:
            curr_gpu_base_time = parsed_gpu_event["timestamp"] / gpu_clock_scale
        if event_type == "NPKIT_EVENT_TIME_SYNC_GPU":
            continue

        phase = "B" if event_type.endswith("_ENTRY") else "E"
        event = {
            "ph": phase,
            "ts": curr_cpu_base_time + parsed_gpu_event["timestamp"] / gpu_clock_scale - curr_gpu_base_time,
            "pid": rank,
            "tid": buf_idx + 1,
        }

        if phase == "B":
            seq = event_type_to_seq.get(event_type, 0)
            event.update(
                {
                    "name": event_type,
                    "cat": "GPU",
                    "args": {
                        "rank": rank,
                        "buf_idx": buf_idx,
                        "seq": seq,
                        "rsvd_0": parsed_gpu_event["rsvd"],
                        "size_0": parsed_gpu_event["size"],
                    },
                }
            )
            event_type_to_seq[event_type] = seq + 1
        else:
            event["args"] = {"size": parsed_gpu_event["size"], "rsvd": parsed_gpu_event["rsvd"]}
            # An end event with no predecessor has no interval to measure;
            # report zero bandwidth instead of failing on a missing element.
            prev_ts = gpu_events[-1]["ts"] if gpu_events else event["ts"]
            delta_time = event["ts"] - prev_ts
            event["args"]["bw (GB/s)"] = 0.0 if delta_time == 0.0 else event["args"]["size"] / delta_time / 1e3

        gpu_events.append(event)

    return gpu_events
|
|
|
|
|
|
def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale):
    """Convert one CPU event dump file into a list of trace event dicts.

    The file ``cpu_events_rank_<rank>_channel_<channel>`` in ``npkit_dump_dir``
    is read as a sequence of 16-byte records decoded by ``parse_cpu_event``.
    Each record becomes one event with ``ts = timestamp / cpu_clock_scale``.

    Events may overlap, so each begin event (name ending in ``_ENTRY``) is
    assigned the lowest-numbered free "fiber", remembered by the record's
    ``slot``. The matching end event looks the fiber up by ``slot`` and frees
    it. The trace ``tid`` is ``fiber_id + (channel + 1) * 1000``.

    End events carry a bandwidth figure computed from the time since their
    fiber was opened, with the interval clamped to at least 0.001.

    Args:
        npkit_dump_dir: Directory containing the dump file.
        npkit_event_def: Tables from ``parse_npkit_event_header``.
        rank: Rank number; used in the file name and as the trace ``pid``.
        channel: Channel number; used in the file name and the ``tid``.
        cpu_clock_scale: Divisor applied to raw CPU timestamps.

    Returns:
        List of trace event dicts in file order.

    Raises:
        OSError: If the dump file cannot be opened.
        KeyError: If a record's id is missing from ``npkit_event_def``, or an
            end event refers to a slot with no open begin event.
    """
    cpu_event_file_path = os.path.join(npkit_dump_dir, "cpu_events_rank_%d_channel_%d" % (rank, channel))
    raw_event_size = 16
    channel_shift = 1000
    id_to_type = npkit_event_def["id_to_type"]

    cpu_events = []
    event_type_to_seq = {}

    fiber_is_usable = []
    fiber_open_ts = []
    slot_to_fiber_id = {}

    with open(cpu_event_file_path, "rb") as f:
        raw_content = f.read()

    for offset in range(0, len(raw_content), raw_event_size):
        parsed_cpu_event = parse_cpu_event(raw_content[offset : offset + raw_event_size])
        event_type = id_to_type[parsed_cpu_event["id"]]
        phase = "B" if event_type.endswith("_ENTRY") else "E"
        event = {"ph": phase, "ts": parsed_cpu_event["timestamp"] / cpu_clock_scale, "pid": rank}
        slot = parsed_cpu_event["slot"]

        if phase == "B":
            # Open fiber event: reuse the first free fiber, else add one.
            try:
                fiber_id = fiber_is_usable.index(True)
            except ValueError:
                fiber_id = len(fiber_is_usable)
                fiber_is_usable.append(True)
                fiber_open_ts.append(0.0)
            slot_to_fiber_id[slot] = fiber_id
            fiber_open_ts[fiber_id] = event["ts"]
            fiber_is_usable[fiber_id] = False

            seq = event_type_to_seq.get(event_type, 0)
            event.update(
                {
                    "name": event_type,
                    "cat": "CPU",
                    "args": {
                        "rank": rank,
                        "channel": channel,
                        "slot": slot,
                        "seq": seq,
                        "size_0": parsed_cpu_event["size"],
                    },
                }
            )
            event_type_to_seq[event_type] = seq + 1
        else:
            # Close fiber event: release the fiber that this slot opened.
            fiber_id = slot_to_fiber_id.pop(slot)
            last_ts = fiber_open_ts[fiber_id]
            fiber_is_usable[fiber_id] = True

            # The clamp keeps the interval strictly positive, so the division
            # below needs no separate zero check.
            delta_time = max(0.001, event["ts"] - last_ts)
            event["args"] = {"size": parsed_cpu_event["size"]}
            event["args"]["bw (GB/s)"] = event["args"]["size"] / delta_time / 1e3

        event["tid"] = fiber_id + (channel + 1) * channel_shift
        cpu_events.append(event)

    return cpu_events
|
|
|
|
|
|
def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def):
    """Merge all NPKit dump files in a directory into one JSON trace file.

    Ranks and buffer indices are taken from the names of the
    ``gpu_events_rank_*`` files, channels from the ``cpu_events_rank_*``
    files. For each rank the CPU and GPU clock scales are read, then every
    GPU buffer file and CPU channel file that exists for that rank is parsed.
    Rank/buffer and rank/channel combinations with no dump file are skipped.

    All events are sorted by ``ts`` and written, together with
    ``"displayTimeUnit": "ns"``, to ``<output_dir>/npkit_event_trace.json``.
    ``output_dir`` is created if needed.

    Args:
        npkit_dump_dir: Directory holding the NPKit dump and clock files.
        output_dir: Directory to write ``npkit_event_trace.json`` into.
        npkit_event_def: Tables from ``parse_npkit_event_header``.

    Returns:
        None.
    """
    files_in_dump_dir = next(os.walk(npkit_dump_dir))[2]
    file_names = set(files_in_dump_dir)
    gpu_event_files = [x for x in files_in_dump_dir if x.startswith("gpu_events_rank_")]
    cpu_event_files = [x for x in files_in_dump_dir if x.startswith("cpu_events_rank_")]

    # Sorted so the processing order does not depend on set iteration order.
    ranks = sorted({int(x.split("_rank_")[1].split("_")[0]) for x in gpu_event_files})
    buf_indices = sorted({int(x.split("_buf_")[1].split("_")[0]) for x in gpu_event_files})
    channels = sorted({int(x.split("_channel_")[1].split("_")[0]) for x in cpu_event_files})

    trace = {"traceEvents": []}

    for rank in ranks:
        cpu_clock_den_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_den_rank_%d" % rank)
        cpu_clock_num_file_path = os.path.join(npkit_dump_dir, "cpu_clock_period_num_rank_%d" % rank)
        cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path)

        gpu_clock_file_path = os.path.join(npkit_dump_dir, "gpu_clock_rate_rank_%d" % rank)
        gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path)

        for buf_idx in buf_indices:
            # buf_indices is the union over all ranks; this rank may not have
            # every buffer.
            if "gpu_events_rank_%d_buf_%d" % (rank, buf_idx) not in file_names:
                continue
            gpu_events = parse_gpu_event_file(
                npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale
            )
            trace["traceEvents"].extend(gpu_events)

        for channel in channels:
            # Likewise, channels is the union over all ranks.
            if "cpu_events_rank_%d_channel_%d" % (rank, channel) not in file_names:
                continue
            cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale)
            trace["traceEvents"].extend(cpu_events)

    trace["traceEvents"].sort(key=lambda x: x["ts"])
    trace["displayTimeUnit"] = "ns"

    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "npkit_event_trace.json"), "w") as f:
        json.dump(trace, f)
|
|
|
|
|
|
# Command-line entry point: parse the event header, then convert the dump
# directory into a single JSON trace in the output directory.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--npkit_dump_dir", type=str, required=True, help="NPKit dump directory.")
    parser.add_argument("--npkit_event_header_path", type=str, required=True, help="Path to npkit_event.h.")
    parser.add_argument("--output_dir", type=str, required=True, help="Path to output directory.")
    args = parser.parse_args()

    npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path)
    convert_npkit_dump_to_trace(args.npkit_dump_dir, args.output_dir, npkit_event_def)
|