Files
composable_kernel/script/dependency-parser/src/cmake_dependency_analyzer.py
Yaswanth Raparti 3cc95874f9 [rocm-libraries] ROCm/rocm-libraries#6912 (commit c705da2)
[CK] Reduce per-file logging in cmake_dependency_analyzer (#6912)

## Motivation

The current `progress_callback` function generates a large volume of output,
which makes it hard to spot the actual failure in CI logs.
Emit a progress line only at the completion of each stage to avoid the
massive logs produced by the per-source-file `extracting_dependencies` callback.

## Technical Details

Update the `progress` function to print only at the completion of each
stage.
https://github.com/ROCm/rocm-libraries/pull/6912/changes#diff-15971b83c7dfefb48fd788507a923017d93bbd9487ed6aeb414ad2c5e00be934R720

## Test Plan

to be tested in CI

## Test Result

to be tested in CI

## Submission Checklist

- [x] Look over the contributing guidelines at
https://github.com/ROCm/ROCm/blob/develop/CONTRIBUTING.md#pull-requests.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-01 08:12:42 +00:00

756 lines
25 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
CMake Dependency Analyzer
Pre-build dependency analysis using compile_commands.json and clang -MM.
This approach extracts header dependencies without requiring a full build,
enabling selective test building in CI pipelines.
Key Features:
- Parses compile_commands.json generated by CMake at configure time
- Uses clang/amdclang -MM to extract header dependencies (preprocessing only)
- Parses build.ninja for target -> source mappings
- Outputs dependency_mapping.json compatible with selective_test_filter.py
"""
import hashlib
import json
import os
import re
import shlex
import subprocess
import tempfile
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Optional, Set
class CompileCommandsParser:
    """Reader for the compile_commands.json database produced by CMake."""

    def __init__(self, compile_commands_path: str):
        """Remember where the compilation database lives.

        Args:
            compile_commands_path: Path to compile_commands.json file
        """
        self.compile_commands_path = compile_commands_path

    def parse(self, extensions: Optional[List[str]] = None) -> List[Dict]:
        """Load the compilation database and return its (filtered) entries.

        Entries that only carry an 'arguments' list get a shell-quoted
        'command' string added, so callers can always rely on 'command'.

        Args:
            extensions: Optional list of file extensions to keep
                (e.g., ['.cpp', '.cc']); a falsy value keeps every entry

        Returns:
            List of compile command dictionaries in database order

        Raises:
            FileNotFoundError: If compile_commands.json doesn't exist
            json.JSONDecodeError: If file contains invalid JSON
        """
        if not os.path.exists(self.compile_commands_path):
            raise FileNotFoundError(
                f"compile_commands.json not found: {self.compile_commands_path}"
            )

        with open(self.compile_commands_path, "r") as handle:
            entries = json.load(handle)

        selected = []
        for entry in entries:
            # Synthesize 'command' from 'arguments' when only the latter exists
            if "command" not in entry and "arguments" in entry:
                entry["command"] = " ".join(map(shlex.quote, entry["arguments"]))

            # Drop entries whose source extension was not requested
            if extensions and os.path.splitext(entry["file"])[1] not in extensions:
                continue

            selected.append(entry)
        return selected
class DependencyExtractor:
    """Extracts header dependencies using clang -MM.

    Every extraction writes the compiler's makefile-style rule to its own
    uniquely named temporary file, which is deleted as soon as it has been
    parsed. No scratch directory is created, so nothing is left behind in the
    system temp location even when extraction runs in worker processes.
    """

    def __init__(self, parallel_workers: int = 1, timeout: int = 30):
        """Initialize dependency extractor.

        Args:
            parallel_workers: Number of parallel workers for extraction
            timeout: Timeout in seconds for each clang -MM call
        """
        self.parallel_workers = parallel_workers
        self.timeout = timeout
        # Retained for backward compatibility only: dependency files are now
        # created one by one, so no shared scratch directory is ever made.
        self._temp_dir = None

    def convert_to_dependency_command(
        self, compile_command: str, deps_output_file: str
    ) -> List[str]:
        """Convert a compile command to a dependency extraction command.

        Drops -c, drops -o together with its argument, drops bare *.o
        arguments, and inserts ``-MM -MF <deps_output_file>`` right after the
        compiler.

        Args:
            compile_command: Original compile command string
            deps_output_file: Path to write dependency output

        Returns:
            Modified command as a list of arguments for dependency extraction
            (empty if the command contained no usable arguments)
        """
        new_parts = []
        skip_next = False
        for part in shlex.split(compile_command):
            if skip_next:
                # This is the argument of a preceding -o
                skip_next = False
                continue
            # Skip -c (compile flag)
            if part == "-c":
                continue
            # Skip -o and its argument (output file)
            if part == "-o":
                skip_next = True
                continue
            # Skip standalone .o files that might appear
            if part.endswith(".o") and not part.startswith("-"):
                continue
            new_parts.append(part)

        # Insert -MM and -MF flags after the compiler
        if new_parts:
            compiler, *rest = new_parts
            new_parts = [compiler, "-MM", "-MF", deps_output_file, *rest]
        return new_parts

    def parse_makefile_deps(self, deps_content: str) -> List[str]:
        """Parse makefile-style dependency output from clang -MM.

        Line continuations are joined, everything up to the first colon is
        treated as the target, and the remainder is split on whitespace.
        Whitespace escaped with a backslash (``\\ ``) is kept inside the path
        and unescaped, so paths containing spaces survive intact.

        Args:
            deps_content: Content of .d file generated by clang -MM

        Returns:
            List of dependency file paths (excluding the target .o file)
        """
        if not deps_content.strip():
            return []

        # Join continuation lines
        content = deps_content.replace("\\\n", " ").replace("\\\r\n", " ")

        # The first colon separates the target from its dependencies
        _target, colon, deps_part = content.partition(":")
        if not colon:
            return []

        # Split on whitespace that is NOT preceded by a backslash, then turn
        # the make-style "\ " escape back into a plain space.
        tokens = re.split(r"(?<!\\)\s+", deps_part)
        return [token.replace("\\ ", " ") for token in tokens if token]

    def _get_deps_file(self, source_file: str) -> str:
        """Create a unique, empty temporary file for dependency output.

        The file name ends in ``<basename>.d``. The caller owns the file and
        is responsible for removing it.

        Args:
            source_file: Source file being analyzed

        Returns:
            Path to temporary .d file
        """
        basename = os.path.basename(source_file)
        fd, deps_path = tempfile.mkstemp(prefix="ck_deps_", suffix=f"_{basename}.d")
        # Only the path is needed; the compiler reopens the file itself.
        os.close(fd)
        return deps_path

    def extract(
        self, directory: str, compile_command: str, source_file: str
    ) -> List[str]:
        """Extract dependencies for a single source file.

        Extraction is best-effort: a non-zero compiler exit status, a timeout
        or any other failure yields an empty list rather than an exception.

        Args:
            directory: Working directory for compilation
            compile_command: Original compile command
            source_file: Source file to analyze

        Returns:
            List of dependency file paths, or empty list on error
        """
        deps_file = self._get_deps_file(source_file)
        try:
            dep_command = self.convert_to_dependency_command(compile_command, deps_file)
            # Note: Use errors='replace' to handle non-UTF8 output from AMD clang
            result = subprocess.run(
                dep_command,
                cwd=directory,
                capture_output=True,
                text=True,
                errors="replace",
                timeout=self.timeout,
            )
            if result.returncode != 0:
                return []

            # Parse the generated .d file
            if os.path.exists(deps_file):
                with open(deps_file, "r", errors="replace") as f:
                    return self.parse_makefile_deps(f.read())
            return []
        except Exception:
            # Deliberately broad (covers subprocess.TimeoutExpired, a missing
            # compiler, malformed commands, ...): one bad translation unit
            # must not abort the whole batch.
            return []
        finally:
            # Clean up temp file
            try:
                os.unlink(deps_file)
            except OSError:
                pass

    def extract_batch(
        self, commands: List[Dict], progress_callback=None
    ) -> Dict[str, List[str]]:
        """Extract dependencies for multiple source files.

        Runs serially when ``parallel_workers <= 1``, otherwise fans the work
        out over a process pool.

        Args:
            commands: List of compile command dictionaries with 'directory',
                'command' and 'file' keys
            progress_callback: Optional callback(current, total) for progress
                reporting, invoked once per finished source file

        Returns:
            Dictionary mapping source files to their dependencies
        """
        source_to_deps = {}
        total = len(commands)

        if self.parallel_workers <= 1:
            # Serial execution
            for done, cmd in enumerate(commands, start=1):
                source_to_deps[cmd["file"]] = self.extract(
                    cmd["directory"], cmd["command"], cmd["file"]
                )
                if progress_callback:
                    progress_callback(done, total)
            return source_to_deps

        # Parallel execution
        with ProcessPoolExecutor(max_workers=self.parallel_workers) as executor:
            futures = {
                executor.submit(
                    self.extract, cmd["directory"], cmd["command"], cmd["file"]
                ): cmd["file"]
                for cmd in commands
            }
            for completed, future in enumerate(as_completed(futures), start=1):
                source_file = futures[future]
                try:
                    source_to_deps[source_file] = future.result()
                except Exception:
                    # A crashed worker counts as "no dependencies found"
                    source_to_deps[source_file] = []
                if progress_callback:
                    progress_callback(completed, total)
        return source_to_deps
class NinjaTargetParser:
    """Reads build.ninja to recover executable/object/source relationships."""

    def __init__(self, ninja_file_path: str):
        """Remember which ninja file to scan.

        Args:
            ninja_file_path: Path to build.ninja file
        """
        self.ninja_file_path = ninja_file_path

    def _matching_lines(self, pattern):
        """Yield a match object for each line of build.ninja that `pattern` matches.

        Yields nothing when the ninja file does not exist.
        """
        if not os.path.exists(self.ninja_file_path):
            return
        with open(self.ninja_file_path, "r") as ninja:
            for raw_line in ninja:
                hit = pattern.match(raw_line)
                if hit:
                    yield hit

    def parse_executable_mappings(self) -> Dict[str, List[str]]:
        """Collect the object files linked into each executable under bin/.

        Only inputs listed before the first '|' are considered, and only
        those that end in '.o' and are not absolute paths.

        Returns:
            Dictionary mapping executable paths to lists of object files;
            executables without any such object file are omitted
        """
        # Example: build bin/test_gemm: CXX_EXECUTABLE_LINKER__test_gemm test.o lib.o | deps
        link_rule = re.compile(r"^build\s+(bin/[^:]+):\s+\S+\s+([^|]+)")

        executables = {}
        for hit in self._matching_lines(link_rule):
            linked_objects = [
                token
                for token in hit.group(2).strip().split()
                if token.endswith(".o") and not token.startswith("/")
            ]
            if linked_objects:
                executables[hit.group(1)] = linked_objects
        return executables

    def parse_object_to_source(self) -> Dict[str, str]:
        """Collect the source file each object file is compiled from.

        Returns:
            Dictionary mapping object file paths to source file paths
        """
        # Example: build test/test.cpp.o: CXX_COMPILER__target /src/test.cpp
        compile_rule = re.compile(
            r"^build\s+([^:]+\.(?:cpp|cc|cu|hip)\.o):\s+\S+\s+(\S+)"
        )
        return {
            hit.group(1): hit.group(2) for hit in self._matching_lines(compile_rule)
        }
class DependencyMapper:
    """Builds file -> executable dependency mappings."""

    # Path prefixes that mark a dependency as a system file
    _SYSTEM_PREFIXES = ("/usr/", "/opt/rocm", "/lib/", "/system/", "/local/")

    # Workspace-relative prefixes that mark a dependency as a project file
    _PROJECT_DIRS = (
        "include/",
        "library/",
        "test/",
        "example/",
        "src/",
        "profiler/",
        "build/include/",
        "build/_deps/gtest",
        "client_example",
        "codegen",
        "tile_engine",
        "dispatcher",
        "experimental",
        "tutorial",
    )

    # The same prefixes as seen from the root of the monorepo
    _MONOREPO_DIRS = tuple(
        f"projects/composablekernel/{prefix}" for prefix in _PROJECT_DIRS
    )

    # Extensions accepted as project sources/headers regardless of location
    _SOURCE_SUFFIXES = (
        ".cpp", ".hpp", ".h", ".c", ".cc", ".cxx", ".cu", ".hip", ".inc"
    )

    def __init__(self, workspace_root: Optional[str] = None):
        """Initialize dependency mapper.

        Args:
            workspace_root: Root directory of the workspace for path
                normalization; stored as an absolute path with exactly one
                trailing '/'. A falsy value disables prefix stripping.
        """
        self.workspace_root = workspace_root
        if workspace_root:
            self.workspace_root = os.path.abspath(workspace_root).rstrip("/") + "/"

    def normalize_path(self, path: str) -> str:
        """Normalize a file path relative to workspace root.

        The path is first collapsed with ``os.path.normpath`` so that
        spellings such as ``/ws/include/ck/../ck/x.hpp`` or ``./include/x.hpp``
        map to the same key as ``include/ck/x.hpp`` / ``include/x.hpp``. The
        workspace root prefix is then removed when present.

        Args:
            path: File path to normalize

        Returns:
            Normalized path, relative to the workspace root when the file
            lives beneath it; an empty path is returned unchanged
        """
        if not path:
            # normpath("") would yield "."; keep empty input untouched
            return path
        cleaned = os.path.normpath(path)
        if self.workspace_root and cleaned.startswith(self.workspace_root):
            return cleaned[len(self.workspace_root):]
        return cleaned

    def is_project_file(self, file_path: str) -> bool:
        """Check if a file is part of the project (not a system file).

        Args:
            file_path: File path to check (normally already normalized)

        Returns:
            True if file is a project file, False if system file
        """
        # System locations are rejected before anything else is considered
        if file_path.startswith(self._SYSTEM_PREFIXES):
            return False
        # Standalone-repo layout and monorepo layout
        if file_path.startswith(self._PROJECT_DIRS + self._MONOREPO_DIRS):
            return True
        # Anything else counts only if it looks like a source or header file
        return file_path.endswith(self._SOURCE_SUFFIXES)

    def build_mapping(
        self,
        exe_to_objects: Dict[str, List[str]],
        obj_to_source: Dict[str, str],
        source_to_deps: Dict[str, List[str]],
    ) -> Dict[str, Set[str]]:
        """Build file -> executable mapping from component mappings.

        Objects without a known source and sources without recorded
        dependencies contribute nothing.

        Args:
            exe_to_objects: Executable -> object files mapping
            obj_to_source: Object file -> source file mapping
            source_to_deps: Source file -> dependency files mapping

        Returns:
            Dictionary mapping file paths to sets of executables
        """
        file_to_exes: Dict[str, Set[str]] = defaultdict(set)
        for exe, object_files in exe_to_objects.items():
            for obj_file in object_files:
                source_file = obj_to_source.get(obj_file)
                if not source_file:
                    continue
                for dep_file in source_to_deps.get(source_file, []):
                    # Normalize and filter
                    normalized = self.normalize_path(dep_file)
                    if self.is_project_file(normalized):
                        file_to_exes[normalized].add(exe)
        return dict(file_to_exes)
class CMakeDependencyAnalyzer:
    """Main analyzer class combining all components."""

    def __init__(
        self,
        compile_commands_path: Optional[str],
        ninja_path: Optional[str],
        workspace_root: str,
        parallel_workers: int = 8,
    ):
        """Initialize the analyzer.

        Args:
            compile_commands_path: Path to compile_commands.json
            ninja_path: Path to build.ninja
            workspace_root: Root directory of the workspace
            parallel_workers: Number of parallel workers for dependency extraction
        """
        self.compile_commands_path = compile_commands_path
        self.ninja_path = ninja_path
        self.workspace_root = workspace_root
        self.parallel_workers = parallel_workers
        # Results, filled in by analyze()
        self.file_to_executables: Dict[str, Set[str]] = {}
        self.executable_to_files: Dict[str, Set[str]] = {}

    def _compiler_version(self) -> str:
        """Return ``<compiler> --version`` output for the first database entry.

        The compiler is taken from the entry's 'command' string or, when that
        is absent, from its 'arguments' list. Any failure (unreadable or
        malformed database, compiler not found on disk, timeout, ...) yields
        an empty string, so the version is simply left out of the input hash.
        """
        path = self.compile_commands_path
        if not (path and os.path.exists(path)):
            return ""
        try:
            with open(path, "r") as f:
                commands = json.load(f)
            if not commands:
                return ""
            first = commands[0]
            command = first.get("command", "")
            # 'command' wins when both keys are present
            argv = shlex.split(command) if command else list(first.get("arguments") or [])
            if not argv or not os.path.exists(argv[0]):
                return ""
            result = subprocess.run(
                [argv[0], "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            return result.stdout
        except Exception:
            # Best effort: the hash must stay computable for any database
            return ""

    def calculate_input_hash(self) -> str:
        """Calculate hash of input files to detect when cache should be invalidated.

        The hash covers, in this order: modification time and size of
        compile_commands.json, modification time and size of build.ninja, and
        the version banner of the first compiler in compile_commands.json.
        Inputs that are unset or missing are skipped.

        Returns:
            SHA256 hash string representing the current state of input files
        """
        hasher = hashlib.sha256()
        for path in (self.compile_commands_path, self.ninja_path):
            if path and os.path.exists(path):
                stat = os.stat(path)
                hasher.update(f"{stat.st_mtime}:{stat.st_size}".encode())
        hasher.update(self._compiler_version().encode())
        return hasher.hexdigest()

    def should_regenerate_cache(self, cache_file: str) -> bool:
        """Check if dependency cache needs to be regenerated.

        Args:
            cache_file: Path to the cached dependency mapping JSON

        Returns:
            True if cache should be regenerated, False if cache is valid.
            A missing, unreadable, malformed or non-object cache file, or one
            without an 'input_hash', always counts as stale.
        """
        if not os.path.exists(cache_file):
            return True
        try:
            with open(cache_file, "r") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable file or invalid JSON (JSONDecodeError is a ValueError)
            return True
        if not isinstance(data, dict):
            # Valid JSON but not the expected object, e.g. an old/foreign file
            return True
        cached_hash = data.get("input_hash")
        if not cached_hash:
            return True
        return self.calculate_input_hash() != cached_hash

    def analyze(self, progress_callback=None):
        """Run the full dependency analysis.

        Populates ``file_to_executables`` and ``executable_to_files``.

        Args:
            progress_callback: Optional callback(phase, current, total) for progress

        Raises:
            ValueError: If compile_commands_path or ninja_path is None
        """
        # Validate required paths
        if self.compile_commands_path is None:
            raise ValueError(
                "compile_commands_path is required for analysis but was None"
            )
        if self.ninja_path is None:
            raise ValueError("ninja_path is required for analysis but was None")

        def report(phase, current, total):
            if progress_callback:
                progress_callback(phase, current, total)

        # Phase 1: Parse compile commands
        report("parsing_compile_commands", 0, 1)
        cc_parser = CompileCommandsParser(self.compile_commands_path)
        commands = cc_parser.parse(extensions=[".cpp", ".cc", ".cu", ".hip"])
        report("parsing_compile_commands", 1, 1)

        # Phase 2: Extract dependencies (reported once per source file)
        extractor = DependencyExtractor(parallel_workers=self.parallel_workers)

        def dep_progress(current, total):
            report("extracting_dependencies", current, total)

        source_to_deps = extractor.extract_batch(
            commands, progress_callback=dep_progress
        )

        # Phase 3: Parse ninja target mappings
        report("parsing_ninja", 0, 1)
        ninja_parser = NinjaTargetParser(self.ninja_path)
        exe_to_objects = ninja_parser.parse_executable_mappings()
        obj_to_source = ninja_parser.parse_object_to_source()
        report("parsing_ninja", 1, 1)

        # Phase 4: Build dependency mapping
        report("building_mapping", 0, 1)
        mapper = DependencyMapper(workspace_root=self.workspace_root)
        self.file_to_executables = mapper.build_mapping(
            exe_to_objects, obj_to_source, source_to_deps
        )
        # Build reverse mapping
        reverse: Dict[str, Set[str]] = defaultdict(set)
        for file_path, exes in self.file_to_executables.items():
            for exe in exes:
                reverse[exe].add(file_path)
        self.executable_to_files = dict(reverse)
        report("building_mapping", 1, 1)

    def calculate_statistics(self) -> Dict:
        """Calculate statistics about the dependency mapping.

        Returns:
            Dictionary with 'total_files', 'total_executables' and
            'files_with_multiple_executables'
        """
        return {
            "total_files": len(self.file_to_executables),
            "total_executables": len(self.executable_to_files),
            "files_with_multiple_executables": sum(
                1 for exes in self.file_to_executables.values() if len(exes) > 1
            ),
        }

    def export_to_json(self, output_path: str):
        """Export dependency mapping to JSON file.

        The output format is compatible with selective_test_filter.py. The
        current input hash is stored under 'input_hash' so the file can later
        be validated by should_regenerate_cache().

        Args:
            output_path: Path to write JSON output
        """
        # Convert sets to sorted lists for JSON serialization
        data = {
            "file_to_executables": {
                f: sorted(exes) for f, exes in self.file_to_executables.items()
            },
            "executable_to_files": {
                exe: sorted(files) for exe, files in self.executable_to_files.items()
            },
            "statistics": self.calculate_statistics(),
            "repo": {
                "type": "cmake_prebuild",
                "workspace_root": self.workspace_root,
            },
            "input_hash": self.calculate_input_hash(),
        }
        with open(output_path, "w") as f:
            json.dump(data, f, indent=2)
def main():
    """Command-line driver: validate the cache, analyze, export, summarize."""
    import argparse

    # (flags, keyword arguments) for every command-line option, in help order
    option_specs = [
        (("compile_commands",), {"help": "Path to compile_commands.json"}),
        (("build_ninja",), {"help": "Path to build.ninja"}),
        (
            ("--workspace-root",),
            {
                "default": ".",
                "help": "Workspace root directory (default: current directory)",
            },
        ),
        (
            ("--output",),
            {
                "default": "cmake_dependency_mapping.json",
                "help": "Output JSON file (default: cmake_dependency_mapping.json)",
            },
        ),
        (
            ("--parallel",),
            {
                "type": int,
                "default": 8,
                "help": "Number of parallel workers (default: 8)",
            },
        ),
        (
            ("--quiet",),
            {"action": "store_true", "help": "Suppress progress output"},
        ),
        (
            ("--force",),
            {
                "action": "store_true",
                "help": "Force regeneration even if cache is valid",
            },
        ),
    ]

    parser = argparse.ArgumentParser(
        description="CMake-based dependency analyzer for pre-build test selection"
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
    args = parser.parse_args()

    def progress(phase, current, total):
        # Report a stage only once it has completed, and never in quiet mode,
        # so the per-source-file callbacks do not flood the log.
        if args.quiet or current != total:
            return
        print(f"[{phase}] {current}/{total}")

    analyzer = CMakeDependencyAnalyzer(
        compile_commands_path=args.compile_commands,
        ninja_path=args.build_ninja,
        workspace_root=args.workspace_root,
        parallel_workers=args.parallel,
    )

    # Without --force, an up-to-date cache short-circuits the whole run
    if not args.force:
        if not analyzer.should_regenerate_cache(args.output):
            print("Cache is valid, skipping analysis. Use --force to regenerate.")
            print(f"Using cached results from {args.output}")
            return
        if os.path.exists(args.output):
            print("Cache invalid or outdated, regenerating dependencies...")

    print(f"Analyzing dependencies from {args.compile_commands}...")
    analyzer.analyze(progress_callback=progress)

    print(f"\nExporting to {args.output}...")
    analyzer.export_to_json(args.output)

    stats = analyzer.calculate_statistics()
    print("\nResults:")
    print(f" Total files: {stats['total_files']}")
    print(f" Total executables: {stats['total_executables']}")
    print(
        f" Files with multiple executables: {stats['files_with_multiple_executables']}"
    )


if __name__ == "__main__":
    main()