Add multi-file trace parsing and analysis pipeline (#4259)

Extends build time analysis from ROCm/composable_kernel#3644 to handle
multiple trace files across build directories (see #4229):

- pipeline.py: Generic pipeline framework with fluent interface for
composable data processing. Provides parallel processing, progress
tracking, and error handling independent of trace-specific code.
Processes thousands of trace files at default resolution in minutes,
aggregating results into in-memory DataFrames for analysis.
- parse_build.py: Parse all trace files in a build directory
- build_analysis_example.ipynb: Demonstrates pipeline aggregation across
all build files

The pipeline design improves the analysis modules' capability (composable
operations), performance (parallel processing), and usability (fluent API).
It enables analyzing compilation patterns across the entire codebase, with
all trace data available in pandas DataFrames for interactive exploration.
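
A minimal sketch of the intended workflow (API names as added in this PR;
the build directory path is illustrative):

```python
from pathlib import Path

from trace_analysis import Pipeline, find_trace_files, parse_file

# Discover clang -ftime-trace output under a build tree (example path)
trace_files = find_trace_files(Path("build/CMakeFiles"))

# Parse every trace in parallel (-1 = all CPUs), drop empty traces,
# and collect the parsed DataFrames for interactive analysis
dfs = (
    Pipeline(trace_files)
    .map(parse_file, workers=-1, desc="Parsing trace files")
    .filter(lambda df: len(df) > 0)
    .collect()
)
print(f"Parsed {len(dfs)} trace files")
```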

---
🔁 Imported from
[ROCm/composable_kernel#3704](https://github.com/ROCm/composable_kernel/pull/3704)
🧑‍💻 Originally authored by @shumway

Co-authored-by: John Shumway <jshumway@amd.com>
Co-authored-by: Illia Silin <98187287+illsilin@users.noreply.github.com>

View File

@@ -0,0 +1,685 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Full Build Analysis\n",
"\n",
"This notebook demonstrates comprehensive build-wide analysis using the Pipeline API to process all trace files in parallel.\n",
"\n",
"We'll create three main DataFrames:\n",
"1. **Metadata DataFrame**: One row per build unit with compilation statistics\n",
"2. **Phase DataFrame**: Compilation phase breakdown for all build units\n",
"3. **Template DataFrame**: Template instantiation events across the entire build\n",
"\n",
"All DataFrames are keyed by `build_unit` (the source file name) for easy cross-analysis."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"\n",
"from pathlib import Path\n",
"import sys\n",
"import pandas as pd\n",
"import plotly.express as px\n",
"\n",
"# Add parent directory to path\n",
"sys.path.insert(0, str(Path.cwd().parent))\n",
"\n",
"from trace_analysis import (\n",
" Pipeline,\n",
" find_trace_files,\n",
" parse_file,\n",
" get_trace_file,\n",
")\n",
"from trace_analysis.build_helpers import extract_all_data, print_phase_hierarchy\n",
"\n",
"# Display settings\n",
"pd.set_option(\"display.max_rows\", 100)\n",
"pd.set_option(\"display.max_columns\", None)\n",
"pd.set_option(\"display.width\", None)\n",
"pd.set_option(\"display.max_colwidth\", None)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Find Trace Files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Configure the path to your trace files\n",
"TRACE_DIR = Path(\"../../../build-trace\")\n",
"\n",
"json_files = find_trace_files(TRACE_DIR)\n",
"\n",
"if not json_files:\n",
" print(f\"No trace files found in {TRACE_DIR}\")\n",
" print(\"\\nTo generate trace files:\")\n",
" print(\"1. Configure your build with: cmake -DCMAKE_CXX_FLAGS='-ftime-trace' ...\")\n",
" print(\"2. Build your project\")\n",
" print(\"3. Trace files will be generated alongside object files\")\n",
" raise ValueError(f\"No trace files found in {TRACE_DIR}\")\n",
"else:\n",
" print(f\"Found {len(json_files):,} trace files\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parse All Files in Parallel\n",
"\n",
"Parse all trace files using all available CPUs with progress tracking."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parse all files in parallel, returning a list of raw trace dataframes.\n",
"parsed_dfs = (\n",
" Pipeline(json_files)\n",
" .map(parse_file, workers=-1, desc=\"Parsing trace files\")\n",
" .collect()\n",
")\n",
"\n",
"print(f\"\\nParsed {len(parsed_dfs):,} files\")\n",
"print(f\"Total events across all files: {sum(len(df) for df in parsed_dfs):,}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Three Analysis DataFrames\n",
"\n",
"Extract metadata, phase breakdown, and template events from all parsed files in parallel.\n",
"This creates three DataFrames:\n",
"1. **metadata_df**: One row per build unit with compilation statistics\n",
"2. **phase_df**: Phase breakdown for all build units\n",
"3. **template_df**: Template instantiation events across the build\n",
"\n",
"All DataFrames use a shared categorical `build_unit` column for efficient grouping and joining.\n",
"\n",
"📝 **TODO:**\n",
"The details of this processing is all exposed in these notebook cells. We should add library functionality to simplify this.\n",
"\n",
"📝 **TODO:**\n",
"This takes way too long, there is likely something going wrong with the in-memory dataframe processing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Extract all three types of data in one parallel pass (this can take a few minutes)\n",
"results = (\n",
" Pipeline(parsed_dfs)\n",
" .map(\n",
" extract_all_data, workers=-1, desc=\"Extracting metadata, phases, and templates\"\n",
" )\n",
" .collect()\n",
")\n",
"\n",
"print(f\"\\nExtracted data from {len(results):,} build units\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create shared categorical dtype for build_unit column\n",
"build_units = [r[\"build_unit\"] for r in results]\n",
"build_unit_dtype = pd.CategoricalDtype(\n",
" categories=sorted(set(build_units)), ordered=False\n",
")\n",
"\n",
"print(f\"Created categorical dtype with {len(build_unit_dtype.categories)} categories\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build metadata DataFrame with categorical build_unit\n",
"metadata_df = pd.DataFrame(\n",
" [{\"build_unit\": r[\"build_unit\"], **r[\"metadata\"]} for r in results]\n",
")\n",
"metadata_df[\"build_unit\"] = metadata_df[\"build_unit\"].astype(build_unit_dtype)\n",
"\n",
"# Build trace file mapping and store in DataFrame attributes\n",
"trace_file_mapping = {r[\"build_unit\"]: r[\"trace_file_path\"] for r in results}\n",
"metadata_df.attrs[\"trace_file_mapping\"] = trace_file_mapping\n",
"\n",
"print(f\"metadata_df: {metadata_df.shape[0]:,} rows (one per build unit)\")\n",
"print(f\"Stored trace file mapping for {len(trace_file_mapping):,} build units\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build phase DataFrame with categorical build_unit\n",
"phase_df = pd.concat(\n",
" [\n",
" r[\"phase\"].assign(\n",
" build_unit=pd.Categorical(\n",
" [r[\"build_unit\"]] * len(r[\"phase\"]), dtype=build_unit_dtype\n",
" )\n",
" )\n",
" for r in results\n",
" ],\n",
" ignore_index=True,\n",
")\n",
"\n",
"print(f\"phase_df: {phase_df.shape[0]:,} rows (phases across all build units)\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build template DataFrame with categorical build_unit\n",
"template_df = pd.concat(\n",
" [\n",
" r[\"template\"].assign(\n",
" build_unit=pd.Categorical(\n",
" [r[\"build_unit\"]] * len(r[\"template\"]), dtype=build_unit_dtype\n",
" )\n",
" )\n",
" for r in results\n",
" ],\n",
" ignore_index=True,\n",
")\n",
"\n",
"print(f\"template_df: {template_df.shape[0]:,} rows (template events across build)\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Summary of created DataFrames\n",
"print(\"\\n=== Created Analysis DataFrames ===\")\n",
"print(f\" metadata_df: {metadata_df.shape[0]:,} rows × {metadata_df.shape[1]} columns\")\n",
"print(f\" phase_df: {phase_df.shape[0]:,} rows × {phase_df.shape[1]} columns\")\n",
"print(f\" template_df: {template_df.shape[0]:,} rows × {template_df.shape[1]} columns\")\n",
"print(\n",
" f\"\\nAll DataFrames share the same categorical build_unit dtype with {len(build_unit_dtype.categories)} categories\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build-Wide Metadata Analysis\n",
"\n",
"Analyze compilation statistics across all build units."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Overall build statistics\n",
"print(\"=== Build-Wide Statistics ===\")\n",
"print(f\"Total build units: {len(metadata_df):,}\")\n",
"print(f\"Total compilation time: {metadata_df['total_wall_time_s'].sum():.1f} seconds\")\n",
"print(f\"Average time per unit: {metadata_df['total_wall_time_s'].mean():.2f} seconds\")\n",
"print(f\"Median time per unit: {metadata_df['total_wall_time_s'].median():.2f} seconds\")\n",
"print(f\"Slowest unit: {metadata_df['total_wall_time_s'].max():.2f} seconds\")\n",
"print(f\"Fastest unit: {metadata_df['total_wall_time_s'].min():.2f} seconds\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Top 20 slowest compilation units\n",
"print(\"\\n=== Top 20 Slowest Compilation Units ===\")\n",
"slowest = metadata_df.nlargest(20, \"total_wall_time_s\")[\n",
" [\"build_unit\", \"total_wall_time_s\"]\n",
"]\n",
"\n",
"display(slowest.style.format({\"total_wall_time_s\": lambda x: f\"{x:.1f}\"}))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Getting Trace Files for Interesting Build Units\n",
"\n",
"Now that we've identified interesting build units (e.g., slowest compilation times), we can easily get their trace files for deeper analysis."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example: Get trace file for a specific build unit\n",
"example_build_unit = slowest.iloc[0][\"build_unit\"]\n",
"trace_file = get_trace_file(metadata_df, example_build_unit)\n",
"\n",
"print(f\"Build unit: {example_build_unit}\")\n",
"print(f\"Trace file: {trace_file}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get trace files for all slowest compilation units\n",
"print(\"\\n=== Trace Files for Top 10 Slowest Compilation Units ===\\n\")\n",
"for idx, row in slowest.head(10).iterrows():\n",
" build_unit = row[\"build_unit\"]\n",
" compile_time = row[\"total_wall_time_s\"]\n",
" trace_file = get_trace_file(metadata_df, build_unit)\n",
" print(f\"{compile_time:6.1f}s {build_unit}\")\n",
" print(f\" → {trace_file}\\n\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Compilation time distribution\n",
"fig = px.histogram(\n",
" metadata_df,\n",
" x=\"total_wall_time_s\",\n",
" nbins=600,\n",
" title=\"Distribution of Compilation Times\",\n",
" labels={\"total_wall_time_s\": \"Compilation Time (seconds)\"},\n",
" log_y=True,\n",
")\n",
"fig.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build-Wide Phase Analysis\n",
"\n",
"Analyze compilation phases across the entire build."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Add duration_ms column for easier reading\n",
"phase_df[\"duration_ms\"] = phase_df[\"duration\"] / 1000.0\n",
"\n",
"print(f\"Total phase records: {len(phase_df):,}\")\n",
"print(f\"Unique phases: {phase_df['name'].nunique()}\")\n",
"print(f\"\\nPhases tracked: {sorted(phase_df['name'].unique())}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cumulative phase time across entire build - hierarchical view\n",
"print_phase_hierarchy(phase_df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Visualize cumulative phase breakdown\n",
"phase_summary = (\n",
" phase_df.groupby([\"name\", \"parent\", \"depth\"]).agg({\"duration\": \"sum\"}).reset_index()\n",
")\n",
"\n",
"fig = px.sunburst(\n",
" phase_summary,\n",
" names=\"name\",\n",
" parents=\"parent\",\n",
" values=\"duration\",\n",
" title=\"Cumulative Phase Breakdown Across Entire Build\",\n",
" branchvalues=\"total\",\n",
")\n",
"fig.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Which build units spend most time in Frontend?\n",
"frontend_time = phase_df[phase_df[\"name\"] == \"Frontend\"].nlargest(20, \"duration_ms\")[\n",
" [\"build_unit\", \"duration_ms\"]\n",
"]\n",
"\n",
"# Convert to seconds (keep as float)\n",
"frontend_time[\"duration_s\"] = frontend_time[\"duration_ms\"] / 1000\n",
"frontend_time = frontend_time[[\"build_unit\", \"duration_s\"]]\n",
"\n",
"print(\"=== Top 20 Build Units by Frontend Time ===\")\n",
"display(frontend_time.style.format({\"duration_s\": \"{:,.1f}\"}))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Which build units spend most time in Backend?\n",
"backend_time = phase_df[phase_df[\"name\"] == \"Backend\"].nlargest(20, \"duration_ms\")[\n",
" [\"build_unit\", \"duration_ms\"]\n",
"]\n",
"\n",
"backend_time[\"duration_s\"] = backend_time[\"duration_ms\"] / 1000\n",
"backend_time = backend_time[[\"build_unit\", \"duration_s\"]]\n",
"\n",
"print(\"=== Top 20 Build Units by Backend Time ===\")\n",
"display(backend_time.style.format({\"duration_s\": \"{:,.1f}\"}))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build-Wide Template Analysis\n",
"\n",
"Analyze template instantiations across the entire build."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Overall template statistics\n",
"print(\"=== Build-Wide Template Statistics ===\")\n",
"print(f\"Total template instantiation events: {len(template_df):,}\")\n",
"print(f\"Total template time: {template_df['dur'].sum() / 1_000_000:.1f} seconds\")\n",
"print(f\"Average template time: {template_df['dur'].mean() / 1000:.2f} ms\")\n",
"print(f\"Unique template names: {template_df['template_name'].nunique():,}\")\n",
"print(f\"Unique namespaces: {template_df['namespace'].nunique()}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Top templates by total time across build\n",
"top_templates = template_df.groupby([\"namespace\", \"template_name\"]).agg(\n",
" {\"dur\": [\"count\", \"sum\", \"mean\"]}\n",
")\n",
"top_templates.columns = [\"count\", \"total_dur\", \"avg_dur\"]\n",
"top_templates[\"total_s\"] = top_templates[\"total_dur\"] / 1_000_000\n",
"top_templates[\"avg_ms\"] = top_templates[\"avg_dur\"] / 1000\n",
"top_templates = top_templates.sort_values(\"total_dur\", ascending=False).reset_index()\n",
"\n",
"print(\"\\n=== Top 30 Templates by Total Time Across Build ===\")\n",
"display(\n",
" top_templates.head(30)[\n",
" [\"namespace\", \"template_name\", \"count\", \"total_s\", \"avg_ms\"]\n",
" ].style.format({\"count\": \"{:,.0f}\", \"total_s\": \"{:,.1f}\", \"avg_ms\": \"{:,.1f}\"})\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Template time by namespace\n",
"namespace_summary = (\n",
" template_df.groupby(\"namespace\")\n",
" .agg({\"dur\": [\"count\", \"sum\", \"mean\"], \"param_count\": \"mean\"})\n",
" .round(2)\n",
")\n",
"namespace_summary.columns = [\"count\", \"total_dur\", \"avg_dur\", \"avg_params\"]\n",
"namespace_summary[\"total_s\"] = namespace_summary[\"total_dur\"] / 1_000_000\n",
"namespace_summary = namespace_summary.sort_values(\"total_dur\", ascending=False)\n",
"\n",
"print(\"\\n=== Template Time by Namespace ===\")\n",
"display(\n",
" namespace_summary.style.format(\n",
" {\n",
" \"count\": \"{:,d}\",\n",
" \"total_dur\": \"{:,.0f}\",\n",
" \"avg_dur\": \"{:,.0f}\",\n",
" \"avg_params\": \"{:,.2f}\",\n",
" \"total_s\": \"{:,.1f}\",\n",
" }\n",
" )\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# CK library templates analysis\n",
"ck_templates = template_df[template_df[\"is_ck_type\"]].copy()\n",
"\n",
"print(\"=== CK Library Template Analysis ===\")\n",
"print(f\"CK template instantiations: {len(ck_templates):,}\")\n",
"print(f\"CK template time: {ck_templates['dur'].sum() / 1_000_000:.1f} seconds\")\n",
"print(\n",
" f\"Percentage of total template time: {100 * ck_templates['dur'].sum() / template_df['dur'].sum():.1f}%\"\n",
")\n",
"\n",
"# Top CK templates\n",
"ck_by_name = (\n",
" ck_templates.groupby(\"template_name\")\n",
" .agg({\"dur\": [\"count\", \"sum\", \"mean\"]})\n",
" .round(2)\n",
")\n",
"ck_by_name.columns = [\"count\", \"total_dur\", \"avg_dur\"]\n",
"ck_by_name[\"total_s\"] = ck_by_name[\"total_dur\"] / 1_000_000\n",
"ck_by_name = ck_by_name.sort_values(\"total_dur\", ascending=False)\n",
"\n",
"print(\"\\n=== Top 20 CK Templates by Total Time ===\")\n",
"display(\n",
" ck_by_name.head(20)[[\"count\", \"total_s\"]].style.format(\n",
" {\"count\": \"{:,d}\", \"total_s\": \"{:,.0f}\"}\n",
" )\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cross-Analysis: Templates vs Compilation Time\n",
"\n",
"Analyze relationships between template instantiations and compilation time."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Template count per build unit\n",
"template_counts = (\n",
" template_df.groupby(\"build_unit\").size().reset_index(name=\"template_count\")\n",
")\n",
"\n",
"# Template time per build unit\n",
"template_time = (\n",
" template_df.groupby(\"build_unit\")[\"dur\"].sum().reset_index(name=\"template_time_us\")\n",
")\n",
"template_time[\"template_time_s\"] = template_time[\"template_time_us\"] / 1_000_000\n",
"\n",
"# Merge with metadata\n",
"analysis_df = (\n",
" metadata_df[[\"build_unit\", \"total_wall_time_s\"]]\n",
" .merge(template_counts, on=\"build_unit\", how=\"left\")\n",
" .merge(\n",
" template_time[[\"build_unit\", \"template_time_s\"]], on=\"build_unit\", how=\"left\"\n",
" )\n",
")\n",
"\n",
"# Fill NaN with 0 for units with no templates\n",
"analysis_df[\"template_count\"] = analysis_df[\"template_count\"].fillna(0)\n",
"analysis_df[\"template_time_s\"] = analysis_df[\"template_time_s\"].fillna(0)\n",
"\n",
"print(\"=== Template Count vs Compilation Time ===\")\n",
"print(\n",
" f\"Correlation: {analysis_df['template_count'].corr(analysis_df['total_wall_time_s']):.3f}\"\n",
")\n",
"\n",
"# Top units by template count\n",
"print(\"\\n=== Top 20 Build Units by Template Count ===\")\n",
"display(analysis_df.nlargest(20, \"template_count\"))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Scatter plot: template count vs compilation time\n",
"fig = px.scatter(\n",
" analysis_df,\n",
" x=\"template_count\",\n",
" y=\"total_wall_time_s\",\n",
" hover_data=[\"build_unit\"],\n",
" title=\"Template Count vs Compilation Time\",\n",
" labels={\n",
" \"template_count\": \"Number of Template Instantiations\",\n",
" \"total_wall_time_s\": \"Compilation Time (seconds)\",\n",
" },\n",
" trendline=\"ols\",\n",
")\n",
"fig.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Scatter plot: template time vs compilation time\n",
"# Note: The total instantiation double-counts nested templates.\n",
"fig = px.scatter(\n",
" analysis_df,\n",
" x=\"template_time_s\",\n",
" y=\"total_wall_time_s\",\n",
" hover_data=[\"build_unit\"],\n",
" title=\"Template Instantiation Time vs Total Compilation Time\",\n",
" labels={\n",
" \"template_time_s\": \"Template Instantiation Time (seconds)\",\n",
" \"total_wall_time_s\": \"Total Compilation Time (seconds)\",\n",
" },\n",
" trendline=\"ols\",\n",
")\n",
"fig.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"This notebook demonstrated:\n",
"- Parallel parsing of all trace files using the Pipeline API\n",
"- Parallel extraction of metadata, phases, and templates in a single pass\n",
"- Creating three consistently-keyed DataFrames with shared categorical dtype\n",
"- **Trace file mapping** stored in metadata_df.attrs for easy lookup\n",
"- Build-wide metadata analysis\n",
"- Cumulative phase analysis with visualizations\n",
"- Build-wide template analysis\n",
"- Cross-analysis between templates and compilation time\n",
"- **Using get_trace_file() to locate trace files for interesting build units**\n",
"\n",
"The shared categorical `build_unit` dtype enables efficient grouping and joining across all three DataFrames for comprehensive build analysis.\n",
"\n",
"The trace file mapping allows you to quickly download or analyze the raw trace JSON for any build unit of interest."
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

View File

@@ -17,6 +17,9 @@
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"\n",
"import sys\n",
"from pathlib import Path\n",
"\n",
@@ -62,7 +65,8 @@
"\n",
"print(f\"Total events: {len(df):,}\")\n",
"starting_timestamp = datetime.fromtimestamp(df.attrs[\"beginningOfTime\"] / 1e6)\n",
"print(f\"Starting timestamp: {starting_timestamp.strftime('%Y-%m-%d:%H:%M:%S')}\")"
"print(f\"Starting timestamp: {starting_timestamp.strftime('%Y-%m-%d:%H:%M:%S')}\")\n",
"print(f\"Source file: {df.attrs['sourceFile']}\")"
]
},
{

View File

@@ -4,6 +4,9 @@
pandas>=2.0.0
orjson>=3.9.0
# Statistical analysis
statsmodels>=0.14.0
# Jupyter notebook support
nbformat>=4.2.0
ipykernel>=6.0.0
@@ -16,3 +19,6 @@ kaleido>=0.2.0
# Full Jupyter environment (if not using VSCode)
jupyter>=1.0.0
# Progress meter in notebook
tqdm>=4.0.0

View File

@@ -22,13 +22,32 @@ from .phase_breakdown import (
PhaseBreakdown,
)
from .parse_build import (
find_trace_files,
read_trace_files,
)
from .pipeline import (
Pipeline,
)
from .build_helpers import (
get_trace_file,
)
__all__ = [
# Core parsing and filtering
"parse_file",
"get_metadata",
"find_trace_files",
"read_trace_files",
# Pipeline processing
"Pipeline",
# Template analysis
"get_template_instantiation_events",
# Phase breakdown
"get_phase_breakdown",
"PhaseBreakdown",
# Build helpers
"get_trace_file",
]

View File

@@ -0,0 +1,139 @@
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
Helper functions for full build analysis.
"""
from .parse_file import get_metadata
from .phase_breakdown import get_phase_breakdown
from .template_analysis import get_template_instantiation_events
def extract_all_data(df):
"""
Extract metadata, phase breakdown, and template events from a parsed DataFrame.
Args:
df: Parsed DataFrame from parse_file()
Returns:
Dictionary with keys:
- build_unit: Source file path starting from composable_kernel/
- trace_file_path: Path to the original trace JSON file
- metadata: Metadata dictionary
- phase: Phase breakdown DataFrame
- template: Template events DataFrame
"""
return {
"build_unit": df.attrs["sourceFile"],
"trace_file_path": df.attrs.get("traceFilePath"),
"metadata": get_metadata(df).__dict__,
"phase": get_phase_breakdown(df).df,
"template": get_template_instantiation_events(df),
}
def get_trace_file(metadata_df, build_unit):
"""
Get the trace file path for a given build unit.
Args:
metadata_df: Metadata DataFrame with trace_file_mapping in .attrs
build_unit: Source file path (build unit name)
Returns:
Path to the trace JSON file, or None if not found
Examples:
>>> # Get trace file for a specific build unit
>>> trace_path = get_trace_file(metadata_df, "library/src/tensor/gemm.cpp")
>>> print(f"Trace file: {trace_path}")
>>>
>>> # Get trace files for slowest compilation units
>>> slowest = metadata_df.nlargest(5, "total_wall_time_s")
>>> for _, row in slowest.iterrows():
... trace_path = get_trace_file(metadata_df, row['build_unit'])
... print(f"{row['build_unit']}: {trace_path}")
"""
mapping = metadata_df.attrs.get("trace_file_mapping", {})
return mapping.get(build_unit)
def print_phase_hierarchy(phase_df):
"""
Print cumulative phase times in a hierarchical tree structure.
Args:
phase_df: DataFrame with columns: name, parent, depth, duration, build_unit
(as created by concatenating phase breakdown results)
"""
# Aggregate by phase name, parent, and depth
phase_summary = (
phase_df.groupby(["name", "parent", "depth"])
.agg({"duration": "sum"})
.reset_index()
)
# Convert to seconds
phase_summary["duration_s"] = phase_summary["duration"] / 1_000_000
# Calculate total time from the root phase only (depth == 0).
# Parent phase durations include their children's time, so summing all
# phases would double- or triple-count nested values.
root_phases = phase_summary[
(phase_summary["parent"] == "")
| (phase_summary["parent"].isna())
| (phase_summary["depth"] == 0)
].sort_values("duration_s", ascending=False)
if len(root_phases) == 0:
raise ValueError("No root phase found (depth == 0)")
if len(root_phases) > 1:
raise ValueError(f"Multiple root phases found: {root_phases['name'].tolist()}")
total_time_s = root_phases.iloc[0]["duration_s"]
print("=== Cumulative Phase Time Across Build ===")
print(f"\nTotal compilation time: {total_time_s:,.1f} s")
print("\nBreakdown by phase:")
# Track which phases we've printed to handle hierarchy
printed_phases = set()
def print_phase_tree(df, parent_name, depth=0):
"""Recursively print phases in hierarchical order"""
# Get children of this parent at the next depth level
children = df[(df["parent"] == parent_name) & (df["depth"] == depth)]
# Sort by duration descending within each level
children = children.sort_values("duration_s", ascending=False)
for _, row in children.iterrows():
phase_name = row["name"]
if phase_name in printed_phases:
continue
time_s = row["duration_s"]
pct = 100 * time_s / total_time_s
indent = " " * depth
# Create indented name and pad the whole thing to align colons
indented_name = f"{indent}{phase_name}"
print(f"{indented_name:32s}: {time_s:12,.1f} s ({pct:5.1f}%)")
printed_phases.add(phase_name)
# Recursively print children
print_phase_tree(df, phase_name, depth + 1)
for _, row in root_phases.iterrows():
phase_name = row["name"]
if phase_name in printed_phases:
continue
time_s = row["duration_s"]
pct = 100 * time_s / total_time_s
# Pad root phase name to align with children
print(f"{phase_name:32s}: {time_s:12,.1f} s ({pct:5.1f}%)")
printed_phases.add(phase_name)
# Print children recursively
print_phase_tree(phase_summary, phase_name, 1)

View File

@@ -0,0 +1,96 @@
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
Utility functions for trace analysis.
Helper functions for file discovery, path handling, and other common operations.
"""
import subprocess
import pandas as pd
from pathlib import Path
from typing import List
def find_trace_files(trace_dir: Path) -> List[Path]:
"""
Find all JSON trace files (*.cpp.json) in a directory tree.
Uses Unix 'find' command when available (2-5x faster than Python),
with automatic fallback to Python's rglob for cross-platform compatibility.
Args:
trace_dir: Directory to search for trace files
Returns:
List of Path objects pointing to *.cpp.json trace files
Example:
>>> from pathlib import Path
>>> from trace_analysis import find_trace_files
>>> trace_files = find_trace_files(Path("build/CMakeFiles"))
>>> print(f"Found {len(trace_files)} trace files")
"""
try:
# Try Unix find (2-5x faster than Python)
result = subprocess.run(
["find", str(trace_dir), "-name", "*.cpp.json", "-type", "f"],
capture_output=True,
text=True,
timeout=30,
check=True,
)
json_files = [Path(p) for p in result.stdout.strip().split("\n") if p]
except (subprocess.SubprocessError, FileNotFoundError, OSError):
# Fallback to Python (cross-platform)
print("Using Python to find trace files (this may be slower)...")
json_files = list(trace_dir.rglob("*.cpp.json"))
return json_files
def read_trace_files(json_files: List[Path], workers: int = -1) -> List["pd.DataFrame"]:
"""
Parse trace files in parallel and return list of DataFrames.
This is a convenience function that uses the Pipeline API to parse
multiple trace files in parallel with progress tracking.
Args:
json_files: List of paths to trace JSON files
workers: Number of parallel workers to use:
- -1: Use all available CPUs (default)
- None: Sequential processing (single-threaded)
- N > 0: Use N worker processes
Returns:
List of parsed DataFrames, one per file
Example:
>>> from pathlib import Path
>>> from trace_analysis import find_trace_files, read_trace_files
>>>
>>> # Find and parse all trace files
>>> trace_files = find_trace_files(Path("build/CMakeFiles"))
>>> dataframes = read_trace_files(trace_files, workers=8)
>>> print(f"Parsed {len(dataframes)} files")
>>>
>>> # Use Pipeline directly for more control
>>> from trace_analysis import Pipeline
>>> from trace_analysis.parse_file import parse_file
>>>
>>> pipeline = Pipeline(trace_files).map(parse_file, workers=8)
>>> all_events, metadata = pipeline.tee(
... lambda dfs: pd.concat(dfs, ignore_index=True),
... lambda dfs: [get_metadata(df) for df in dfs]
... )
"""
from trace_analysis.pipeline import Pipeline
from trace_analysis.parse_file import parse_file
return (
Pipeline(json_files)
.map(parse_file, workers=workers, desc="Parsing trace files")
.collect()
)

View File

@@ -185,6 +185,14 @@ def parse_file(filepath: Union[str, Path]) -> pd.DataFrame:
data.get("beginningOfTime") if isinstance(data, dict) else None
)
# Store the source file path derived from the trace filename
# The trace filename format is: <source_file>.json
# Remove the .json extension to get the source file path
source_file_path = filepath.stem # Gets filename without .json extension
full_path = filepath.parent / source_file_path
df.attrs["sourceFile"] = _remove_cmake_artifacts(str(full_path))
df.attrs["traceFilePath"] = str(filepath)
return df
@@ -201,14 +209,8 @@ def _flatten_args(df: pd.DataFrame) -> pd.DataFrame:
Returns:
DataFrame with flattened args columns and original 'args' column removed
"""
# Extract args into separate DataFrame
args_data = []
for idx, row in df.iterrows():
args = row.get("args", {})
if isinstance(args, dict):
args_data.append(args)
else:
args_data.append({})
args_list = df["args"].tolist()
args_data = [arg if isinstance(arg, dict) else {} for arg in args_list]
if args_data:
args_df = pd.DataFrame(args_data)
@@ -222,6 +224,42 @@ def _flatten_args(df: pd.DataFrame) -> pd.DataFrame:
return df
def _remove_cmake_artifacts(file_path: str) -> str:
"""
Remove CMake build artifacts from a file path.
CMake creates build directories with the pattern:
<build-dir>/<source-path>/CMakeFiles/<target>.dir/<source-file>
This function removes the CMakeFiles and .dir segments to reconstruct
the logical source file path while preserving the build directory prefix.
Args:
file_path: Path potentially containing CMake artifacts
Returns:
Path with CMakeFiles and .dir segments removed
Examples:
>>> _remove_cmake_artifacts('build/library/src/foo/CMakeFiles/target.dir/bar.cpp')
'build/library/src/foo/bar.cpp'
>>> _remove_cmake_artifacts('library/src/foo/bar.cpp')
'library/src/foo/bar.cpp'
"""
path = Path(file_path)
parts = path.parts
# Filter out CMakeFiles and any parts ending with .dir
filtered_parts = [
part for part in parts if part != "CMakeFiles" and not part.endswith(".dir")
]
# Reconstruct the path
if filtered_parts:
return str(Path(*filtered_parts))
return file_path
def _normalize_source_path(file_path: str) -> str:
"""
Normalize a source file path to be relative to composable_kernel if present.
@@ -287,13 +325,17 @@ def get_metadata(df: pd.DataFrame) -> FileMetadata:
>>> print(f"Duration: {metadata.total_wall_time_s:.2f}s")
>>> print(f"Started: {metadata.wall_start_datetime}")
"""
# Extract beginningOfTime from DataFrame attributes
# Extract beginningOfTime and source_file from DataFrame attributes
beginning_of_time = None
source_file = None
if hasattr(df, "attrs"):
beginning_of_time = df.attrs.get("beginningOfTime")
source_file = df.attrs.get("sourceFile")
# Initialize metadata with beginningOfTime from JSON structure
metadata = FileMetadata(beginning_of_time=beginning_of_time)
# Initialize metadata with values from DataFrame attributes
metadata = FileMetadata(
beginning_of_time=beginning_of_time, source_file=source_file
)
# Process events to extract ExecuteCompiler timing information
if "name" in df.columns:
@@ -306,8 +348,13 @@ def get_metadata(df: pd.DataFrame) -> FileMetadata:
if "dur" in event:
metadata.execute_compiler_dur = event["dur"]
# Process events to find the main source file being compiled
if "name" in df.columns and "arg_detail" in df.columns:
# Fallback: Try to find source file from ParseDeclarationOrFunctionDefinition events
# This is only used if source_file wasn't already set from the filename
if (
metadata.source_file is None
and "name" in df.columns
and "arg_detail" in df.columns
):
# Look for ParseDeclarationOrFunctionDefinition events with .cpp or .c files
source_extensions = (".cpp", ".cc", ".cxx", ".c")
parse_events = df[df["name"] == "ParseDeclarationOrFunctionDefinition"]

View File

@@ -0,0 +1,292 @@
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
Functional pipeline for parallel processing of trace files.
This module provides a fluent API for building data processing pipelines with
support for parallel execution, progress tracking, and multiple output branches.
Example:
>>> from trace_analysis import Pipeline, find_trace_files
>>> from trace_analysis.parse_file import parse_file
>>>
>>> files = find_trace_files(Path("build"))
>>> dfs = Pipeline(files).map(parse_file, workers=8).collect()
"""
from typing import Any, Callable, List, Optional, Tuple, Union
from multiprocessing import Pool, cpu_count
from tqdm.auto import tqdm
class Pipeline:
"""
Functional pipeline for processing data with parallel execution support.
Provides a fluent API for chaining operations like map, filter, and reduce.
Supports parallel processing with multiprocessing and progress tracking with tqdm.
Features:
- Fluent API with method chaining
- Parallel processing with configurable worker count
- Progress bars in Jupyter notebooks (tqdm)
- Fail-fast error handling
- In-memory processing for speed
- Tee operation for branching into multiple outputs
Attributes:
_items: Current list of items in the pipeline
_is_reduced: Flag indicating if pipeline has been reduced to single value
Example:
Basic parallel processing:
>>> files = find_trace_files(Path("build"))
>>> dfs = Pipeline(files).map(parse_file, workers=8).collect()
Multi-stage pipeline:
>>> results = (
... Pipeline(files)
... .map(parse_file, workers=8)
... .filter(lambda df: len(df) > 1000)
... .collect()
... )
Multiple outputs with tee:
>>> pipeline = Pipeline(files).map(parse_file, workers=8)
>>> all_events, metadata, stats = pipeline.tee(
... lambda dfs: pd.concat(dfs, ignore_index=True),
... lambda dfs: [get_metadata(df) for df in dfs],
... lambda dfs: {"count": len(dfs)}
... )
"""
def __init__(self, items: List[Any]):
"""
Initialize a new pipeline with a list of items.
Args:
items: Initial list of items to process
"""
self._items = items
self._is_reduced = False
def map(
self,
func: Callable[[Any], Any],
workers: Optional[int] = None,
desc: Optional[str] = None,
) -> "Pipeline":
"""
Apply a function to each item in the pipeline.
Args:
func: Function to apply to each item. Should accept a single argument
and return a transformed value.
workers: Number of parallel workers to use:
- None: Sequential processing (single-threaded)
- -1: Use all available CPUs
- N > 0: Use N worker processes
desc: Description for the progress bar. If None, uses a default description.
Returns:
Self for method chaining
Raises:
ValueError: If pipeline has already been reduced
Exception: Any exception raised by func is re-raised with context
Example:
>>> # Sequential processing
>>> Pipeline(files).map(parse_file).collect()
>>>
>>> # Parallel processing with all CPUs
>>> Pipeline(files).map(parse_file, workers=-1).collect()
>>>
>>> # Parallel with custom worker count and description
>>> Pipeline(files).map(parse_file, workers=8, desc="Parsing").collect()
"""
if self._is_reduced:
raise ValueError("Cannot map after reduce operation")
if not self._items:
return self
# Determine worker count
if workers == -1:
workers = cpu_count()
# Set default description
if desc is None:
desc = "Processing items"
# Sequential processing
if workers is None or workers == 1:
results = []
for item in tqdm(self._items, desc=desc):
try:
results.append(func(item))
except Exception as e:
raise type(e)(f"Error processing item {item}: {e}") from e
self._items = results
return self
# Parallel processing
try:
with Pool(processes=workers) as pool:
# Use imap_unordered for better performance (results as they complete)
# Wrap with tqdm for progress tracking
results = list(
tqdm(
pool.imap_unordered(func, self._items),
total=len(self._items),
desc=desc,
)
)
self._items = results
return self
except Exception as e:
# Re-raise with context
raise type(e)(f"Error in parallel map operation: {e}") from e
def filter(self, predicate: Callable[[Any], bool]) -> "Pipeline":
"""
Filter items based on a predicate function.
Args:
predicate: Function that returns True for items to keep, False to discard.
Should accept a single argument and return a boolean.
Returns:
Self for method chaining
Raises:
ValueError: If pipeline has already been reduced
Example:
>>> # Keep only large DataFrames
>>> Pipeline(dfs).filter(lambda df: len(df) > 1000).collect()
>>>
>>> # Keep only successful builds
>>> Pipeline(dfs).filter(
... lambda df: 'ExecuteCompiler' in df['name'].values
... ).collect()
"""
if self._is_reduced:
raise ValueError("Cannot filter after reduce operation")
self._items = [item for item in self._items if predicate(item)]
return self
def reduce(self, func: Callable[[List[Any]], Any]) -> "Pipeline":
"""
Reduce all items to a single value using an aggregation function.
After reduction, the pipeline contains a single value and no further
map or filter operations are allowed.
Args:
func: Aggregation function that accepts a list of all items and
returns a single aggregated value.
Returns:
Self for method chaining
Raises:
ValueError: If pipeline has already been reduced
Example:
>>> # Concatenate all DataFrames
>>> Pipeline(dfs).reduce(
... lambda dfs: pd.concat(dfs, ignore_index=True)
... ).collect()
>>>
>>> # Sum all values
>>> Pipeline(numbers).reduce(sum).collect()
>>>
>>> # Custom aggregation
>>> Pipeline(dfs).reduce(
... lambda dfs: {
... "total_files": len(dfs),
... "total_events": sum(len(df) for df in dfs)
... }
... ).collect()
"""
if self._is_reduced:
raise ValueError("Cannot reduce twice")
try:
self._items = [func(self._items)]
self._is_reduced = True
return self
except Exception as e:
raise type(e)(f"Error in reduce operation: {e}") from e
def tee(self, *funcs: Callable[[List[Any]], Any]) -> Tuple[Any, ...]:
"""
Branch the pipeline into multiple outputs.
Each function receives the full list of current items and produces
an independent output. This is useful for generating multiple
aggregations or analyses from the same data.
This operation automatically collects the pipeline results.
Args:
*funcs: Variable number of functions, each accepting the full list
of items and returning a result. Each function is applied
independently to the same input data.
Returns:
Tuple of results, one per function, in the same order as the functions
Raises:
ValueError: If no functions are provided
Exception: Any exception raised by a function is re-raised with context
Example:
>>> pipeline = Pipeline(files).map(parse_file, workers=8)
>>>
>>> # Create three different outputs from the same data
>>> all_events, metadata_df, stats = pipeline.tee(
... # Output 1: Concatenated DataFrame
... lambda dfs: pd.concat(dfs, ignore_index=True),
... # Output 2: Metadata summary
... lambda dfs: pd.DataFrame([get_metadata(df).__dict__ for df in dfs]),
... # Output 3: Statistics dictionary
... lambda dfs: {
... "total_files": len(dfs),
... "total_events": sum(len(df) for df in dfs)
... }
... )
"""
if not funcs:
raise ValueError("At least one function must be provided to tee")
results = []
for i, func in enumerate(funcs):
try:
results.append(func(self._items))
except Exception as e:
raise type(e)(f"Error in tee function {i}: {e}") from e
return tuple(results)
def collect(self) -> Union[List[Any], Any]:
"""
Execute the pipeline and return the results.
Returns:
If the pipeline has been reduced, returns the single reduced value.
Otherwise, returns the list of items.
Example:
>>> # Returns list of DataFrames
>>> dfs = Pipeline(files).map(parse_file, workers=8).collect()
>>>
>>> # Returns single concatenated DataFrame
>>> df = Pipeline(files).map(parse_file, workers=8).reduce(pd.concat).collect()
"""
if self._is_reduced:
return self._items[0]
return self._items