Files
composable_kernel/script/ninja_json_converter.py
Aviral Goel ab68c9d384 chore(copyright): update copyright header for script directory (#3184)
* chore(copyright): update copyright header for tile_engine directory

* chore(copyright): update copyright header for script directory

---------

Co-authored-by: Vidyasagar Ananthan <vanantha@amd.com>
2025-11-11 11:26:01 -08:00

550 lines
19 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
Converts .ninja_log files into Chrome's about:tracing format.
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Iterator
class BuildTarget:
    """One build step reconstructed from the ninja log, keyed by command hash."""

    def __init__(self, start_time: int, end_time: int, output_name: str, cmd_hash: str):
        # Timestamps arrive from the log as strings; normalize to int (ms).
        self.start_time = int(start_time)
        self.end_time = int(end_time)
        self.cmd_hash = cmd_hash
        self.duration = self.end_time - self.start_time
        # Every output produced by the same command hash accumulates here.
        self.targets = [output_name]

    @property
    def category(self) -> str:
        """Classify the target by the extension of its primary output."""
        # Categorization is based on the first recorded output name.
        primary = self.targets[0] if self.targets else ""
        suffix = Path(primary).suffix.lower()
        if suffix in (".o", ".obj"):
            return "compile"
        if suffix in (".a", ".lib"):
            return "archive"
        if suffix in (".so", ".dll", ".dylib"):
            return "link_shared"
        if suffix in (".exe", ".out"):
            return "link_executable"
        if "test" in primary.lower():
            return "test"
        return "other"

    @property
    def output_name(self) -> str:
        """Primary output name (kept for backward compatibility)."""
        return self.targets[0] if self.targets else ""
class ThreadScheduler:
    """Assigns build targets to simulated worker threads for timeline rendering."""

    def __init__(self, legacy_mode: bool = False):
        # One timestamp per simulated worker; its meaning depends on the mode
        # (legacy: last start time seen, new: last end time seen).
        self.workers: List[int] = []
        self.legacy_mode = legacy_mode

    def allocate_thread(self, target: "BuildTarget") -> int:
        """Return a worker (thread) index for *target*, creating one if needed."""
        if self.legacy_mode:
            # Old ninjatracer scheme, preserved verbatim: reuse a worker whose
            # stored start time is at or after this target's end time
            # (callers feed targets sorted by end time, newest first).
            for idx, stamp in enumerate(self.workers):
                if stamp >= target.end_time:
                    self.workers[idx] = target.start_time
                    return idx
            self.workers.append(target.start_time)
            return len(self.workers) - 1
        # Current scheme: workers store end times; reuse the first worker that
        # already finished before this target starts.
        for idx, finished_at in enumerate(self.workers):
            if finished_at <= target.start_time:
                self.workers[idx] = target.end_time
                return idx
        # Everyone is busy -- spin up a fresh worker.
        self.workers.append(target.end_time)
        return len(self.workers) - 1
class NinjaLogParser:
    """Parser for .ninja_log files (format versions 5 and 6)."""

    def __init__(self, show_all_builds: bool = False):
        # When False, only the most recent build recorded in the log is kept.
        self.show_all_builds = show_all_builds

    def parse_log_file(self, log_path: str) -> List["BuildTarget"]:
        """Parse *log_path* and return targets sorted by end time, newest first.

        Raises:
            FileNotFoundError: if the log file does not exist.
            ValueError: if the file is empty or has an unsupported header.
        """
        if not os.path.exists(log_path):
            raise FileNotFoundError(f"Ninja log file not found: {log_path}")
        with open(log_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
        if not lines:
            raise ValueError("Empty ninja log file")
        # Validate the "# ninja log vN" header line.
        header = lines[0].strip()
        version_match = re.match(r"^# ninja log v(\d+)$", header)
        if not version_match:
            raise ValueError(f"Invalid ninja log header: {header}")
        version = int(version_match.group(1))
        if version < 5:
            raise ValueError(f"Unsupported ninja log version: {version}")
        # Version 6 carries one additional header line.
        start_line = 2 if version > 5 else 1
        targets: Dict[str, "BuildTarget"] = {}
        last_end_time = 0
        for line_num, raw in enumerate(lines[start_line:], start=start_line + 1):
            line = raw.strip()
            # Skip blanks and comment lines.
            if not line or line.startswith("#"):
                continue
            parts = line.split("\t")
            if len(parts) < 5:
                print(
                    f"Warning: Skipping malformed line {line_num}: {line}",
                    file=sys.stderr,
                )
                continue
            try:
                start_field, end_field, _, output_name, cmd_hash = parts[:5]
                start_time = int(start_field)
                end_time = int(end_field)
                # A drop in end time signals the start of a newer build;
                # drop earlier builds unless asked to keep everything.
                if not self.show_all_builds and end_time < last_end_time:
                    targets.clear()
                last_end_time = end_time
                entry = targets.get(cmd_hash)
                if entry is None:
                    targets[cmd_hash] = BuildTarget(
                        start_time, end_time, output_name, cmd_hash
                    )
                else:
                    # Same command produced another output: widen the time
                    # span and record the extra output name.
                    entry.start_time = min(entry.start_time, start_time)
                    entry.end_time = max(entry.end_time, end_time)
                    entry.duration = entry.end_time - entry.start_time
                    entry.targets.append(output_name)
            except (ValueError, IndexError) as e:
                print(f"Warning: Error parsing line {line_num}: {e}", file=sys.stderr)
                continue
        return sorted(targets.values(), key=lambda t: t.end_time, reverse=True)
class FTimeTraceReader:
    """Reads and processes Clang -ftime-trace JSON files."""

    def __init__(self, granularity_us: int = 50000):
        # Events shorter than this many microseconds are dropped as noise.
        self.granularity_us = granularity_us

    def read_trace_file(self, trace_path: str) -> Optional[Dict]:
        """Read and parse a Clang time trace file.

        Returns:
            The parsed JSON dict, or None when the file is missing,
            unreadable, or not valid JSON (best-effort by design).
        """
        try:
            with open(trace_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError, IOError):
            return None

    def filter_events(self, trace_data: Dict) -> List[Dict]:
        """Return complete events ("ph" == "X") meeting the duration threshold.

        Aggregate "Total ..." events are excluded; a dict without a
        "traceEvents" key yields an empty list.
        """
        if "traceEvents" not in trace_data:
            return []
        return [
            event
            for event in trace_data["traceEvents"]
            if event.get("ph") == "X"
            and event.get("dur", 0) >= self.granularity_us
            and not event.get("name", "").startswith("Total")
        ]

    def adjust_event_timing(
        self, event: Dict, target: "BuildTarget", pid: int, tid: int
    ) -> Optional[Dict]:
        """Shift *event* onto the ninja build timeline.

        Returns:
            A copy of *event* with pid/tid set and its timestamp offset by
            the target's ninja start time, or None (with a warning) when the
            event's duration exceeds the ninja-reported duration.
            (Fix: the return annotation previously claimed a plain Dict.)
        """
        ninja_duration_us = target.duration * 1000  # ninja times are in ms
        # Sanity check: a compiler-internal event cannot outlast its build step.
        if event.get("dur", 0) > ninja_duration_us:
            print(
                f"Warning: Clang trace event duration ({event['dur']}μs) exceeds "
                f"ninja duration ({ninja_duration_us}μs) for {target.output_name}",
                file=sys.stderr,
            )
            return None
        adjusted_event = event.copy()
        adjusted_event["pid"] = pid
        adjusted_event["tid"] = tid
        adjusted_event["ts"] += target.start_time * 1000  # offset by ninja start (ms -> us)
        return adjusted_event
class ChromeTraceGenerator:
    """Generates Chrome about:tracing events from parsed build targets."""

    def __init__(
        self,
        process_id: int = 1,
        embed_ftime_traces: bool = False,
        granularity_us: int = 50000,
        ninja_log_dir: Optional[str] = None,
        legacy_format: bool = False,
    ):
        self.process_id = process_id
        self.scheduler = ThreadScheduler(legacy_mode=legacy_format)
        self.embed_ftime_traces = embed_ftime_traces
        self.ninja_log_dir = ninja_log_dir
        # Only build a trace reader when embedding was requested.
        self.ftime_reader = (
            FTimeTraceReader(granularity_us) if embed_ftime_traces else None
        )
        self.legacy_format = legacy_format

    def find_ftime_trace_files(self, target: "BuildTarget") -> List[str]:
        """Locate Clang -ftime-trace JSON files for *target*.

        A trace file, when present, sits next to the object file with the
        same stem and a .json suffix.
        """
        if not self.ninja_log_dir:
            return []
        candidate = (Path(self.ninja_log_dir) / target.output_name).with_suffix(".json")
        return [str(candidate)] if candidate.exists() else []

    def generate_ftime_events(self, target: "BuildTarget", tid: int) -> Iterator[Dict]:
        """Yield adjusted -ftime-trace events for *target* on thread *tid*."""
        if not self.embed_ftime_traces or not self.ftime_reader:
            return
        for trace_file in self.find_ftime_trace_files(target):
            trace_data = self.ftime_reader.read_trace_file(trace_file)
            if not trace_data:
                continue
            for event in self.ftime_reader.filter_events(trace_data):
                adjusted = self.ftime_reader.adjust_event_timing(
                    event, target, self.process_id, tid
                )
                if adjusted:
                    yield adjusted

    def generate_trace_events(self, targets: List["BuildTarget"]) -> List[Dict]:
        """Build the full list of Chrome trace events for *targets*."""
        events = []
        for target in targets:
            thread_id = self.scheduler.allocate_thread(target)
            if self.legacy_format:
                # Legacy ninjatracer shape: comma-joined names, fixed
                # "targets" category, empty args.
                event_name = (
                    ", ".join(target.targets)
                    if len(target.targets) > 1
                    else target.output_name
                )
                event_cat = "targets"
                event_args = {}
            else:
                # New shape: per-extension category plus useful metadata.
                event_name = target.output_name
                event_cat = target.category
                event_args = {
                    "output": target.output_name,
                    "duration_ms": target.duration,
                    "cmd_hash": target.cmd_hash,
                }
            events.append(
                {
                    "name": event_name,
                    "cat": event_cat,
                    "ph": "X",  # Complete event
                    "ts": target.start_time * 1000,  # ms -> microseconds
                    "dur": target.duration * 1000,  # ms -> microseconds
                    "pid": self.process_id,
                    "tid": thread_id,
                    "args": event_args,
                }
            )
            # Optionally splice in the compiler-internal timing events.
            if self.embed_ftime_traces:
                ftime_events = list(self.generate_ftime_events(target, thread_id))
                events.extend(ftime_events)
                if ftime_events:
                    print(
                        f"Embedded {len(ftime_events)} -ftime-trace events for {target.output_name}",
                        file=sys.stderr,
                    )
        return events
class BuildAnalyzer:
    """Computes aggregate build-performance statistics from parsed targets."""

    def __init__(self, targets: List["BuildTarget"]):
        self.targets = targets

    def get_build_summary(self) -> Dict:
        """Return a summary dict of totals, per-category stats, and slowest
        targets; an empty dict when there are no targets."""
        if not self.targets:
            return {}
        total_duration = sum(t.duration for t in self.targets)
        total_targets = len(self.targets)
        # Aggregate count and cumulative time per category.
        category_stats: Dict[str, Dict[str, int]] = {}
        for target in self.targets:
            stats = category_stats.setdefault(
                target.category, {"count": 0, "total_time": 0}
            )
            stats["count"] += 1
            stats["total_time"] += target.duration
        # The ten slowest targets, longest first.
        slowest = sorted(self.targets, key=lambda t: t.duration, reverse=True)[:10]
        average = total_duration / total_targets if total_targets > 0 else 0
        return {
            "total_targets": total_targets,
            "total_duration_ms": total_duration,
            "total_duration_sec": total_duration / 1000,
            "average_duration_ms": average,
            "category_stats": category_stats,
            "slowest_targets": [
                {
                    "name": t.output_name,
                    "duration_ms": t.duration,
                    "category": t.category,
                }
                for t in slowest
            ],
        }

    def print_summary(self):
        """Print a human-readable build summary to stderr."""
        summary = self.get_build_summary()
        if not summary:
            print("No build data available", file=sys.stderr)
            return
        print("\n=== Build Summary ===", file=sys.stderr)
        print(f"Total targets: {summary['total_targets']}", file=sys.stderr)
        print(f"Total time: {summary['total_duration_sec']:.2f}s", file=sys.stderr)
        print(
            f"Average time per target: {summary['average_duration_ms']:.2f}ms",
            file=sys.stderr,
        )
        print("\nBy category:", file=sys.stderr)
        for category, stats in summary["category_stats"].items():
            avg_time = stats["total_time"] / stats["count"] if stats["count"] > 0 else 0
            print(
                f"  {category:15} {stats['count']:6} targets "
                f"{stats['total_time'] / 1000:8.2f}s "
                f"(avg: {avg_time / 1000:.3f}s)",
                file=sys.stderr,
            )
        print("\nSlowest targets:", file=sys.stderr)
        for rank, entry in enumerate(summary["slowest_targets"][:5], 1):
            print(
                f"  {rank:2}. {entry['name']} ({entry['duration_ms']}ms, {entry['category']})",
                file=sys.stderr,
            )
def create_argument_parser() -> argparse.ArgumentParser:
    """Build and return the command line argument parser for this tool."""
    parser = argparse.ArgumentParser(
        description="Convert ninja build logs to Chrome tracing format",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s build/.ninja_log                       # Basic usage
  %(prog)s build/.ninja_log --output trace.json   # Save to file
  %(prog)s build/.ninja_log --summary             # Show build summary
  %(prog)s build/.ninja_log --show-all            # Include all builds
  %(prog)s build/.ninja_log --embed-ftime-trace   # Include Clang timing data
  %(prog)s build/.ninja_log --granularity 10000   # Custom granularity threshold
""",
    )
    # Positional: one or more ninja log files.
    parser.add_argument(
        "ninja_logs",
        nargs="+",
        help="Path(s) to the .ninja_log file(s)",
    )
    # Output destination and formatting switches.
    parser.add_argument("-o", "--output", help="Output file (default: stdout)")
    parser.add_argument(
        "--show-all", action="store_true", help="Show all builds, not just the last one"
    )
    parser.add_argument(
        "--summary", action="store_true", help="Print build summary to stderr"
    )
    parser.add_argument(
        "--pretty", action="store_true", help="Pretty-print JSON output"
    )
    # Clang -ftime-trace embedding options.
    parser.add_argument(
        "--embed-ftime-trace",
        action="store_true",
        help="Embed Clang -ftime-trace JSON files found adjacent to targets",
    )
    parser.add_argument(
        "--granularity",
        type=int,
        default=50000,
        help="Minimum duration for -ftime-trace events in microseconds (default: 50000)",
    )
    parser.add_argument(
        "--legacy-format",
        action="store_true",
        help='Output in legacy format compatible with old ninjatracer (simple JSON array, all categories as "targets", empty args)',
    )
    return parser
def main():
    """Convert one or more ninja logs to Chrome trace JSON.

    Returns:
        Process exit code: 0 on success, 1 on error or when no targets
        were found in any log file.
    """
    parser = create_argument_parser()
    args = parser.parse_args()
    try:
        all_events = []
        # Each log file gets its own trace process id (pid) so multiple
        # builds render side by side in about:tracing.
        for pid, ninja_log_path in enumerate(args.ninja_logs):
            log_parser = NinjaLogParser(show_all_builds=args.show_all)
            targets = log_parser.parse_log_file(ninja_log_path)
            if not targets:
                print(
                    f"No build targets found in ninja log: {ninja_log_path}",
                    file=sys.stderr,
                )
                continue
            # -ftime-trace JSON files are looked up relative to the
            # directory holding the ninja log.
            ninja_log_dir = (
                os.path.dirname(os.path.abspath(ninja_log_path))
                if args.embed_ftime_trace
                else None
            )
            trace_generator = ChromeTraceGenerator(
                process_id=pid,  # Use different PID for each log file
                embed_ftime_traces=args.embed_ftime_trace,
                granularity_us=args.granularity,
                ninja_log_dir=ninja_log_dir,
                legacy_format=args.legacy_format,
            )
            all_events.extend(trace_generator.generate_trace_events(targets))
            # Per-log-file summary, if requested.
            if args.summary:
                print(f"\n=== Summary for {ninja_log_path} ===", file=sys.stderr)
                BuildAnalyzer(targets).print_summary()
        if not all_events:
            print("No build targets found in any ninja log files", file=sys.stderr)
            return 1
        # Choose the output shape; indentation applies to any shape when
        # --pretty is given.
        json_kwargs = {"indent": 2} if args.pretty else {}
        if args.legacy_format:
            # Legacy format: always a plain JSON array (old ninjatracer).
            json_output = json.dumps(all_events, **json_kwargs)
        elif args.output or args.pretty:
            # Enhanced format with metadata when saving or pretty printing.
            trace_data = {
                "traceEvents": all_events,
                "displayTimeUnit": "ms",
                "systemTraceEvents": "SystemTraceData",
                "otherData": {"version": "1.0", "generator": "ninja_json_converter.py"},
            }
            json_output = json.dumps(trace_data, **json_kwargs)
        else:
            # Default: simple JSON array on stdout.
            json_output = json.dumps(all_events)
        if args.output:
            # Fix: write with an explicit encoding so the output is stable
            # across platforms (the default encoding is locale-dependent).
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(json_output)
            print(f"Trace written to {args.output}", file=sys.stderr)
        else:
            print(json_output)
        return 0
    except Exception as e:
        # Top-level boundary: report and convert to a nonzero exit code.
        print(f"Error: {e}", file=sys.stderr)
        return 1
# Script entry point: propagate the converter's status as the exit code.
if __name__ == "__main__":
    sys.exit(main())