[rocm-libraries] ROCm/rocm-libraries#5249 (commit 2a114bb)

[CK] [CK_TILE] Improve build and test time of CI with smart dependency parser (#5249)

## Motivation

The existing dependency parser needs a full build of the tests to determine which
tests are affected by the code changes in a PR. This build alone still takes 2-4 hours,
and it slows down CI further as the number of tests grows. To resolve this we
implemented a smart dependency parser that uses the CMake configure step to parse
dependencies and build only the affected test cases. Two approaches are available:
1) CMake pre-build analysis for each PR, to keep build and test fast.
2) Ninja post-build analysis, to enable a full build for nightly tests.

## Technical Details

```bash
### 1. Configure the project with CMake
cmake -G Ninja -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..

### 2. Analyze dependencies (no build required!)
python3 ../script/dependency-parser/main.py cmake-parse compile_commands.json build.ninja \
  --workspace-root .. --output cmake_dependency_mapping.json --parallel 8

### 3. Find tests affected by changes
python3 ../script/dependency-parser/main.py select cmake_dependency_mapping.json origin/develop \
  HEAD --test-prefix --output tests_to_run.json

### 4. Build only affected tests
ninja $(jq -r '.executables[]' tests_to_run.json | tr '\n' ' ')

### 5. Run affected tests
ctest -R "$(jq -r '.regex' tests_to_run.json)"
```
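
For reference, steps 4 and 5 consume the `executables` and `regex` fields of `tests_to_run.json`; the script also emits `regex_chunks` (at most 50 test names per pattern) so large selections do not exceed CTest regex length limits. A minimal consumer sketch in Python; the `tests_to_run.json` path is an assumption matching the commands above:

```python
# Sketch: drive steps 4 and 5 from tests_to_run.json, chunk by chunk.
import json
import subprocess

with open("tests_to_run.json") as f:  # produced by the `select` step above
    selection = json.load(f)

# Step 4: build only the affected executables.
if selection["executables"]:
    subprocess.run(["ninja"] + selection["executables"], check=True)

# Step 5: run each regex chunk separately to keep -R patterns short.
for pattern in selection.get("regex_chunks") or [selection["regex"]]:
    if pattern:
        subprocess.run(["ctest", "-R", pattern], check=True)
```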

### Jenkins Integration
- Added a `buildMode` parameter to the Jenkinsfile to support both `selective` and
`full` build methods

### Known Limitations

#### 1. Build-Time Generated Headers (HIGH RISK)

**Problem:** Files generated during the build process (e.g., via
`add_custom_command`) cannot be analyzed before building.

**Example:**
```cmake
add_custom_command(
  OUTPUT ${CMAKE_BINARY_DIR}/generated/config.hpp
  COMMAND generate_config.sh
  DEPENDS template.hpp.in
)
```

**Impact:** If a source file includes `generated/config.hpp`, the
dependency won't be detected until after building.

**Mitigation:**
- Analysis of CK shows **no generated headers** are currently in use
- If generated headers are added in the future, they must be built before the analysis runs
- Recommendation: generate headers in the CMake configure phase (e.g., via
`configure_file()`) rather than the build phase when possible
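
A cheap pre-flight check for this limitation could scan the sources listed in `compile_commands.json` for quoted includes that don't exist anywhere in the workspace yet, which usually indicates a build-time generated header. A heuristic sketch, not part of this PR; the workspace and build paths are assumptions:

```python
# Sketch: flag quoted #includes that no on-disk header currently satisfies.
import json
import re
from pathlib import Path

WORKSPACE = Path(".")                 # assumed workspace root
BUILD_DIR = WORKSPACE / "build"       # assumed CMake build directory
QUOTED_INCLUDE = re.compile(r'^\s*#\s*include\s+"([^"]+)"', re.MULTILINE)

# Index every header basename that exists on disk right now.
known = {p.name for ext in ("*.hpp", "*.h", "*.inc") for p in WORKSPACE.rglob(ext)}

with open(BUILD_DIR / "compile_commands.json") as f:
    commands = json.load(f)

for cmd in commands:
    src = Path(cmd["file"])
    if not src.is_file():
        continue
    for header in QUOTED_INCLUDE.findall(src.read_text(errors="replace")):
        if Path(header).name not in known:
            # Likely generated at build time -> pre-build analysis will miss it.
            print(f"WARNING: {src} includes not-yet-existing header {header}")
```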

## Test Plan
**1. Modified Files:**
```
include/ck_tile/ops/common.hpp
include/ck_tile/ops/gemm.hpp
include/ck_tile/ops/gemm/warp/warp_gemm.hpp
```
**2. Compare the tests selected by the `build.ninja` and `cmake-parse` methods** (a comparison sketch follows)
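
A minimal sketch of that comparison, assuming both methods were pointed at the same pair of refs and their outputs saved under the hypothetical names `legacy_tests.json` and `smart_tests.json`:

```python
# Sketch: diff the selections of the legacy (post-build) and smart (pre-build) parsers.
import json

def load_selection(path):
    with open(path) as f:
        # "tests_to_run" is present in both the old and the new output format.
        return set(json.load(f)["tests_to_run"])

legacy = load_selection("legacy_tests.json")  # hypothetical filename
smart = load_selection("smart_tests.json")    # hypothetical filename

print(f"legacy: {len(legacy)}, smart: {len(smart)}")
for name, diff in (("only legacy", legacy - smart), ("only smart", smart - legacy)):
    for exe in sorted(diff):
        print(f"{name}: {exe}")
```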

## Test Results
1. The analysis completed in 5-6 minutes, identifying 8000+ executables that could be built.
2. For commit 5ccc1387ea, the legacy and the new method selected the same 7 tests.

| PR | Legacy tests | Smart tests | Notes |
| -- | -- | -- | -- |
| 5261 | 453 | 455 | Differ by only 2 tests (test_amdgcn_mma and test_amdgcn_sparse_mma) |
| 5168 | 0 | 0 | Changes in dispatcher only; no CK tests invoked |
| 5249 | 0 | 0 | Changes to the dependency parser; no CK tests invoked |
| 5260 | 0 | 0 | Changes in dispatcher only; no CK tests invoked |
| 5174 | 1 | 1 | One FMHA test affected by this PR in both cases |
| 5383 | 0 | 0 | Changes only in benchmark files; did not trigger any tests |
| 5445 | 1 | 1 | Changes only under tests/ck_tile/gemm_streamk; triggered one stream-K test in both cases |
| 5454 | 3 | 3 | Both methods identified the same test_grouped_conv_bwd tests |
| 5427 | 234 | 234 | Core infrastructure header changes; exactly the same tests detected |
| 5388 | 85 | 85 | Modifies warp-level GEMM operations (warp_gemm.hpp, warp_gemm_dispatcher.hpp); correctly identified all stream-K GEMM tests |

## Submission Checklist

- [x] Look over the contributing guidelines at
https://github.com/ROCm/ROCm/blob/develop/CONTRIBUTING.md#pull-requests.
Authored by Yaswanth Raparti on 2026-03-19 05:31:35 +00:00; committed by assistant-librarian[bot].
Parent 345a56c55e, commit 652d3456ca. 13 changed files with 3585 additions and 210 deletions.

View File

@@ -0,0 +1,745 @@
#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
CMake Dependency Analyzer
Pre-build dependency analysis using compile_commands.json and clang -MM.
This approach extracts header dependencies without requiring a full build,
enabling selective test building in CI pipelines.
Key Features:
- Parses compile_commands.json generated by CMake at configure time
- Uses clang/amdclang -MM to extract header dependencies (preprocessing only)
- Parses build.ninja for target -> source mappings
- Outputs dependency_mapping.json compatible with selective_test_filter.py
"""
import hashlib
import json
import os
import re
import shlex
import subprocess
import sys
import tempfile
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple


class CompileCommandsParser:
    """Parses compile_commands.json generated by CMake."""

    def __init__(self, compile_commands_path: str):
        """Initialize parser with path to compile_commands.json.
        Args:
            compile_commands_path: Path to compile_commands.json file
        """
        self.compile_commands_path = compile_commands_path

    def parse(self, extensions: Optional[List[str]] = None) -> List[Dict]:
        """Parse compile_commands.json and return list of compile commands.
        Args:
            extensions: Optional list of file extensions to filter by (e.g., ['.cpp', '.cc'])
        Returns:
            List of compile command dictionaries with 'file', 'directory', and 'command' keys
        Raises:
            FileNotFoundError: If compile_commands.json doesn't exist
            json.JSONDecodeError: If file contains invalid JSON
        """
        if not os.path.exists(self.compile_commands_path):
            raise FileNotFoundError(f"compile_commands.json not found: {self.compile_commands_path}")
        with open(self.compile_commands_path, "r") as f:
            commands = json.load(f)
        # Normalize commands to always have 'command' key (not 'arguments')
        normalized = []
        for cmd in commands:
            # Handle 'arguments' format (convert to 'command' string)
            if "arguments" in cmd and "command" not in cmd:
                cmd["command"] = " ".join(shlex.quote(arg) for arg in cmd["arguments"])
            # Filter by extension if specified
            if extensions:
                file_ext = os.path.splitext(cmd["file"])[1]
                if file_ext not in extensions:
                    continue
            normalized.append(cmd)
        return normalized


class DependencyExtractor:
    """Extracts header dependencies using clang -MM."""

    def __init__(self, parallel_workers: int = 1, timeout: int = 30):
        """Initialize dependency extractor.
        Args:
            parallel_workers: Number of parallel workers for extraction
            timeout: Timeout in seconds for each clang -MM call
        """
        self.parallel_workers = parallel_workers
        self.timeout = timeout
        self._temp_dir = None

    def convert_to_dependency_command(self, compile_command: str, deps_output_file: str) -> List[str]:
        """Convert a compile command to a dependency extraction command.
        Replaces -c with -MM and removes -o output specification.
        Args:
            compile_command: Original compile command string
            deps_output_file: Path to write dependency output
        Returns:
            Modified command as a list of arguments for dependency extraction
        """
        parts = shlex.split(compile_command)
        new_parts = []
        skip_next = False
        for i, part in enumerate(parts):
            if skip_next:
                skip_next = False
                continue
            # Skip -c (compile flag)
            if part == "-c":
                continue
            # Skip -o and its argument (output file)
            if part == "-o":
                skip_next = True
                continue
            # Skip standalone .o files that might appear
            if part.endswith(".o") and not part.startswith("-"):
                continue
            new_parts.append(part)
        # Insert -MM and -MF flags after the compiler
        if new_parts:
            compiler = new_parts[0]
            rest = new_parts[1:]
            new_parts = [compiler, "-MM", "-MF", deps_output_file] + rest
        return new_parts

    def parse_makefile_deps(self, deps_content: str) -> List[str]:
        """Parse makefile-style dependency output from clang -MM.
        Args:
            deps_content: Content of .d file generated by clang -MM
        Returns:
            List of dependency file paths (excluding the target .o file)
        """
        if not deps_content.strip():
            return []
        # Join continuation lines and split on whitespace
        content = deps_content.replace("\\\n", " ").replace("\\\r\n", " ")
        # Find the colon separating target from dependencies
        colon_pos = content.find(":")
        if colon_pos == -1:
            return []
        # Everything after the colon is dependencies
        deps_part = content[colon_pos + 1:]
        # Split on whitespace and filter empty strings
        deps = [d.strip() for d in deps_part.split() if d.strip()]
        return deps

    def _get_deps_file(self, source_file: str) -> str:
        """Get a temporary file path for dependency output.
        Args:
            source_file: Source file being analyzed
        Returns:
            Path to temporary .d file
        """
        if self._temp_dir is None:
            self._temp_dir = tempfile.mkdtemp(prefix="ck_deps_")
        basename = os.path.basename(source_file)
        return os.path.join(self._temp_dir, f"{basename}.d")

    def extract(self, directory: str, compile_command: str, source_file: str) -> List[str]:
        """Extract dependencies for a single source file.
        Args:
            directory: Working directory for compilation
            compile_command: Original compile command
            source_file: Source file to analyze
        Returns:
            List of dependency file paths, or empty list on error
        """
        deps_file = self._get_deps_file(source_file)
        try:
            dep_command = self.convert_to_dependency_command(compile_command, deps_file)
            # Run the dependency extraction command
            # Note: Use errors='replace' to handle non-UTF8 output from AMD clang
            result = subprocess.run(
                dep_command,
                cwd=directory,
                capture_output=True,
                text=True,
                errors='replace',
                timeout=self.timeout,
            )
            if result.returncode != 0:
                return []
            # Parse the generated .d file
            if os.path.exists(deps_file):
                with open(deps_file, "r", errors='replace') as f:
                    deps_content = f.read()
                return self.parse_makefile_deps(deps_content)
            return []
        except subprocess.TimeoutExpired:
            return []
        except Exception:
            return []
        finally:
            # Clean up temp file
            if os.path.exists(deps_file):
                try:
                    os.unlink(deps_file)
                except OSError:
                    pass

    def extract_batch(
        self, commands: List[Dict], progress_callback=None
    ) -> Dict[str, List[str]]:
        """Extract dependencies for multiple source files.
        Args:
            commands: List of compile command dictionaries
            progress_callback: Optional callback(current, total) for progress reporting
        Returns:
            Dictionary mapping source files to their dependencies
        """
        source_to_deps = {}
        total = len(commands)
        if self.parallel_workers <= 1:
            # Serial execution
            for i, cmd in enumerate(commands):
                deps = self.extract(cmd["directory"], cmd["command"], cmd["file"])
                source_to_deps[cmd["file"]] = deps
                if progress_callback:
                    progress_callback(i + 1, total)
        else:
            # Parallel execution
            with ProcessPoolExecutor(max_workers=self.parallel_workers) as executor:
                futures = {
                    executor.submit(
                        self.extract, cmd["directory"], cmd["command"], cmd["file"]
                    ): cmd["file"]
                    for cmd in commands
                }
                completed = 0
                for future in as_completed(futures):
                    source_file = futures[future]
                    try:
                        deps = future.result()
                        source_to_deps[source_file] = deps
                    except Exception:
                        source_to_deps[source_file] = []
                    completed += 1
                    if progress_callback:
                        progress_callback(completed, total)
        return source_to_deps


class NinjaTargetParser:
    """Parses ninja build files to get target mappings."""

    def __init__(self, ninja_file_path: str):
        """Initialize parser with path to build.ninja.
        Args:
            ninja_file_path: Path to build.ninja file
        """
        self.ninja_file_path = ninja_file_path

    def parse_executable_mappings(self) -> Dict[str, List[str]]:
        """Parse executable -> object file mappings from build.ninja.
        Returns:
            Dictionary mapping executable paths to lists of object files
        """
        if not os.path.exists(self.ninja_file_path):
            return {}
        exe_to_objects = {}
        # Pattern to match executable build rules
        # Example: build bin/test_gemm: CXX_EXECUTABLE_LINKER__test_gemm test.o lib.o | deps
        exe_pattern = re.compile(r"^build\s+(bin/[^:]+):\s+\S+\s+([^|]+)")
        with open(self.ninja_file_path, "r") as f:
            for line in f:
                match = exe_pattern.match(line)
                if match:
                    exe = match.group(1)
                    deps_part = match.group(2).strip()
                    # Extract object files (ending in .o, not starting with /)
                    object_files = []
                    for dep in deps_part.split():
                        if dep.endswith(".o") and not dep.startswith("/"):
                            object_files.append(dep)
                    if object_files:
                        exe_to_objects[exe] = object_files
        return exe_to_objects

    def parse_object_to_source(self) -> Dict[str, str]:
        """Parse object -> source file mappings from build.ninja.
        Returns:
            Dictionary mapping object file paths to source file paths
        """
        if not os.path.exists(self.ninja_file_path):
            return {}
        obj_to_source = {}
        # Pattern to match object compilation rules
        # Example: build test/test.cpp.o: CXX_COMPILER__target /src/test.cpp
        obj_pattern = re.compile(r"^build\s+([^:]+\.(?:cpp|cc|cu|hip)\.o):\s+\S+\s+(\S+)")
        with open(self.ninja_file_path, "r") as f:
            for line in f:
                match = obj_pattern.match(line)
                if match:
                    obj_file = match.group(1)
                    source_file = match.group(2)
                    obj_to_source[obj_file] = source_file
        return obj_to_source


class DependencyMapper:
    """Builds file -> executable dependency mappings."""

    def __init__(self, workspace_root: Optional[str] = None):
        """Initialize dependency mapper.
        Args:
            workspace_root: Root directory of the workspace for path normalization
        """
        self.workspace_root = workspace_root
        if workspace_root:
            self.workspace_root = os.path.abspath(workspace_root).rstrip("/") + "/"

    def normalize_path(self, path: str) -> str:
        """Normalize a file path relative to workspace root.
        Args:
            path: File path to normalize
        Returns:
            Normalized relative path
        """
        if self.workspace_root and path.startswith(self.workspace_root):
            return path[len(self.workspace_root):]
        return path

    def is_project_file(self, file_path: str) -> bool:
        """Check if a file is part of the project (not a system file).
        Args:
            file_path: File path to check
        Returns:
            True if file is a project file, False if system file
        """
        # Exclude system files
        system_prefixes = ["/usr/", "/opt/rocm", "/lib/", "/system/", "/local/"]
        if any(file_path.startswith(prefix) for prefix in system_prefixes):
            return False
        # Project directory prefixes
        project_dirs = [
            "include/",
            "library/",
            "test/",
            "example/",
            "src/",
            "profiler/",
            "build/include/",
            "build/_deps/gtest",
            "client_example",
            "codegen",
            "tile_engine",
            "dispatcher",
            "experimental",
            "tutorial",
        ]
        if any(file_path.startswith(prefix) for prefix in project_dirs):
            return True
        # Also check monorepo-style paths
        if any(
            file_path.startswith(f"projects/composablekernel/{prefix}")
            for prefix in project_dirs
        ):
            return True
        # Include files with common source/header extensions
        if file_path.endswith(
            (".cpp", ".hpp", ".h", ".c", ".cc", ".cxx", ".cu", ".hip", ".inc")
        ):
            return True
        return False

    def build_mapping(
        self,
        exe_to_objects: Dict[str, List[str]],
        obj_to_source: Dict[str, str],
        source_to_deps: Dict[str, List[str]],
    ) -> Dict[str, Set[str]]:
        """Build file -> executable mapping from component mappings.
        Args:
            exe_to_objects: Executable -> object files mapping
            obj_to_source: Object file -> source file mapping
            source_to_deps: Source file -> dependency files mapping
        Returns:
            Dictionary mapping file paths to sets of executables
        """
        file_to_exes: Dict[str, Set[str]] = defaultdict(set)
        for exe, object_files in exe_to_objects.items():
            for obj_file in object_files:
                source_file = obj_to_source.get(obj_file)
                if not source_file:
                    continue
                deps = source_to_deps.get(source_file, [])
                for dep_file in deps:
                    # Normalize and filter
                    normalized = self.normalize_path(dep_file)
                    if self.is_project_file(normalized):
                        file_to_exes[normalized].add(exe)
        return dict(file_to_exes)


class CMakeDependencyAnalyzer:
    """Main analyzer class combining all components."""

    def __init__(
        self,
        compile_commands_path: Optional[str],
        ninja_path: Optional[str],
        workspace_root: str,
        parallel_workers: int = 8,
    ):
        """Initialize the analyzer.
        Args:
            compile_commands_path: Path to compile_commands.json
            ninja_path: Path to build.ninja
            workspace_root: Root directory of the workspace
            parallel_workers: Number of parallel workers for dependency extraction
        """
        self.compile_commands_path = compile_commands_path
        self.ninja_path = ninja_path
        self.workspace_root = workspace_root
        self.parallel_workers = parallel_workers
        # Results
        self.file_to_executables: Dict[str, Set[str]] = {}
        self.executable_to_files: Dict[str, Set[str]] = {}

    def calculate_input_hash(self) -> str:
        """Calculate hash of input files to detect when cache should be invalidated.
        Returns:
            SHA256 hash string representing the current state of input files
        """
        hasher = hashlib.sha256()
        # Hash compile_commands.json modification time and size
        if self.compile_commands_path and os.path.exists(self.compile_commands_path):
            stat = os.stat(self.compile_commands_path)
            hasher.update(f"{stat.st_mtime}:{stat.st_size}".encode())
        # Hash build.ninja modification time and size
        if self.ninja_path and os.path.exists(self.ninja_path):
            stat = os.stat(self.ninja_path)
            hasher.update(f"{stat.st_mtime}:{stat.st_size}".encode())
        # Hash compiler version (first compiler found in compile_commands.json)
        if self.compile_commands_path and os.path.exists(self.compile_commands_path):
            try:
                with open(self.compile_commands_path, "r") as f:
                    commands = json.load(f)
                if commands:
                    # Extract first compiler command
                    cmd = commands[0].get("command", "")
                    if cmd:
                        compiler = shlex.split(cmd)[0] if cmd else ""
                        if os.path.exists(compiler):
                            # Get compiler version
                            result = subprocess.run(
                                [compiler, "--version"],
                                capture_output=True,
                                text=True,
                                timeout=5,
                            )
                            hasher.update(result.stdout.encode())
            except (json.JSONDecodeError, subprocess.TimeoutExpired, Exception):
                pass
        return hasher.hexdigest()

    def should_regenerate_cache(self, cache_file: str) -> bool:
        """Check if dependency cache needs to be regenerated.
        Args:
            cache_file: Path to the cached dependency mapping JSON
        Returns:
            True if cache should be regenerated, False if cache is valid
        """
        if not os.path.exists(cache_file):
            return True
        try:
            # Load cached metadata
            with open(cache_file, "r") as f:
                data = json.load(f)
            cached_hash = data.get("input_hash")
            if not cached_hash:
                return True
            # Calculate current hash and compare
            current_hash = self.calculate_input_hash()
            return current_hash != cached_hash
        except (json.JSONDecodeError, KeyError):
            # Corrupted cache or old format
            return True

    def analyze(self, progress_callback=None):
        """Run the full dependency analysis.
        Args:
            progress_callback: Optional callback(phase, current, total) for progress
        Raises:
            ValueError: If compile_commands_path or ninja_path is None
        """
        # Validate required paths
        if self.compile_commands_path is None:
            raise ValueError("compile_commands_path is required for analysis but was None")
        if self.ninja_path is None:
            raise ValueError("ninja_path is required for analysis but was None")
        # Phase 1: Parse compile commands
        if progress_callback:
            progress_callback("parsing_compile_commands", 0, 1)
        cc_parser = CompileCommandsParser(self.compile_commands_path)
        commands = cc_parser.parse(extensions=[".cpp", ".cc", ".cu", ".hip"])
        if progress_callback:
            progress_callback("parsing_compile_commands", 1, 1)
        # Phase 2: Extract dependencies
        extractor = DependencyExtractor(parallel_workers=self.parallel_workers)

        def dep_progress(current, total):
            if progress_callback:
                progress_callback("extracting_dependencies", current, total)

        source_to_deps = extractor.extract_batch(commands, progress_callback=dep_progress)
        # Phase 3: Parse ninja target mappings
        if progress_callback:
            progress_callback("parsing_ninja", 0, 1)
        ninja_parser = NinjaTargetParser(self.ninja_path)
        exe_to_objects = ninja_parser.parse_executable_mappings()
        obj_to_source = ninja_parser.parse_object_to_source()
        if progress_callback:
            progress_callback("parsing_ninja", 1, 1)
        # Phase 4: Build dependency mapping
        if progress_callback:
            progress_callback("building_mapping", 0, 1)
        mapper = DependencyMapper(workspace_root=self.workspace_root)
        self.file_to_executables = mapper.build_mapping(
            exe_to_objects, obj_to_source, source_to_deps
        )
        # Build reverse mapping
        self.executable_to_files = defaultdict(set)
        for file_path, exes in self.file_to_executables.items():
            for exe in exes:
                self.executable_to_files[exe].add(file_path)
        self.executable_to_files = dict(self.executable_to_files)
        if progress_callback:
            progress_callback("building_mapping", 1, 1)

    def calculate_statistics(self) -> Dict:
        """Calculate statistics about the dependency mapping.
        Returns:
            Dictionary with statistics
        """
        return {
            "total_files": len(self.file_to_executables),
            "total_executables": len(self.executable_to_files),
            "files_with_multiple_executables": sum(
                1 for exes in self.file_to_executables.values() if len(exes) > 1
            ),
        }

    def export_to_json(self, output_path: str):
        """Export dependency mapping to JSON file.
        The output format is compatible with selective_test_filter.py.
        Args:
            output_path: Path to write JSON output
        """
        # Convert sets to sorted lists for JSON serialization
        data = {
            "file_to_executables": {
                f: sorted(exes) for f, exes in self.file_to_executables.items()
            },
            "executable_to_files": {
                exe: sorted(files) for exe, files in self.executable_to_files.items()
            },
            "statistics": self.calculate_statistics(),
            "repo": {
                "type": "cmake_prebuild",
                "workspace_root": self.workspace_root,
            },
            "input_hash": self.calculate_input_hash(),
        }
        with open(output_path, "w") as f:
            json.dump(data, f, indent=2)


def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description="CMake-based dependency analyzer for pre-build test selection"
    )
    parser.add_argument(
        "compile_commands",
        help="Path to compile_commands.json",
    )
    parser.add_argument(
        "build_ninja",
        help="Path to build.ninja",
    )
    parser.add_argument(
        "--workspace-root",
        default=".",
        help="Workspace root directory (default: current directory)",
    )
    parser.add_argument(
        "--output",
        default="cmake_dependency_mapping.json",
        help="Output JSON file (default: cmake_dependency_mapping.json)",
    )
    parser.add_argument(
        "--parallel",
        type=int,
        default=8,
        help="Number of parallel workers (default: 8)",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress progress output",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force regeneration even if cache is valid",
    )
    args = parser.parse_args()

    def progress(phase, current, total):
        if not args.quiet:
            print(f"[{phase}] {current}/{total}", end="\r")
            if current == total:
                print()

    analyzer = CMakeDependencyAnalyzer(
        compile_commands_path=args.compile_commands,
        ninja_path=args.build_ninja,
        workspace_root=args.workspace_root,
        parallel_workers=args.parallel,
    )
    # Check if cache needs regeneration
    if not args.force and not analyzer.should_regenerate_cache(args.output):
        print("Cache is valid, skipping analysis. Use --force to regenerate.")
        print(f"Using cached results from {args.output}")
        return
    if not args.force and os.path.exists(args.output):
        print("Cache invalid or outdated, regenerating dependencies...")
    print(f"Analyzing dependencies from {args.compile_commands}...")
    analyzer.analyze(progress_callback=progress)
    print(f"\nExporting to {args.output}...")
    analyzer.export_to_json(args.output)
    stats = analyzer.calculate_statistics()
    print("\nResults:")
    print(f"  Total files: {stats['total_files']}")
    print(f"  Total executables: {stats['total_executables']}")
    print(f"  Files with multiple executables: {stats['files_with_multiple_executables']}")


if __name__ == "__main__":
    main()

View File

@@ -21,7 +21,7 @@ import json
 class EnhancedNinjaDependencyParser:
     def __init__(self, build_file_path, ninja_executable="ninja"):
         self.build_file_path = build_file_path
-        self.build_dir = os.path.dirname(build_file_path)
+        self.build_dir = os.path.dirname(build_file_path) or "."
         self.ninja_executable = ninja_executable
         # Core data structures

View File

@@ -34,10 +34,10 @@ import os
 def get_changed_files(ref1, ref2, project: str = None):
     """Return a set of files changed between two git refs."""
     try:
-        cmd = ["git", "diff", "--name-only", ref1, ref2]
-        if project:
-            # Scope git diff to only this project's subtree for efficiency
-            cmd += ["--", f"projects/{project}/"]
+        # Don't use git path filter - it can miss files when running from subdirectories
+        git_root = subprocess.run(["git", "rev-parse", "--show-toplevel"], capture_output=True, text=True, check=True).stdout.strip()
+        cmd = ["git", "-C", git_root, "diff", "--name-only", f"{ref1}...{ref2}", "--", "projects/composablekernel"]
         result = subprocess.run(
             cmd,
             capture_output=True,
@@ -51,6 +51,7 @@ def get_changed_files(ref1, ref2, project: str = None):
         files = raw_files
         print(f"Identified {len(files)} modified files")
     else:
+        # Strip projects/{project}/ prefix from changed files
         root = f"projects/{project}/"
         root_len = len(root)
         files = set()
@@ -73,23 +74,79 @@ def load_depmap(depmap_json):
         data = json.load(f)
     # Support both old and new formats
     json_project = None
-    if "repo" in data and data["repo"]["type"] == "monorepo":
-        json_project = data["repo"]["project"]
+    if "repo" in data:
+        if data["repo"]["type"] == "monorepo":
+            json_project = data["repo"]["project"]
+        elif "workspace_root" in data["repo"]:
+            # Extract project from workspace_root path
+            workspace_root = data["repo"]["workspace_root"]
+            # Convert relative path to absolute if needed
+            if not os.path.isabs(workspace_root):
+                depmap_dir = os.path.dirname(os.path.abspath(depmap_json))
+                workspace_root = os.path.abspath(os.path.join(depmap_dir, workspace_root))
+            # If workspace_root is like /path/to/projects/composablekernel, extract composablekernel
+            if "/projects/" in workspace_root:
+                json_project = workspace_root.split("/projects/")[1].rstrip("/").split("/")[0]
     if "file_to_executables" in data:
         return data["file_to_executables"], json_project
     return data, json_project
-def select_tests(file_to_executables, changed_files, filter_mode):
+def get_ctest_registered_tests(build_dir=None):
+    """Get list of tests registered with CTest (excludes EXCLUDE_FROM_ALL targets)."""
+    try:
+        cmd = ["ctest", "-N"]
+        if build_dir:
+            cmd.extend(["--test-dir", build_dir])
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            timeout=30
+        )
+        if result.returncode != 0:
+            return None
+        tests = set()
+        for line in result.stdout.splitlines():
+            if line.strip().startswith("Test #"):
+                parts = line.split(":", 1)
+                if len(parts) == 2:
+                    test_name = parts[1].strip()
+                    tests.add(test_name)
+        return tests
+    except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
+        return None
+def select_tests(file_to_executables, changed_files, filter_mode, ctest_only=False, build_dir=None):
     """Return a set of test executables affected by changed files."""
     affected = set()
+    ctest_tests = None
+    if ctest_only:
+        ctest_tests = get_ctest_registered_tests(build_dir)
+        if ctest_tests is None:
+            print("Warning: Could not get CTest test list, including all executables")
+        else:
+            print(f"Filtering to {len(ctest_tests)} CTest-registered tests (excluding EXCLUDE_FROM_ALL targets)")
     for f in changed_files:
         if f in file_to_executables:
             for exe in file_to_executables[f]:
-                if filter_mode == "all":
-                    affected.add(exe)
-                elif filter_mode == "test_prefix" and exe.startswith("test_"):
-                    affected.add(exe)
+                if filter_mode == "test_prefix" and not os.path.basename(exe).startswith("test_"):
+                    continue
+                if ctest_only and ctest_tests is not None:
+                    test_name = exe.replace("bin/", "")
+                    if test_name not in ctest_tests:
+                        continue
+                affected.add(exe)
     return sorted(affected)
@@ -141,16 +198,32 @@ def main():
     ref2 = sys.argv[3]
     filter_mode = "all"
     output_json = "tests_to_run.json"
+    ctest_only = False
+    build_dir = None
     if "--test-prefix" in sys.argv:
         filter_mode = "test_prefix"
     if "--all" in sys.argv:
         filter_mode = "all"
+    if "--ctest-only" in sys.argv:
+        ctest_only = True
+    if "--build-dir" in sys.argv:
+        idx = sys.argv.index("--build-dir")
+        if idx + 1 < len(sys.argv):
+            build_dir = sys.argv[idx + 1]
     if "--output" in sys.argv:
         idx = sys.argv.index("--output")
         if idx + 1 < len(sys.argv):
             output_json = sys.argv[idx + 1]
+    # If build_dir not specified, try to infer from depmap_json path
+    if ctest_only and build_dir is None:
+        depmap_dir = os.path.dirname(os.path.abspath(depmap_json))
+        if os.path.basename(depmap_dir) in ["build", "."]:
+            build_dir = depmap_dir
+        elif os.path.exists(os.path.join(depmap_dir, "build.ninja")):
+            build_dir = depmap_dir
     if not os.path.exists(depmap_json):
         print(f"Dependency map JSON not found: {depmap_json}")
         sys.exit(1)
@@ -161,15 +234,55 @@ def main():
print("No changed files detected.")
tests = []
else:
tests = select_tests(file_to_executables, changed_files, filter_mode)
tests = select_tests(file_to_executables, changed_files, filter_mode, ctest_only, build_dir)
# Generate ctest regex from test names
# Split into chunks to avoid regex length limits in CTest
regex_chunks = []
chunk_size = 50 # Max tests per regex pattern
if tests:
# Extract basenames for regex (e.g., bin/test_gemm -> test_gemm)
test_names = [os.path.basename(t) for t in tests]
# Split into chunks
for i in range(0, len(test_names), chunk_size):
chunk = test_names[i:i + chunk_size]
regex_chunks.append("|".join(chunk))
# Keep single regex for backward compatibility (but may be too long)
regex = "|".join(test_names)
else:
regex = ""
# Output format matches Jenkinsfile usage and documentation
output = {
"tests_to_run": tests, # For backward compatibility and length check
"executables": tests, # Used by Jenkinsfile for ninja build
"regex": regex, # Used by Jenkinsfile for ctest (deprecated for large test sets)
"regex_chunks": regex_chunks, # Multiple regex patterns for large test sets
"changed_files": sorted(changed_files),
"statistics": {
"total_changed_files": len(changed_files),
"total_affected_executables": len(tests),
"num_regex_chunks": len(regex_chunks),
},
}
with open(output_json, "w") as f:
json.dump(
{"tests_to_run": tests, "changed_files": sorted(changed_files)}, f, indent=2
)
json.dump(output, f, indent=2)
# Print summary
print(f"Exported {len(tests)} tests to run to {output_json}")
# Print changed files for visibility
if changed_files:
print(f"\nChanged files ({len(changed_files)}):")
for f in sorted(changed_files):
print(f" - {f}")
else:
print("\nNo files changed.")
if __name__ == "__main__":
main()