#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
CMake Dependency Analyzer

Pre-build dependency analysis using compile_commands.json and clang -MM.
This approach extracts header dependencies without requiring a full build,
enabling selective test building in CI pipelines.

Key Features:
- Parses compile_commands.json generated by CMake at configure time
- Uses clang/amdclang -MM to extract header dependencies (preprocessing only)
- Parses build.ninja for target -> source mappings
- Outputs a dependency mapping JSON (default: cmake_dependency_mapping.json)
  compatible with selective_test_filter.py
"""

import hashlib
import json
import os
import re
import shlex
import subprocess
import tempfile
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Optional, Set


class CompileCommandsParser:
    """Parses compile_commands.json generated by CMake."""

    def __init__(self, compile_commands_path: str):
        """Initialize parser with path to compile_commands.json.

        Args:
            compile_commands_path: Path to compile_commands.json file
        """
        self.compile_commands_path = compile_commands_path

    def parse(self, extensions: Optional[List[str]] = None) -> List[Dict]:
        """Parse compile_commands.json and return the list of compile commands.

        Args:
            extensions: Optional list of file extensions to filter by
                (e.g., ['.cpp', '.cc'])

        Returns:
            List of compile command dictionaries with 'file', 'directory',
            and 'command' keys

        Raises:
            FileNotFoundError: If compile_commands.json doesn't exist
            json.JSONDecodeError: If the file contains invalid JSON
        """
        if not os.path.exists(self.compile_commands_path):
            raise FileNotFoundError(
                f"compile_commands.json not found: {self.compile_commands_path}"
            )

        with open(self.compile_commands_path, "r") as f:
            commands = json.load(f)

        # Normalize commands to always have a 'command' key (not 'arguments')
        normalized = []
        for cmd in commands:
            # Handle 'arguments' format (convert to a 'command' string)
            if "arguments" in cmd and "command" not in cmd:
                cmd["command"] = " ".join(shlex.quote(arg) for arg in cmd["arguments"])

            # Filter by extension if specified
            if extensions:
                file_ext = os.path.splitext(cmd["file"])[1]
                if file_ext not in extensions:
                    continue

            normalized.append(cmd)

        return normalized
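

# Illustrative example of the normalization parse() performs (the entry below
# is hypothetical, not taken from a real build tree). An 'arguments'-form entry
# such as
#
#   {"directory": "/build", "file": "a.cpp",
#    "arguments": ["clang++", "-O2", "-c", "a.cpp"]}
#
# is rewritten into the equivalent shell-quoted 'command' form:
#
#   {"directory": "/build", "file": "a.cpp",
#    "command": "clang++ -O2 -c a.cpp"}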


class DependencyExtractor:
    """Extracts header dependencies using clang -MM."""

    def __init__(self, parallel_workers: int = 1, timeout: int = 30):
        """Initialize dependency extractor.

        Args:
            parallel_workers: Number of parallel workers for extraction
            timeout: Timeout in seconds for each clang -MM call
        """
        self.parallel_workers = parallel_workers
        self.timeout = timeout
        self._temp_dir = None

    def convert_to_dependency_command(
        self, compile_command: str, deps_output_file: str
    ) -> List[str]:
        """Convert a compile command to a dependency extraction command.

        Replaces -c with -MM and removes the -o output specification.

        Args:
            compile_command: Original compile command string
            deps_output_file: Path to write dependency output

        Returns:
            Modified command as a list of arguments for dependency extraction
        """
        parts = shlex.split(compile_command)
        new_parts = []
        skip_next = False

        for part in parts:
            if skip_next:
                skip_next = False
                continue

            # Skip -c (compile flag)
            if part == "-c":
                continue

            # Skip -o and its argument (output file)
            if part == "-o":
                skip_next = True
                continue

            # Skip standalone .o files that might appear
            if part.endswith(".o") and not part.startswith("-"):
                continue

            new_parts.append(part)

        # Insert -MM and -MF flags after the compiler
        if new_parts:
            compiler = new_parts[0]
            rest = new_parts[1:]
            new_parts = [compiler, "-MM", "-MF", deps_output_file] + rest

        return new_parts

    def parse_makefile_deps(self, deps_content: str) -> List[str]:
        """Parse makefile-style dependency output from clang -MM.

        Args:
            deps_content: Content of .d file generated by clang -MM

        Returns:
            List of dependency file paths (excluding the target .o file)
        """
        if not deps_content.strip():
            return []

        # Join continuation lines (handle CRLF before LF) and split on whitespace
        content = deps_content.replace("\\\r\n", " ").replace("\\\n", " ")

        # Find the colon separating the target from its dependencies
        colon_pos = content.find(":")
        if colon_pos == -1:
            return []

        # Everything after the colon is dependencies
        deps_part = content[colon_pos + 1:]

        # Split on whitespace and filter empty strings
        deps = [d.strip() for d in deps_part.split() if d.strip()]
        return deps

    def _get_deps_file(self, source_file: str) -> str:
        """Get a temporary file path for dependency output.

        Args:
            source_file: Source file being analyzed

        Returns:
            Path to temporary .d file
        """
        if self._temp_dir is None:
            self._temp_dir = tempfile.mkdtemp(prefix="ck_deps_")
        basename = os.path.basename(source_file)
        return os.path.join(self._temp_dir, f"{basename}.d")

    def extract(self, directory: str, compile_command: str, source_file: str) -> List[str]:
        """Extract dependencies for a single source file.

        Args:
            directory: Working directory for compilation
            compile_command: Original compile command
            source_file: Source file to analyze

        Returns:
            List of dependency file paths, or an empty list on error
        """
        deps_file = self._get_deps_file(source_file)

        try:
            dep_command = self.convert_to_dependency_command(compile_command, deps_file)
            if not dep_command:
                # Nothing to run (empty or unparsable compile command)
                return []

            # Run the dependency extraction command
            # Note: Use errors='replace' to handle non-UTF8 output from AMD clang
            result = subprocess.run(
                dep_command,
                cwd=directory,
                capture_output=True,
                text=True,
                errors="replace",
                timeout=self.timeout,
            )

            if result.returncode != 0:
                return []

            # Parse the generated .d file
            if os.path.exists(deps_file):
                with open(deps_file, "r", errors="replace") as f:
                    deps_content = f.read()
                return self.parse_makefile_deps(deps_content)

            return []

        except subprocess.TimeoutExpired:
            return []
        except Exception:
            return []
        finally:
            # Clean up the temp file
            if os.path.exists(deps_file):
                try:
                    os.unlink(deps_file)
                except OSError:
                    pass
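
    # Illustrative walk-through (the command below is hypothetical): given
    #
    #   clang++ -Iinclude -O2 -c test/test_gemm.cpp -o test/test_gemm.cpp.o
    #
    # convert_to_dependency_command() produces
    #
    #   clang++ -MM -MF <tmp>/test_gemm.cpp.d -Iinclude -O2 test/test_gemm.cpp
    #
    # and a resulting .d file such as
    #
    #   test_gemm.cpp.o: test/test_gemm.cpp \
    #     include/gemm.hpp include/utils.hpp
    #
    # is parsed by parse_makefile_deps() into
    # ['test/test_gemm.cpp', 'include/gemm.hpp', 'include/utils.hpp'].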

    def extract_batch(
        self, commands: List[Dict], progress_callback=None
    ) -> Dict[str, List[str]]:
        """Extract dependencies for multiple source files.

        Args:
            commands: List of compile command dictionaries
            progress_callback: Optional callback(current, total) for progress
                reporting

        Returns:
            Dictionary mapping source files to their dependencies
        """
        source_to_deps = {}
        total = len(commands)

        if self.parallel_workers <= 1:
            # Serial execution
            for i, cmd in enumerate(commands):
                deps = self.extract(cmd["directory"], cmd["command"], cmd["file"])
                source_to_deps[cmd["file"]] = deps
                if progress_callback:
                    progress_callback(i + 1, total)
        else:
            # Parallel execution. Note that self is pickled to the worker
            # processes, so each task lazily creates its own temp directory
            # for .d output.
            with ProcessPoolExecutor(max_workers=self.parallel_workers) as executor:
                futures = {
                    executor.submit(
                        self.extract, cmd["directory"], cmd["command"], cmd["file"]
                    ): cmd["file"]
                    for cmd in commands
                }

                completed = 0
                for future in as_completed(futures):
                    source_file = futures[future]
                    try:
                        deps = future.result()
                        source_to_deps[source_file] = deps
                    except Exception:
                        source_to_deps[source_file] = []

                    completed += 1
                    if progress_callback:
                        progress_callback(completed, total)

        return source_to_deps


class NinjaTargetParser:
    """Parses ninja build files to get target mappings."""

    def __init__(self, ninja_file_path: str):
        """Initialize parser with path to build.ninja.

        Args:
            ninja_file_path: Path to build.ninja file
        """
        self.ninja_file_path = ninja_file_path

    def parse_executable_mappings(self) -> Dict[str, List[str]]:
        """Parse executable -> object file mappings from build.ninja.

        Returns:
            Dictionary mapping executable paths to lists of object files
        """
        if not os.path.exists(self.ninja_file_path):
            return {}

        exe_to_objects = {}

        # Pattern to match executable build rules, e.g.:
        #   build bin/test_gemm: CXX_EXECUTABLE_LINKER__test_gemm test.o lib.o | deps
        exe_pattern = re.compile(r"^build\s+(bin/[^:]+):\s+\S+\s+([^|]+)")

        with open(self.ninja_file_path, "r") as f:
            for line in f:
                match = exe_pattern.match(line)
                if match:
                    exe = match.group(1)
                    deps_part = match.group(2).strip()

                    # Extract object files (ending in .o, not starting with /)
                    object_files = []
                    for dep in deps_part.split():
                        if dep.endswith(".o") and not dep.startswith("/"):
                            object_files.append(dep)

                    if object_files:
                        exe_to_objects[exe] = object_files

        return exe_to_objects

    def parse_object_to_source(self) -> Dict[str, str]:
        """Parse object -> source file mappings from build.ninja.

        Returns:
            Dictionary mapping object file paths to source file paths
        """
        if not os.path.exists(self.ninja_file_path):
            return {}

        obj_to_source = {}

        # Pattern to match object compilation rules, e.g.:
        #   build test/test.cpp.o: CXX_COMPILER__target /src/test.cpp
        obj_pattern = re.compile(r"^build\s+([^:]+\.(?:cpp|cc|cu|hip)\.o):\s+\S+\s+(\S+)")

        with open(self.ninja_file_path, "r") as f:
            for line in f:
                match = obj_pattern.match(line)
                if match:
                    obj_file = match.group(1)
                    source_file = match.group(2)
                    obj_to_source[obj_file] = source_file

        return obj_to_source
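

# Illustrative result of the two ninja parses above (names are hypothetical):
#
#   exe_to_objects = {"bin/test_gemm": ["test/test_gemm.cpp.o"]}
#   obj_to_source  = {"test/test_gemm.cpp.o": "/src/test/test_gemm.cpp"}
#
# DependencyMapper below composes these with the clang -MM results to map
# each project file to the executables that transitively include it.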


class DependencyMapper:
    """Builds file -> executable dependency mappings."""

    def __init__(self, workspace_root: Optional[str] = None):
        """Initialize dependency mapper.

        Args:
            workspace_root: Root directory of the workspace for path
                normalization
        """
        self.workspace_root = workspace_root
        if workspace_root:
            self.workspace_root = os.path.abspath(workspace_root).rstrip("/") + "/"

    def normalize_path(self, path: str) -> str:
        """Normalize a file path relative to the workspace root.

        Args:
            path: File path to normalize

        Returns:
            Normalized relative path
        """
        if self.workspace_root and path.startswith(self.workspace_root):
            return path[len(self.workspace_root):]
        return path

    def is_project_file(self, file_path: str) -> bool:
        """Check if a file is part of the project (not a system file).

        Args:
            file_path: File path to check

        Returns:
            True if the file is a project file, False if it is a system file
        """
        # Exclude system files
        system_prefixes = ["/usr/", "/opt/rocm", "/lib/", "/system/", "/local/"]
        if any(file_path.startswith(prefix) for prefix in system_prefixes):
            return False

        # Project directory prefixes
        project_dirs = [
            "include/",
            "library/",
            "test/",
            "example/",
            "src/",
            "profiler/",
            "build/include/",
            "build/_deps/gtest",
            "client_example",
            "codegen",
            "tile_engine",
            "dispatcher",
            "experimental",
            "tutorial",
        ]
        if any(file_path.startswith(prefix) for prefix in project_dirs):
            return True

        # Also check monorepo-style paths
        if any(
            file_path.startswith(f"projects/composablekernel/{prefix}")
            for prefix in project_dirs
        ):
            return True

        # Include files with common source/header extensions
        if file_path.endswith(
            (".cpp", ".hpp", ".h", ".c", ".cc", ".cxx", ".cu", ".hip", ".inc")
        ):
            return True

        return False

    def build_mapping(
        self,
        exe_to_objects: Dict[str, List[str]],
        obj_to_source: Dict[str, str],
        source_to_deps: Dict[str, List[str]],
    ) -> Dict[str, Set[str]]:
        """Build a file -> executable mapping from the component mappings.

        Args:
            exe_to_objects: Executable -> object files mapping
            obj_to_source: Object file -> source file mapping
            source_to_deps: Source file -> dependency files mapping

        Returns:
            Dictionary mapping file paths to sets of executables
        """
        file_to_exes: Dict[str, Set[str]] = defaultdict(set)

        for exe, object_files in exe_to_objects.items():
            for obj_file in object_files:
                source_file = obj_to_source.get(obj_file)
                if not source_file:
                    continue

                deps = source_to_deps.get(source_file, [])
                for dep_file in deps:
                    # Normalize and filter
                    normalized = self.normalize_path(dep_file)
                    if self.is_project_file(normalized):
                        file_to_exes[normalized].add(exe)

        return dict(file_to_exes)
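

# Illustrative composition performed by build_mapping() (values hypothetical):
# with
#
#   exe_to_objects = {"bin/test_gemm": ["test/test_gemm.cpp.o"]}
#   obj_to_source  = {"test/test_gemm.cpp.o": "/src/test/test_gemm.cpp"}
#   source_to_deps = {"/src/test/test_gemm.cpp":
#                     ["/src/test/test_gemm.cpp", "/src/include/gemm.hpp"]}
#
# and workspace_root "/src", the result is
#
#   {"test/test_gemm.cpp": {"bin/test_gemm"},
#    "include/gemm.hpp":   {"bin/test_gemm"}}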


class CMakeDependencyAnalyzer:
    """Main analyzer class combining all components."""

    def __init__(
        self,
        compile_commands_path: Optional[str],
        ninja_path: Optional[str],
        workspace_root: str,
        parallel_workers: int = 8,
    ):
        """Initialize the analyzer.

        Args:
            compile_commands_path: Path to compile_commands.json
            ninja_path: Path to build.ninja
            workspace_root: Root directory of the workspace
            parallel_workers: Number of parallel workers for dependency
                extraction
        """
        self.compile_commands_path = compile_commands_path
        self.ninja_path = ninja_path
        self.workspace_root = workspace_root
        self.parallel_workers = parallel_workers

        # Results
        self.file_to_executables: Dict[str, Set[str]] = {}
        self.executable_to_files: Dict[str, Set[str]] = {}

    def calculate_input_hash(self) -> str:
        """Calculate a hash of input files to detect when the cache should be
        invalidated.

        Returns:
            SHA256 hash string representing the current state of input files
        """
        hasher = hashlib.sha256()

        # Hash compile_commands.json modification time and size
        if self.compile_commands_path and os.path.exists(self.compile_commands_path):
            stat = os.stat(self.compile_commands_path)
            hasher.update(f"{stat.st_mtime}:{stat.st_size}".encode())

        # Hash build.ninja modification time and size
        if self.ninja_path and os.path.exists(self.ninja_path):
            stat = os.stat(self.ninja_path)
            hasher.update(f"{stat.st_mtime}:{stat.st_size}".encode())

        # Hash the compiler version (first compiler found in compile_commands.json)
        if self.compile_commands_path and os.path.exists(self.compile_commands_path):
            try:
                with open(self.compile_commands_path, "r") as f:
                    commands = json.load(f)
                if commands:
                    # Extract the first compiler command
                    cmd = commands[0].get("command", "")
                    if cmd:
                        compiler = shlex.split(cmd)[0]
                        if os.path.exists(compiler):
                            # Get the compiler version
                            result = subprocess.run(
                                [compiler, "--version"],
                                capture_output=True,
                                text=True,
                                timeout=5,
                            )
                            hasher.update(result.stdout.encode())
            except Exception:
                # Best effort: a missing or unreadable compiler simply leaves
                # its version out of the hash.
                pass

        return hasher.hexdigest()

    def should_regenerate_cache(self, cache_file: str) -> bool:
        """Check if the dependency cache needs to be regenerated.

        Args:
            cache_file: Path to the cached dependency mapping JSON

        Returns:
            True if the cache should be regenerated, False if it is valid
        """
        if not os.path.exists(cache_file):
            return True

        try:
            # Load cached metadata
            with open(cache_file, "r") as f:
                data = json.load(f)

            cached_hash = data.get("input_hash")
            if not cached_hash:
                return True

            # Calculate the current hash and compare
            current_hash = self.calculate_input_hash()
            return current_hash != cached_hash

        except (json.JSONDecodeError, KeyError):
            # Corrupted cache or old format
            return True
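
    # Cache behavior example (illustrative): the exported JSON stores an
    # "input_hash" computed from the mtime/size of compile_commands.json and
    # build.ninja plus the compiler's --version output. If CMake reconfigures
    # (changing either file) or the compiler is upgraded, the stored hash no
    # longer matches and should_regenerate_cache() returns True.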

    def analyze(self, progress_callback=None):
        """Run the full dependency analysis.

        Args:
            progress_callback: Optional callback(phase, current, total) for
                progress

        Raises:
            ValueError: If compile_commands_path or ninja_path is None
        """
        # Validate required paths
        if self.compile_commands_path is None:
            raise ValueError("compile_commands_path is required for analysis but was None")
        if self.ninja_path is None:
            raise ValueError("ninja_path is required for analysis but was None")

        # Phase 1: Parse compile commands
        if progress_callback:
            progress_callback("parsing_compile_commands", 0, 1)

        cc_parser = CompileCommandsParser(self.compile_commands_path)
        commands = cc_parser.parse(extensions=[".cpp", ".cc", ".cu", ".hip"])

        if progress_callback:
            progress_callback("parsing_compile_commands", 1, 1)

        # Phase 2: Extract dependencies
        extractor = DependencyExtractor(parallel_workers=self.parallel_workers)

        def dep_progress(current, total):
            if progress_callback:
                progress_callback("extracting_dependencies", current, total)

        source_to_deps = extractor.extract_batch(commands, progress_callback=dep_progress)

        # Phase 3: Parse ninja target mappings
        if progress_callback:
            progress_callback("parsing_ninja", 0, 1)

        ninja_parser = NinjaTargetParser(self.ninja_path)
        exe_to_objects = ninja_parser.parse_executable_mappings()
        obj_to_source = ninja_parser.parse_object_to_source()

        if progress_callback:
            progress_callback("parsing_ninja", 1, 1)

        # Phase 4: Build the dependency mapping
        if progress_callback:
            progress_callback("building_mapping", 0, 1)

        mapper = DependencyMapper(workspace_root=self.workspace_root)
        self.file_to_executables = mapper.build_mapping(
            exe_to_objects, obj_to_source, source_to_deps
        )

        # Build the reverse mapping
        self.executable_to_files = defaultdict(set)
        for file_path, exes in self.file_to_executables.items():
            for exe in exes:
                self.executable_to_files[exe].add(file_path)
        self.executable_to_files = dict(self.executable_to_files)

        if progress_callback:
            progress_callback("building_mapping", 1, 1)

    def calculate_statistics(self) -> Dict:
        """Calculate statistics about the dependency mapping.

        Returns:
            Dictionary with statistics
        """
        return {
            "total_files": len(self.file_to_executables),
            "total_executables": len(self.executable_to_files),
            "files_with_multiple_executables": sum(
                1 for exes in self.file_to_executables.values() if len(exes) > 1
            ),
        }
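
    # Output format example (abbreviated, values illustrative), as consumed by
    # selective_test_filter.py:
    #
    #   {
    #     "file_to_executables": {"include/gemm.hpp": ["bin/test_gemm"]},
    #     "executable_to_files": {"bin/test_gemm": ["include/gemm.hpp", "..."]},
    #     "statistics": {"total_files": 1, "total_executables": 1,
    #                    "files_with_multiple_executables": 0},
    #     "repo": {"type": "cmake_prebuild", "workspace_root": "."},
    #     "input_hash": "<sha256 hex digest>"
    #   }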

    def export_to_json(self, output_path: str):
        """Export the dependency mapping to a JSON file.

        The output format is compatible with selective_test_filter.py.

        Args:
            output_path: Path to write JSON output
        """
        # Convert sets to sorted lists for JSON serialization
        data = {
            "file_to_executables": {
                f: sorted(exes) for f, exes in self.file_to_executables.items()
            },
            "executable_to_files": {
                exe: sorted(files) for exe, files in self.executable_to_files.items()
            },
            "statistics": self.calculate_statistics(),
            "repo": {
                "type": "cmake_prebuild",
                "workspace_root": self.workspace_root,
            },
            "input_hash": self.calculate_input_hash(),
        }

        with open(output_path, "w") as f:
            json.dump(data, f, indent=2)


def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description="CMake-based dependency analyzer for pre-build test selection"
    )
    parser.add_argument(
        "compile_commands",
        help="Path to compile_commands.json",
    )
    parser.add_argument(
        "build_ninja",
        help="Path to build.ninja",
    )
    parser.add_argument(
        "--workspace-root",
        default=".",
        help="Workspace root directory (default: current directory)",
    )
    parser.add_argument(
        "--output",
        default="cmake_dependency_mapping.json",
        help="Output JSON file (default: cmake_dependency_mapping.json)",
    )
    parser.add_argument(
        "--parallel",
        type=int,
        default=8,
        help="Number of parallel workers (default: 8)",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress progress output",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force regeneration even if cache is valid",
    )

    args = parser.parse_args()

    def progress(phase, current, total):
        if not args.quiet:
            print(f"[{phase}] {current}/{total}", end="\r")
            if current == total:
                print()

    analyzer = CMakeDependencyAnalyzer(
        compile_commands_path=args.compile_commands,
        ninja_path=args.build_ninja,
        workspace_root=args.workspace_root,
        parallel_workers=args.parallel,
    )

    # Check whether the cache needs regeneration
    if not args.force and not analyzer.should_regenerate_cache(args.output):
        print("Cache is valid, skipping analysis. Use --force to regenerate.")
        print(f"Using cached results from {args.output}")
        return

    if not args.force and os.path.exists(args.output):
        print("Cache invalid or outdated, regenerating dependencies...")

    print(f"Analyzing dependencies from {args.compile_commands}...")
    analyzer.analyze(progress_callback=progress)

    print(f"\nExporting to {args.output}...")
    analyzer.export_to_json(args.output)

    stats = analyzer.calculate_statistics()
    print("\nResults:")
    print(f"  Total files: {stats['total_files']}")
    print(f"  Total executables: {stats['total_executables']}")
    print(f"  Files with multiple executables: {stats['files_with_multiple_executables']}")


if __name__ == "__main__":
    main()
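
# Example invocation (script name and paths are illustrative):
#   python3 cmake_dependency_analyzer.py build/compile_commands.json \
#       build/build.ninja --workspace-root . --parallel 8 \
#       --output cmake_dependency_mapping.json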