mirror of
https://github.com/ROCm/composable_kernel.git
synced 2026-04-19 22:39:03 +00:00
[rocm-libraries] ROCm/rocm-libraries#4259 (commit 223d90c)
Add multi-file trace parsing and analysis pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends build time analysis from ROCm/composable_kernel#3644 to handle multiple trace files across build directories (see #4229): - pipeline.py: Generic pipeline framework with fluent interface for composable data processing. Provides parallel processing, progress tracking, and error handling independent of trace-specific code. Processes thousands of trace files at default resolution in minutes, aggregating results into in-memory DataFrames for analysis. - parse_build.py: Parse all trace files in a build directory - build_analysis_example.ipynb: Demonstrates pipeline aggregation across all build files The pipeline design improves capability (composable operations), performance (parallel processing), and user-friendliness (fluent API) of the analysis modules. It enables analyzing compilation patterns across the entire codebase with all trace data available in pandas DataFrames for interactive exploration.
This commit is contained in:
committed by
assistant-librarian[bot]
parent
1bf66006c9
commit
270b1445b1
@@ -17,6 +17,9 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"%load_ext autoreload\n",
|
||||
"%autoreload 2\n",
|
||||
"\n",
|
||||
"import sys\n",
|
||||
"from pathlib import Path\n",
|
||||
"\n",
|
||||
@@ -62,7 +65,8 @@
|
||||
"\n",
|
||||
"print(f\"Total events: {len(df):,}\")\n",
|
||||
"starting_timestamp = datetime.fromtimestamp(df.attrs[\"beginningOfTime\"] / 1e6)\n",
|
||||
"print(f\"Starting timestamp: {starting_timestamp.strftime('%Y-%m-%d:%H:%M:%S')}\")"
|
||||
"print(f\"Starting timestamp: {starting_timestamp.strftime('%Y-%m-%d:%H:%M:%S')}\")\n",
|
||||
"print(f\"Source file: {df.attrs['sourceFile']}\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
pandas>=2.0.0
|
||||
orjson>=3.9.0
|
||||
|
||||
# Statistical analysis
|
||||
statsmodels>=0.14.0
|
||||
|
||||
# Jupyter notebook support
|
||||
nbformat>=4.2.0
|
||||
ipykernel>=6.0.0
|
||||
@@ -16,3 +19,6 @@ kaleido>=0.2.0
|
||||
|
||||
# Full Jupyter environment (if not using VSCode)
|
||||
jupyter>=1.0.0
|
||||
|
||||
# Progress meter in notebook
|
||||
tqdm>=4.0.0
|
||||
|
||||
@@ -22,13 +22,32 @@ from .phase_breakdown import (
|
||||
PhaseBreakdown,
|
||||
)
|
||||
|
||||
from .parse_build import (
    find_trace_files,
    read_trace_files,
)

from .pipeline import (
    Pipeline,
)

from .build_helpers import (
    get_trace_file,
)

# Public API of the trace_analysis package, grouped by feature area.
__all__ = [
    # Core parsing and filtering
    "parse_file",
    "get_metadata",
    "find_trace_files",
    "read_trace_files",
    # Pipeline processing
    "Pipeline",
    # Template analysis
    "get_template_instantiation_events",
    # Phase breakdown
    "get_phase_breakdown",
    "PhaseBreakdown",
    # Build helpers
    "get_trace_file",
]
|
||||
|
||||
96
script/analyze_build/trace_analysis/parse_build.py
Normal file
96
script/analyze_build/trace_analysis/parse_build.py
Normal file
@@ -0,0 +1,96 @@
|
||||
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
"""
|
||||
Utility functions for trace analysis.
|
||||
|
||||
Helper functions for file discovery, path handling, and other common operations.
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
|
||||
def find_trace_files(trace_dir: Path) -> List[Path]:
    """
    Find all ``*.cpp.json`` trace files under a directory, recursively.

    Uses the Unix ``find`` command when available (2-5x faster than a pure
    Python walk on large build trees), with automatic fallback to
    ``Path.rglob`` for cross-platform compatibility.

    Results are sorted so that both discovery backends return the same,
    deterministic ordering.

    Args:
        trace_dir: Directory to search for trace files

    Returns:
        Sorted list of Path objects pointing to ``*.cpp.json`` files

    Example:
        >>> from pathlib import Path
        >>> from trace_analysis import find_trace_files
        >>> trace_files = find_trace_files(Path("build/CMakeFiles"))
        >>> print(f"Found {len(trace_files)} trace files")
    """
    try:
        # Prefer Unix 'find' (2-5x faster than Python). List-form argv
        # (shell=False) avoids quoting issues; the timeout keeps a hung
        # filesystem from stalling discovery indefinitely.
        result = subprocess.run(
            ["find", str(trace_dir), "-name", "*.cpp.json", "-type", "f"],
            capture_output=True,
            text=True,
            timeout=30,
            check=True,
        )
        json_files = [Path(p) for p in result.stdout.splitlines() if p]
    except (subprocess.SubprocessError, FileNotFoundError, OSError):
        # 'find' unavailable (e.g. Windows) or failed: fall back to Python.
        print("Using Python to find trace files (this may be slower)...")
        json_files = list(trace_dir.rglob("*.cpp.json"))

    # Sort for a deterministic order regardless of which backend ran;
    # 'find' emits directory order and rglob emits walk order otherwise.
    return sorted(json_files)
|
||||
|
||||
|
||||
def read_trace_files(json_files: List[Path], workers: int = -1) -> List["pd.DataFrame"]:
    """
    Parse a batch of trace files in parallel, returning one DataFrame each.

    Thin convenience wrapper over the Pipeline API: builds a pipeline over
    ``json_files``, maps ``parse_file`` across it with progress tracking,
    and collects the parsed results.

    Args:
        json_files: Paths of the trace JSON files to parse
        workers: Number of parallel workers to use:
            - -1: Use all available CPUs (default)
            - None: Sequential processing (single-threaded)
            - N > 0: Use N worker processes

    Returns:
        List of parsed DataFrames, one per file

    Example:
        >>> from pathlib import Path
        >>> from trace_analysis import find_trace_files, read_trace_files
        >>>
        >>> trace_files = find_trace_files(Path("build/CMakeFiles"))
        >>> dataframes = read_trace_files(trace_files, workers=8)
        >>> print(f"Parsed {len(dataframes)} files")
        >>>
        >>> # Use Pipeline directly for more control, e.g. branching outputs:
        >>> from trace_analysis import Pipeline
        >>> from trace_analysis.parse_file import parse_file
        >>> pipeline = Pipeline(trace_files).map(parse_file, workers=8)
        >>> all_events, metadata = pipeline.tee(
        ...     lambda dfs: pd.concat(dfs, ignore_index=True),
        ...     lambda dfs: [get_metadata(df) for df in dfs]
        ... )
    """
    # Imported lazily to avoid circular imports at package load time.
    from trace_analysis.parse_file import parse_file
    from trace_analysis.pipeline import Pipeline

    pipeline = Pipeline(json_files)
    pipeline = pipeline.map(parse_file, workers=workers, desc="Parsing trace files")
    return pipeline.collect()
|
||||
@@ -185,6 +185,14 @@ def parse_file(filepath: Union[str, Path]) -> pd.DataFrame:
|
||||
data.get("beginningOfTime") if isinstance(data, dict) else None
|
||||
)
|
||||
|
||||
# Store the source file path derived from the trace filename
|
||||
# The trace filename format is: <source_file>.json
|
||||
# Remove the .json extension to get the source file path
|
||||
source_file_path = filepath.stem # Gets filename without .json extension
|
||||
full_path = filepath.parent / source_file_path
|
||||
df.attrs["sourceFile"] = _remove_cmake_artifacts(str(full_path))
|
||||
df.attrs["traceFilePath"] = str(filepath)
|
||||
|
||||
return df
|
||||
|
||||
|
||||
@@ -201,14 +209,8 @@ def _flatten_args(df: pd.DataFrame) -> pd.DataFrame:
|
||||
Returns:
|
||||
DataFrame with flattened args columns and original 'args' column removed
|
||||
"""
|
||||
# Extract args into separate DataFrame
|
||||
args_data = []
|
||||
for idx, row in df.iterrows():
|
||||
args = row.get("args", {})
|
||||
if isinstance(args, dict):
|
||||
args_data.append(args)
|
||||
else:
|
||||
args_data.append({})
|
||||
args_list = df["args"].tolist()
|
||||
args_data = [arg if isinstance(arg, dict) else {} for arg in args_list]
|
||||
|
||||
if args_data:
|
||||
args_df = pd.DataFrame(args_data)
|
||||
@@ -222,6 +224,42 @@ def _flatten_args(df: pd.DataFrame) -> pd.DataFrame:
|
||||
return df
|
||||
|
||||
|
||||
def _remove_cmake_artifacts(file_path: str) -> str:
|
||||
"""
|
||||
Remove CMake build artifacts from a file path.
|
||||
|
||||
CMake creates build directories with the pattern:
|
||||
<build-dir>/<source-path>/CMakeFiles/<target>.dir/<source-file>
|
||||
|
||||
This function removes the CMakeFiles and .dir segments to reconstruct
|
||||
the logical source file path while preserving the build directory prefix.
|
||||
|
||||
Args:
|
||||
file_path: Path potentially containing CMake artifacts
|
||||
|
||||
Returns:
|
||||
Path with CMakeFiles and .dir segments removed
|
||||
|
||||
Examples:
|
||||
>>> _remove_cmake_artifacts('build/library/src/foo/CMakeFiles/target.dir/bar.cpp')
|
||||
'build/library/src/foo/bar.cpp'
|
||||
>>> _remove_cmake_artifacts('library/src/foo/bar.cpp')
|
||||
'library/src/foo/bar.cpp'
|
||||
"""
|
||||
path = Path(file_path)
|
||||
parts = path.parts
|
||||
|
||||
# Filter out CMakeFiles and any parts ending with .dir
|
||||
filtered_parts = [
|
||||
part for part in parts if part != "CMakeFiles" and not part.endswith(".dir")
|
||||
]
|
||||
|
||||
# Reconstruct the path
|
||||
if filtered_parts:
|
||||
return str(Path(*filtered_parts))
|
||||
return file_path
|
||||
|
||||
|
||||
def _normalize_source_path(file_path: str) -> str:
|
||||
"""
|
||||
Normalize a source file path to be relative to composable_kernel if present.
|
||||
@@ -287,13 +325,17 @@ def get_metadata(df: pd.DataFrame) -> FileMetadata:
|
||||
>>> print(f"Duration: {metadata.total_wall_time_s:.2f}s")
|
||||
>>> print(f"Started: {metadata.wall_start_datetime}")
|
||||
"""
|
||||
# Extract beginningOfTime from DataFrame attributes
|
||||
# Extract beginningOfTime and source_file from DataFrame attributes
|
||||
beginning_of_time = None
|
||||
source_file = None
|
||||
if hasattr(df, "attrs"):
|
||||
beginning_of_time = df.attrs.get("beginningOfTime")
|
||||
source_file = df.attrs.get("source_file")
|
||||
|
||||
# Initialize metadata with beginningOfTime from JSON structure
|
||||
metadata = FileMetadata(beginning_of_time=beginning_of_time)
|
||||
# Initialize metadata with values from DataFrame attributes
|
||||
metadata = FileMetadata(
|
||||
beginning_of_time=beginning_of_time, source_file=source_file
|
||||
)
|
||||
|
||||
# Process events to extract ExecuteCompiler timing information
|
||||
if "name" in df.columns:
|
||||
@@ -306,8 +348,13 @@ def get_metadata(df: pd.DataFrame) -> FileMetadata:
|
||||
if "dur" in event:
|
||||
metadata.execute_compiler_dur = event["dur"]
|
||||
|
||||
# Process events to find the main source file being compiled
|
||||
if "name" in df.columns and "arg_detail" in df.columns:
|
||||
# Fallback: Try to find source file from ParseDeclarationOrFunctionDefinition events
|
||||
# This is only used if source_file wasn't already set from the filename
|
||||
if (
|
||||
metadata.source_file is None
|
||||
and "name" in df.columns
|
||||
and "arg_detail" in df.columns
|
||||
):
|
||||
# Look for ParseDeclarationOrFunctionDefinition events with .cpp or .c files
|
||||
source_extensions = (".cpp", ".cc", ".cxx", ".c")
|
||||
parse_events = df[df["name"] == "ParseDeclarationOrFunctionDefinition"]
|
||||
|
||||
292
script/analyze_build/trace_analysis/pipeline.py
Normal file
292
script/analyze_build/trace_analysis/pipeline.py
Normal file
@@ -0,0 +1,292 @@
|
||||
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
"""
|
||||
Functional pipeline for parallel processing of trace files.
|
||||
|
||||
This module provides a fluent API for building data processing pipelines with
|
||||
support for parallel execution, progress tracking, and multiple output branches.
|
||||
|
||||
Example:
|
||||
>>> from trace_analysis import Pipeline, find_trace_files
|
||||
>>> from trace_analysis.parse_file import parse_file
|
||||
>>>
|
||||
>>> files = find_trace_files(Path("build"))
|
||||
>>> dfs = Pipeline(files).map(parse_file, workers=8).collect()
|
||||
"""
|
||||
|
||||
from typing import Any, Callable, List, Optional, Tuple, Union
|
||||
from multiprocessing import Pool, cpu_count
|
||||
from tqdm.auto import tqdm
|
||||
|
||||
|
||||
class Pipeline:
    """
    Functional pipeline for processing data with parallel execution support.

    Provides a fluent API for chaining operations like map, filter, and reduce.
    Supports parallel processing with multiprocessing and progress tracking
    with tqdm (progress bars degrade gracefully when tqdm is not installed).

    Features:
        - Fluent API with method chaining
        - Parallel processing with configurable worker count
        - Progress bars in Jupyter notebooks (tqdm, optional)
        - Fail-fast error handling with item context
        - In-memory processing for speed
        - Tee operation for branching into multiple outputs

    Attributes:
        _items: Current list of items in the pipeline
        _is_reduced: Flag indicating if pipeline has been reduced to single value

    Example:
        Basic parallel processing:
        >>> files = find_trace_files(Path("build"))
        >>> dfs = Pipeline(files).map(parse_file, workers=8).collect()

        Multi-stage pipeline:
        >>> results = (
        ...     Pipeline(files)
        ...     .map(parse_file, workers=8)
        ...     .filter(lambda df: len(df) > 1000)
        ...     .collect()
        ... )

        Multiple outputs with tee:
        >>> pipeline = Pipeline(files).map(parse_file, workers=8)
        >>> all_events, metadata, stats = pipeline.tee(
        ...     lambda dfs: pd.concat(dfs, ignore_index=True),
        ...     lambda dfs: [get_metadata(df) for df in dfs],
        ...     lambda dfs: {"count": len(dfs)}
        ... )
    """

    def __init__(self, items: List[Any]):
        """
        Initialize a new pipeline with a list of items.

        Args:
            items: Initial list of items to process
        """
        self._items = items
        self._is_reduced = False

    @staticmethod
    def _progress(iterable, desc, total=None):
        """Wrap an iterable in a tqdm progress bar when tqdm is available.

        Progress display is purely cosmetic, so a missing tqdm install must
        not break the pipeline: fall back to plain iteration.
        """
        try:
            from tqdm.auto import tqdm
        except ImportError:
            return iterable
        return tqdm(iterable, desc=desc, total=total)

    @staticmethod
    def _reraise(exc: Exception, message: str) -> None:
        """Re-raise *exc* with added context, chained via ``from exc``.

        Some exception types (e.g. UnicodeDecodeError) cannot be constructed
        from a single string; fall back to RuntimeError rather than masking
        the original error with a secondary TypeError.
        """
        try:
            wrapped = type(exc)(message)
        except Exception:
            wrapped = RuntimeError(message)
        raise wrapped from exc

    def map(
        self,
        func: Callable[[Any], Any],
        workers: Optional[int] = None,
        desc: Optional[str] = None,
    ) -> "Pipeline":
        """
        Apply a function to each item in the pipeline.

        Args:
            func: Function to apply to each item. Should accept a single
                argument and return a transformed value. Must be picklable
                when using parallel workers.
            workers: Number of parallel workers to use:
                - None: Sequential processing (single-threaded)
                - -1: Use all available CPUs
                - N > 0: Use N worker processes
            desc: Description for the progress bar. If None, uses a default.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If pipeline has already been reduced, or if workers
                is 0 or less than -1
            Exception: Any exception raised by func is re-raised with context

        Example:
            >>> Pipeline(files).map(parse_file).collect()
            >>> Pipeline(files).map(parse_file, workers=-1).collect()
            >>> Pipeline(files).map(parse_file, workers=8, desc="Parsing").collect()
        """
        if self._is_reduced:
            raise ValueError("Cannot map after reduce operation")

        if not self._items:
            return self

        # Resolve and validate the worker count up front so a bad value
        # fails with a clear message instead of an obscure Pool error.
        if workers == -1:
            workers = cpu_count()
        elif workers is not None and workers < 1:
            raise ValueError(
                f"workers must be None, -1, or a positive int, got {workers}"
            )

        if desc is None:
            desc = "Processing items"

        # Sequential processing (also taken for workers=1: no Pool overhead).
        if workers is None or workers == 1:
            results = []
            for item in self._progress(self._items, desc):
                try:
                    results.append(func(item))
                except Exception as e:
                    self._reraise(e, f"Error processing item {item}: {e}")
            self._items = results
            return self

        # Parallel processing.
        try:
            with Pool(processes=workers) as pool:
                # imap (not imap_unordered) keeps results aligned with the
                # input order, so output[i] always corresponds to input[i] --
                # callers like read_trace_files rely on that correspondence.
                results = list(
                    self._progress(
                        pool.imap(func, self._items),
                        desc,
                        total=len(self._items),
                    )
                )
            self._items = results
            return self
        except Exception as e:
            self._reraise(e, f"Error in parallel map operation: {e}")

    def filter(self, predicate: Callable[[Any], bool]) -> "Pipeline":
        """
        Filter items based on a predicate function.

        Args:
            predicate: Function that returns True for items to keep,
                False to discard.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If pipeline has already been reduced

        Example:
            >>> Pipeline(dfs).filter(lambda df: len(df) > 1000).collect()
        """
        if self._is_reduced:
            raise ValueError("Cannot filter after reduce operation")

        self._items = [item for item in self._items if predicate(item)]
        return self

    def reduce(self, func: Callable[[List[Any]], Any]) -> "Pipeline":
        """
        Reduce all items to a single value using an aggregation function.

        After reduction, the pipeline contains a single value and no further
        map or filter operations are allowed.

        Args:
            func: Aggregation function that accepts the full list of items
                and returns a single aggregated value.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If pipeline has already been reduced
            Exception: Any exception raised by func is re-raised with context

        Example:
            >>> Pipeline(dfs).reduce(lambda dfs: pd.concat(dfs, ignore_index=True)).collect()
            >>> Pipeline(numbers).reduce(sum).collect()
        """
        if self._is_reduced:
            raise ValueError("Cannot reduce twice")

        try:
            self._items = [func(self._items)]
            self._is_reduced = True
            return self
        except Exception as e:
            self._reraise(e, f"Error in reduce operation: {e}")

    def tee(self, *funcs: Callable[[List[Any]], Any]) -> Tuple[Any, ...]:
        """
        Branch the pipeline into multiple outputs.

        Each function receives the full list of current items and produces an
        independent output. Useful for generating multiple aggregations from
        the same data. This operation automatically collects the results.

        Args:
            *funcs: Functions, each accepting the full list of items and
                returning a result, applied independently to the same input.

        Returns:
            Tuple of results, one per function, in the same order as funcs

        Raises:
            ValueError: If no functions are provided
            Exception: Any exception raised by a function is re-raised
                with context

        Example:
            >>> pipeline = Pipeline(files).map(parse_file, workers=8)
            >>> all_events, stats = pipeline.tee(
            ...     lambda dfs: pd.concat(dfs, ignore_index=True),
            ...     lambda dfs: {"total_files": len(dfs)}
            ... )
        """
        if not funcs:
            raise ValueError("At least one function must be provided to tee")

        results = []
        for i, func in enumerate(funcs):
            try:
                results.append(func(self._items))
            except Exception as e:
                self._reraise(e, f"Error in tee function {i}: {e}")

        return tuple(results)

    def collect(self) -> Union[List[Any], Any]:
        """
        Execute the pipeline and return the results.

        Returns:
            If the pipeline has been reduced, the single reduced value;
            otherwise the list of items.

        Example:
            >>> dfs = Pipeline(files).map(parse_file, workers=8).collect()
            >>> df = Pipeline(files).map(parse_file, workers=8).reduce(pd.concat).collect()
        """
        if self._is_reduced:
            return self._items[0]
        return self._items
|
||||
Reference in New Issue
Block a user