From f15996d0f32c49cf66f02f760f1715ca5beeb90e Mon Sep 17 00:00:00 2001 From: Astha Date: Wed, 26 Nov 2025 13:55:40 -0500 Subject: [PATCH] Restructure Tile Engine's benchmarking process This change restructures the Benchmark structs into 3 files. There is an addition of a base class for all GEMM benchmarks, derived classes for Universal GEMM, multi dim GEMM, and GEMM preshuffle. Common functions have been relocated into a common directory. For any derived classes, only the redefinition of the constructor is needed, significantly mitigating the need for duplicated code. --- tile_engine/ops/commons/__init__.py | 2 + tile_engine/ops/commons/benchmark_utils.py | 285 ++++++++ tile_engine/ops/gemm/gemm_benchmark.py | 331 +++++++++ .../gemm_multi_d/gemm_multi_d_benchmark.py | 612 +--------------- .../gemm_preshuffle_benchmark.py | 614 ++-------------- .../ops/gemm/gemm_universal/CMakeLists.txt | 2 +- .../ops/gemm/gemm_universal/gemm_benchmark.py | 678 ------------------ ...hmark.hpp => gemm_universal_benchmark.hpp} | 2 +- .../gemm_universal_benchmark.py | 146 ++++ ...pp => gemm_universal_benchmark_single.cpp} | 61 +- ...m_common.hpp => gemm_universal_common.hpp} | 0 ...ofiler.hpp => gemm_universal_profiler.hpp} | 2 +- 12 files changed, 848 insertions(+), 1887 deletions(-) create mode 100644 tile_engine/ops/commons/__init__.py create mode 100644 tile_engine/ops/commons/benchmark_utils.py create mode 100644 tile_engine/ops/gemm/gemm_benchmark.py delete mode 100644 tile_engine/ops/gemm/gemm_universal/gemm_benchmark.py rename tile_engine/ops/gemm/gemm_universal/{gemm_benchmark.hpp => gemm_universal_benchmark.hpp} (99%) create mode 100755 tile_engine/ops/gemm/gemm_universal/gemm_universal_benchmark.py rename tile_engine/ops/gemm/gemm_universal/{gemm_benchmark_single.cpp => gemm_universal_benchmark_single.cpp} (55%) rename tile_engine/ops/gemm/gemm_universal/{gemm_common.hpp => gemm_universal_common.hpp} (100%) rename tile_engine/ops/gemm/gemm_universal/{gemm_profiler.hpp
=> gemm_universal_profiler.hpp} (99%) diff --git a/tile_engine/ops/commons/__init__.py b/tile_engine/ops/commons/__init__.py new file mode 100644 index 0000000000..1df4857184 --- /dev/null +++ b/tile_engine/ops/commons/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Advanced Micro Devices, Inc., or its affiliates. +# SPDX-License-Identifier: MIT diff --git a/tile_engine/ops/commons/benchmark_utils.py b/tile_engine/ops/commons/benchmark_utils.py new file mode 100644 index 0000000000..0c158fa48c --- /dev/null +++ b/tile_engine/ops/commons/benchmark_utils.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +# Copyright (c) Advanced Micro Devices, Inc., or its affiliates. +# SPDX-License-Identifier: MIT + +import sys +import json +import subprocess +import argparse +import csv +import time +from pathlib import Path +from typing import List, Dict, Tuple, Optional + + +def run_kernel(build_dir: Path, kernel_path: Path, params: Dict[str, str], verbose: bool = False) -> Optional[Dict]: + """Run a single kernel with given parameters and save output to individual JSON file""" + # Create results directory + results_dir = build_dir / "results" + results_dir.mkdir(exist_ok=True) + + # Generate unique JSON filename for this kernel + json_file = results_dir / f"{kernel_path.stem}.json" + + cmd = [str(kernel_path)] + + # Add parameters + for key, value in params.items(): + cmd.append(f"-{key}={value}") + + # Add JSON output flag for clean JSON output + cmd.append("-json_output=true") + + if verbose: + print(f"Running: {' '.join(cmd)}") + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + + if result.returncode != 0: + print(f"Error running {kernel_path.name}: {result.stderr}") + return None + + # Save raw output to individual JSON file + output = result.stdout.strip() + if output: + with open(json_file, "w") as f: + f.write(output) + + # Parse the JSON file + return parse_json_file(json_file, verbose=verbose) + else: + print(f"No output from 
{kernel_path.name}") + return None + + except subprocess.TimeoutExpired: + print(f"Timeout running {kernel_path.name}") + return None + except Exception as e: + print(f"Error running {kernel_path.name}: {e}") + return None + +def parse_json_file(json_file: Path, verbose: bool = False) -> Optional[Dict]: + """Parse JSON data from individual kernel output file""" + try: + with open(json_file, "r") as f: + content = f.read().strip() + + # Parse the JSON directly since executables produce clean JSON + data = json.loads(content) + + # Return the complete JSON data as-is, just add some convenience fields + result = data.copy() + if "perf_result" in data: + perf = data["perf_result"] + # Add convenience fields for backward compatibility + result["time_ms"] = perf.get("latency(ms)", 0) + result["tflops"] = perf.get("tflops(TFlops)", 0) + result["bandwidth_gb_s"] = perf.get("bandwidth(GB/s)", 0) + + return result + + except json.JSONDecodeError as e: + if verbose: + print(f"Failed to parse JSON from {json_file}: {e}") + return None + except Exception as e: + if verbose: + print(f"Error reading JSON file {json_file}: {e}") + return None + +def find_best_kernel( + results: List[Dict], metric: str = "tflops" +) -> Optional[Dict]: + """Find the best performing kernel based on metric""" + if not results: + return None + + if metric == "tflops": + return max(results, key=lambda x: x.get("tflops", 0)) + elif metric == "time_ms": + return min(results, key=lambda x: x.get("time_ms", float("inf"))) + elif metric == "bandwidth_gb_s": + return max(results, key=lambda x: x.get("bandwidth_gb_s", 0)) + else: + raise ValueError(f"Unknown metric: {metric}") + + +def export_csv(results: List[Dict], filename: str, verbose: bool = False): + """Export all results to CSV""" + if not results: + print("No results to export") + return + + # Get all unique keys from results + all_keys = set() + for result in results: + all_keys.update(result.keys()) + + # Sort keys for consistent output + fieldnames 
= sorted(all_keys) + + with open(filename, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(results) + + print(f"Results exported to {filename}") + +def export_best_kernels( best_kernels: Dict, filename: str, verbose: bool = False): + """Export best kernel selections to file""" + with open(filename, "w") as f: + f.write("# Best kernel selections\n") + f.write( + "# Format: problem_size -> kernel_name (TFLOPS, bandwidth, latency)\n\n" + ) + + for key, kernel in sorted(best_kernels.items()): + f.write( + f"{key}: {kernel['name']} ({kernel['tflops']:.2f} TFLOPS, {kernel['bandwidth_gb_s']:.2f} GB/s, {kernel['time_ms']:.2f}ms)\n" + ) + + print(f"Best kernels exported to {filename}") + +def export_json(results: List[Dict], filename: str, best_kernels: Dict = None, verbose: bool = False): + """Export all results and best kernels to JSON with comprehensive metadata""" + from datetime import datetime + + # Calculate comprehensive summary statistics for all metrics + successful_results = [r for r in results if r.get("tflops", 0) > 0] + + tflops_values = [r.get("tflops", 0) for r in successful_results] + bandwidth_values = [r.get("bandwidth_gb_s", 0) for r in successful_results] + latency_values = [ + r.get("time_ms", 0) for r in successful_results if r.get("time_ms", 0) > 0 + ] + + # Performance breakdown by kernel type + pipeline_stats = {} + scheduler_stats = {} + data_type_stats = {} + + for result in successful_results: + # Get config info from the new structure + config = result.get("config", {}) + + # Pipeline statistics + pipeline = config.get("pipeline", "unknown") + if pipeline not in pipeline_stats: + pipeline_stats[pipeline] = { + "count": 0, + "avg_tflops": 0, + "best_tflops": 0, + } + pipeline_stats[pipeline]["count"] += 1 + pipeline_stats[pipeline]["best_tflops"] = max( + pipeline_stats[pipeline]["best_tflops"], result.get("tflops", 0) + ) + + # Scheduler statistics + scheduler = 
config.get("scheduler", "unknown") + if scheduler not in scheduler_stats: + scheduler_stats[scheduler] = { + "count": 0, + "avg_tflops": 0, + "best_tflops": 0, + } + scheduler_stats[scheduler]["count"] += 1 + scheduler_stats[scheduler]["best_tflops"] = max( + scheduler_stats[scheduler]["best_tflops"], result.get("tflops", 0) + ) + + # Data type statistics + data_type = config.get("data_type", "unknown") + if data_type not in data_type_stats: + data_type_stats[data_type] = { + "count": 0, + "avg_tflops": 0, + "best_tflops": 0, + } + data_type_stats[data_type]["count"] += 1 + data_type_stats[data_type]["best_tflops"] = max( + data_type_stats[data_type]["best_tflops"], result.get("tflops", 0) + ) + + # Calculate averages for breakdown stats + for stats_dict, field_name in [ + (pipeline_stats, "pipeline"), + (scheduler_stats, "scheduler"), + (data_type_stats, "data_type"), + ]: + for key in stats_dict: + relevant_results = [ + r + for r in successful_results + if r.get("config", {}).get(field_name, "unknown") == key + ] + if relevant_results: + stats_dict[key]["avg_tflops"] = sum( + r.get("tflops", 0) for r in relevant_results + ) / len(relevant_results) + + output_data = { + "benchmark_metadata": { + "timestamp": datetime.now().isoformat(), + "total_kernels_tested": len(results), + "unique_kernels": len( + set(r.get("name", "unknown") for r in results) + ), + "successful_runs": len(successful_results), + "failed_runs": len(results) - len(successful_results), + }, + "performance_summary": { + "tflops_stats": { + "best": max(tflops_values, default=0), + "average": sum(tflops_values) / len(tflops_values) + if tflops_values + else 0, + "min": min(tflops_values, default=0), + "median": sorted(tflops_values)[len(tflops_values) // 2] + if tflops_values + else 0, + }, + "bandwidth_stats": { + "best_gb_s": max(bandwidth_values, default=0), + "average_gb_s": sum(bandwidth_values) / len(bandwidth_values) + if bandwidth_values + else 0, + "min_gb_s": min(bandwidth_values, 
default=0), + "median_gb_s": sorted(bandwidth_values)[len(bandwidth_values) // 2] + if bandwidth_values + else 0, + }, + "latency_stats": { + "best_ms": min(latency_values, default=0), + "average_ms": sum(latency_values) / len(latency_values) + if latency_values + else 0, + "max_ms": max(latency_values, default=0), + "median_ms": sorted(latency_values)[len(latency_values) // 2] + if latency_values + else 0, + }, + "kernel_type_breakdown": { + "by_pipeline": pipeline_stats, + "by_scheduler": scheduler_stats, + "by_data_type": data_type_stats, + }, + "total_problem_configurations": len(best_kernels) + if best_kernels + else 0, + }, + "kernel_results": results, + "best_kernels_by_problem": best_kernels or {}, + } + + with open(filename, "w") as f: + json.dump(output_data, f, indent=2) + + print(f"JSON results exported to {filename}") + print(f" - Total kernels: {len(results)}") + print(f" - Successful runs: {len(successful_results)}") + print(f" - Best TFLOPS: {max(tflops_values, default=0):.2f}") + print(f" - Best bandwidth: {max(bandwidth_values, default=0):.2f} GB/s") + print(f" - Best latency: {min(latency_values, default=0):.2f}ms") + diff --git a/tile_engine/ops/gemm/gemm_benchmark.py b/tile_engine/ops/gemm/gemm_benchmark.py new file mode 100644 index 0000000000..3a7afc74e8 --- /dev/null +++ b/tile_engine/ops/gemm/gemm_benchmark.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python3 +# Copyright (c) Advanced Micro Devices, Inc., or its affiliates. 
+# SPDX-License-Identifier: MIT + +import os +import sys +import json +import subprocess +import argparse +import csv +import time +import importlib.util +from pathlib import Path +from typing import List, Dict, Tuple, Optional + +# TODO: explore modularizing tile engine to avoid accessing imports like this +def _import_benchmark_utils(): + """Import benchmark utilities from commons directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + parent_dir = os.path.dirname(current_dir) + + # Load the module dynamically + spec = importlib.util.spec_from_file_location( + "benchmark_utils", + os.path.join(parent_dir, "commons", "benchmark_utils.py"), + ) + benchmark_utils = importlib.util.module_from_spec(spec) + spec.loader.exec_module(benchmark_utils) + + return benchmark_utils + +benchmark_utils = _import_benchmark_utils() + +class GemmBenchmark: + def __init__(self, build_dir: str, verbose: bool = False, name: str = "benchmark_gemm_"): + self.build_dir = Path(build_dir) + self.verbose = verbose + self.results = [] + self.name = name + + def discover_kernels(self) -> List[Path]: + """Find all benchmark_gemm_* executables in the build directory""" + bin_dir = self.build_dir / "bin" + if not bin_dir.exists(): + print(f"Error: Binary directory {bin_dir} does not exist") + return [] + + glob_name = f"{self.name}*" + kernels = list(bin_dir.glob(glob_name)) + if self.verbose: + print(f"Found {len(kernels)} kernel executables") + for k in kernels: + print(f" - {k.name}") + return kernels + + def extract_kernel_info(self, kernel_path: Path) -> Dict[str, str]: + """Extract comprehensive kernel information from filename""" + name = kernel_path.stem + if name.startswith(self.name): + args = name[len(self.name):] + else: + args = name + + # Initialize with basic info + info = { + "executable": str(kernel_path), + "name": name, + "data_type": "unknown", + "layout": "unknown", + "pipeline": "unknown", + "scheduler": "unknown", + "epilogue": "unknown", + } + + # Parse 
the kernel name pattern: + # benchmark_gemm_fp16_rcr_mem_default_intrawave_False_False_False_False_False_256x256x32_2x2x1_4x64x16 + parts = args.split("_") + + if len(parts) >= 5: + info["data_type"] = parts[0] + info["layout"] = parts[1] + info["pipeline"] = parts[2] + info["epilogue"] = parts[3] + info["scheduler"] = parts[4] + + # Extract detailed configuration from the end of the name + config_info = self.parse_detailed_config(name) + info.update(config_info) + + # Generate config ID + info["config_id"] = self.generate_config_id(info) + + return info + + def parse_detailed_config(self, kernel_name: str) -> Dict: + """Parse detailed configuration from kernel name""" + config = { + "tile_sizes": {"tile_m": 0, "tile_n": 0, "tile_k": 0}, + "warp_config": {"warp_m": 0, "warp_n": 0, "warp_k": 0}, + "warp_tile": {"warp_tile_m": 0, "warp_tile_n": 0, "warp_tile_k": 0}, + "optimization_flags": { + "pad_m": False, + "pad_n": False, + "pad_k": False, + "persistent": False, + }, + } + + # Split by underscore and look for patterns + parts = kernel_name.split("_") + + # Look for boolean flags (sequence of True/False values) + bool_sequence = [] + for i, part in enumerate(parts): + if part in ["True", "False"]: + bool_sequence.append(part == "True") + # Continue collecting consecutive boolean values + j = i + 1 + while j < len(parts) and parts[j] in ["True", "False"]: + bool_sequence.append(parts[j] == "True") + j += 1 + break + + # Assign boolean flags if we found them + # Order: pad_m, pad_n, pad_k, persistent (4 flags total) + if len(bool_sequence) >= 4: + config["optimization_flags"]["pad_m"] = bool_sequence[0] + config["optimization_flags"]["pad_n"] = bool_sequence[1] + config["optimization_flags"]["pad_k"] = bool_sequence[2] + config["optimization_flags"]["persistent"] = bool_sequence[3] + + # Look for tile size patterns (e.g., 256x256x32_2x2x1_4x64x16) + # The pattern is: tile_sizes_warp_config_warp_tile + dimension_groups = [] + for part in parts: + if "x" in part and 
len(part.split("x")) == 3: + try: + dims = [int(x) for x in part.split("x")] + if all(d > 0 for d in dims): + dimension_groups.append(dims) + except ValueError: + continue + + # Assign dimensions based on order and magnitude + if len(dimension_groups) >= 3: + # Sort by magnitude to identify: largest=tile_sizes, smallest=warp_config, middle=warp_tile + sorted_groups = sorted(dimension_groups, key=lambda x: max(x), reverse=True) + + # Largest dimensions = tile sizes + config["tile_sizes"]["tile_m"] = sorted_groups[0][0] + config["tile_sizes"]["tile_n"] = sorted_groups[0][1] + config["tile_sizes"]["tile_k"] = sorted_groups[0][2] + + # Smallest dimensions = warp config + config["warp_config"]["warp_m"] = sorted_groups[2][0] + config["warp_config"]["warp_n"] = sorted_groups[2][1] + config["warp_config"]["warp_k"] = sorted_groups[2][2] + + # Middle dimensions = warp tile + config["warp_tile"]["warp_tile_m"] = sorted_groups[1][0] + config["warp_tile"]["warp_tile_n"] = sorted_groups[1][1] + config["warp_tile"]["warp_tile_k"] = sorted_groups[1][2] + elif len(dimension_groups) == 2: + # If only 2 groups, assign based on magnitude + sorted_groups = sorted(dimension_groups, key=lambda x: max(x), reverse=True) + + # Larger = tile sizes + config["tile_sizes"]["tile_m"] = sorted_groups[0][0] + config["tile_sizes"]["tile_n"] = sorted_groups[0][1] + config["tile_sizes"]["tile_k"] = sorted_groups[0][2] + + # Smaller = warp config + config["warp_config"]["warp_m"] = sorted_groups[1][0] + config["warp_config"]["warp_n"] = sorted_groups[1][1] + config["warp_config"]["warp_k"] = sorted_groups[1][2] + elif len(dimension_groups) == 1: + # Only one group - assume it's tile sizes + config["tile_sizes"]["tile_m"] = dimension_groups[0][0] + config["tile_sizes"]["tile_n"] = dimension_groups[0][1] + config["tile_sizes"]["tile_k"] = dimension_groups[0][2] + + return config + + def generate_config_id(self, info: Dict) -> str: + """Generate a compact config ID from kernel info""" + # Create a 
compact identifier + parts = [ + info.get("data_type", "unk"), + info.get("layout", "unk"), + info.get("pipeline", "unk"), + info.get("scheduler", "unk"), + ] + + # Add tile configuration if available + tile_sizes = info.get("tile_sizes", {}) + if tile_sizes.get("tile_m", 0) > 0: + tile_str = ( + f"{tile_sizes['tile_m']}x{tile_sizes['tile_n']}x{tile_sizes['tile_k']}" + ) + parts.append(tile_str) + + # Add warp config if available + warp_config = info.get("warp_config", {}) + if warp_config.get("warp_m", 0) > 0: + warp_str = f"w{warp_config['warp_m']}x{warp_config['warp_n']}x{warp_config['warp_k']}" + parts.append(warp_str) + + # Add warp tile if available + warp_tile = info.get("warp_tile", {}) + if warp_tile.get("warp_tile_m", 0) > 0: + warp_tile_str = f"wt{warp_tile['warp_tile_m']}x{warp_tile['warp_tile_n']}x{warp_tile['warp_tile_k']}" + parts.append(warp_tile_str) + + return "_".join(parts) + + def benchmark_problem_size( + self, + kernels: List[Path], + m: int, + n: int, + k: int, + split_k: int = 1, + verify: int = 0, + warmup: int = 50, + repeat: int = 100, + flush_cache: bool = True, + rotating_count: int = 1000, + ) -> List[Dict]: + """Benchmark all kernels for a specific problem size""" + results = [] + + params = { + "m": m, + "n": n, + "k": k, + "split_k": split_k, + "verify": verify, + "warmup": warmup, + "repeat": repeat, + "flush_cache": str(flush_cache).lower(), + "rotating_count": rotating_count, + } + + print(f"\nBenchmarking M={m}, N={n}, K={k}, split_k={split_k}") + + for kernel_path in kernels: + kernel_info = self.extract_kernel_info(kernel_path) + result = benchmark_utils.run_kernel(self.build_dir, kernel_path, params, verbose=self.verbose) + if result: + # Create new structured result format + structured_result = { + "name": kernel_info["name"], # Add name field for compatibility + "config_id": kernel_info["config_id"], + "problem": result.get("problem", {}), + "perf_result": result.get("perf_result", {}), + "config": { + "data_type": 
kernel_info["data_type"], + "layout": kernel_info["layout"], + "pipeline": kernel_info["pipeline"], + "scheduler": kernel_info["scheduler"], + "epilogue": kernel_info["epilogue"], + "tile_sizes": kernel_info.get("tile_sizes", {}), + "warp_config": kernel_info.get("warp_config", {}), + "warp_tile": kernel_info.get("warp_tile", {}), + "optimization_flags": kernel_info.get("optimization_flags", {}), + }, + "executable": kernel_info["executable"], + # Keep backward compatibility fields + "time_ms": result.get("time_ms", 0), + "tflops": result.get("tflops", 0), + "bandwidth_gb_s": result.get("bandwidth_gb_s", 0), + } + + results.append(structured_result) + + if self.verbose: + print( + f" {kernel_info['config_id']}: {structured_result['tflops']:.2f} TFLOPS, {structured_result['bandwidth_gb_s']:.2f} GB/s, {structured_result['time_ms']:.2f}ms" + ) + + return results + + def benchmark_sweep( + self, + problem_sizes: List[Tuple[int, int, int]], + split_k_values: List[int] = [1], + verify: bool = False, + warmup: int = 50, + repeat: int = 100, + flush_cache: bool = True, + rotating_count: int = 1000, + ) -> Dict: + """Run comprehensive benchmark sweep""" + kernels = self.discover_kernels() + if not kernels: + print("No kernels found!") + return {} + + all_results = [] + best_kernels = {} + + for m, n, k in problem_sizes: + for split_k in split_k_values: + results = self.benchmark_problem_size( + kernels, + m, + n, + k, + split_k, + verify=2 if verify else 0, + warmup=warmup, + repeat=repeat, + flush_cache=flush_cache, + rotating_count=rotating_count, + ) + + all_results.extend(results) + + # Find best kernel for this configuration + best = benchmark_utils.find_best_kernel(results) + if best: + key = f"m{m}_n{n}_k{k}_splitk{split_k}" + best_kernels[key] = best + print( + f"Best for {key}: {best['name']} ({best['tflops']:.2f} TFLOPS, {best['bandwidth_gb_s']:.2f} GB/s, {best['time_ms']:.2f}ms)" + ) + + self.results = all_results + return best_kernels + + diff --git 
a/tile_engine/ops/gemm/gemm_multi_d/gemm_multi_d_benchmark.py b/tile_engine/ops/gemm/gemm_multi_d/gemm_multi_d_benchmark.py index faf04a7de0..ae79668707 100644 --- a/tile_engine/ops/gemm/gemm_multi_d/gemm_multi_d_benchmark.py +++ b/tile_engine/ops/gemm/gemm_multi_d/gemm_multi_d_benchmark.py @@ -1,587 +1,53 @@ # Copyright (c) Advanced Micro Devices, Inc., or its affiliates. # SPDX-License-Identifier: MIT +import os import sys import json import subprocess import argparse import csv import time +import importlib.util from pathlib import Path from typing import List, Dict, Tuple, Optional +def _import_gemm_benchmark(): + """Import validation utilities from commons directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + parent_dir = os.path.dirname(current_dir) -class GemmMultiDBenchmark: + # Load the module dynamically + spec = importlib.util.spec_from_file_location( + "gemm_benchmark", + os.path.join(parent_dir, "gemm_benchmark.py"), + ) + gemm_benchmark_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(gemm_benchmark_module) + + return gemm_benchmark_module.GemmBenchmark + +def _import_benchmark_utils(): + """Import benchmark utilities from commons directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + parent_dir = os.path.dirname(os.path.dirname(current_dir)) + + # Load the module dynamically + spec = importlib.util.spec_from_file_location( + "benchmark_utils", + os.path.join(parent_dir, "commons", "benchmark_utils.py"), + ) + benchmark_utils = importlib.util.module_from_spec(spec) + spec.loader.exec_module(benchmark_utils) + + return benchmark_utils + +GemmBenchmark = _import_gemm_benchmark() +benchmark_utils = _import_benchmark_utils() + +class GemmMultiDBenchmark(GemmBenchmark): def __init__(self, build_dir: str, verbose: bool = False): - self.build_dir = Path(build_dir) - self.verbose = verbose - self.results = [] - - def discover_kernels(self) -> List[Path]: - """Find all benchmark_gemm_multi_d_* 
executables in the build directory""" - bin_dir = self.build_dir / "bin" - if not bin_dir.exists(): - print(f"Error: Binary directory {bin_dir} does not exist") - return [] - - kernels = list(bin_dir.glob("benchmark_gemm_multi_d_*")) - if self.verbose: - print(f"Found {len(kernels)} kernel executables") - for k in kernels: - print(f" - {k.name}") - return kernels - - def extract_kernel_info(self, kernel_path: Path) -> Dict[str, str]: - """Extract comprehensive kernel information from filename""" - name = kernel_path.stem - - # Initialize with basic info - info = { - "executable": str(kernel_path), - "name": name, - "data_type": "unknown", - "layout": "unknown", - "pipeline": "unknown", - "scheduler": "unknown", - "epilogue": "unknown", - } - - # Parse the kernel name pattern: - # benchmark_gemm_multi_d_fp16_rcr_mem_default_intrawave_False_False_False_False_False_256x256x32_2x2x1_4x64x16 - parts = name.split("_") - - if len(parts) >= 5: - # Extract data type (3rd part after benchmark_gemm_) - info["data_type"] = parts[4] if len(parts) > 4 else "unknown" - - # Extract layout (4th part) - info["layout"] = parts[5] if len(parts) > 5 else "unknown" - - # Extract pipeline (5th part) - info["pipeline"] = parts[6] if len(parts) > 6 else "unknown" - - # Extract epilogue (6th part) - info["epilogue"] = parts[7] if len(parts) > 7 else "unknown" - - # Extract scheduler (7th part) - info["scheduler"] = parts[8] if len(parts) > 8 else "unknown" - - # Extract detailed configuration from the end of the name - config_info = self.parse_detailed_config(name) - info.update(config_info) - - # Generate config ID - info["config_id"] = self.generate_config_id(info) - - return info - - def parse_detailed_config(self, kernel_name: str) -> Dict: - """Parse detailed configuration from kernel name""" - config = { - "tile_sizes": {"tile_m": 0, "tile_n": 0, "tile_k": 0}, - "warp_config": {"warp_m": 0, "warp_n": 0, "warp_k": 0}, - "warp_tile": {"warp_tile_m": 0, "warp_tile_n": 0, "warp_tile_k": 
0}, - "optimization_flags": { - "pad_m": False, - "pad_n": False, - "pad_k": False, - "persistent": False, - }, - } - - # Split by underscore and look for patterns - parts = kernel_name.split("_") - - # Look for boolean flags (sequence of True/False values) - bool_sequence = [] - for i, part in enumerate(parts): - if part in ["True", "False"]: - bool_sequence.append(part == "True") - # Continue collecting consecutive boolean values - j = i + 1 - while j < len(parts) and parts[j] in ["True", "False"]: - bool_sequence.append(parts[j] == "True") - j += 1 - break - - # Assign boolean flags if we found them - # Order: pad_m, pad_n, pad_k, persistent (4 flags total) - if len(bool_sequence) >= 4: - config["optimization_flags"]["pad_m"] = bool_sequence[0] - config["optimization_flags"]["pad_n"] = bool_sequence[1] - config["optimization_flags"]["pad_k"] = bool_sequence[2] - config["optimization_flags"]["persistent"] = bool_sequence[3] - - # Look for tile size patterns (e.g., 256x256x32_2x2x1_4x64x16) - # The pattern is: tile_sizes_warp_config_warp_tile - dimension_groups = [] - for part in parts: - if "x" in part and len(part.split("x")) == 3: - try: - dims = [int(x) for x in part.split("x")] - if all(d > 0 for d in dims): - dimension_groups.append(dims) - except ValueError: - continue - - # Assign dimensions based on order and magnitude - if len(dimension_groups) >= 3: - # Sort by magnitude to identify: largest=tile_sizes, smallest=warp_config, middle=warp_tile - sorted_groups = sorted(dimension_groups, key=max, reverse=True) - - # Largest dimensions = tile sizes - config["tile_sizes"]["tile_m"] = sorted_groups[0][0] - config["tile_sizes"]["tile_n"] = sorted_groups[0][1] - config["tile_sizes"]["tile_k"] = sorted_groups[0][2] - - # Smallest dimensions = warp config - config["warp_config"]["warp_m"] = sorted_groups[2][0] - config["warp_config"]["warp_n"] = sorted_groups[2][1] - config["warp_config"]["warp_k"] = sorted_groups[2][2] - - # Middle dimensions = warp tile - 
config["warp_tile"]["warp_tile_m"] = sorted_groups[1][0] - config["warp_tile"]["warp_tile_n"] = sorted_groups[1][1] - config["warp_tile"]["warp_tile_k"] = sorted_groups[1][2] - elif len(dimension_groups) == 2: - # If only 2 groups, assign based on magnitude - sorted_groups = sorted(dimension_groups, key=max, reverse=True) - - # Larger = tile sizes - config["tile_sizes"]["tile_m"] = sorted_groups[0][0] - config["tile_sizes"]["tile_n"] = sorted_groups[0][1] - config["tile_sizes"]["tile_k"] = sorted_groups[0][2] - - # Smaller = warp config - config["warp_config"]["warp_m"] = sorted_groups[1][0] - config["warp_config"]["warp_n"] = sorted_groups[1][1] - config["warp_config"]["warp_k"] = sorted_groups[1][2] - elif len(dimension_groups) == 1: - # Only one group - assume it's tile sizes - config["tile_sizes"]["tile_m"] = dimension_groups[0][0] - config["tile_sizes"]["tile_n"] = dimension_groups[0][1] - config["tile_sizes"]["tile_k"] = dimension_groups[0][2] - - return config - - def generate_config_id(self, info: Dict) -> str: - """Generate a compact config ID from kernel info""" - # Create a compact identifier - parts = [ - info.get("data_type", "unk"), - info.get("layout", "unk"), - info.get("pipeline", "unk"), - info.get("scheduler", "unk"), - ] - - # Add tile configuration if available - tile_sizes = info.get("tile_sizes", {}) - if tile_sizes.get("tile_m", 0) > 0: - tile_str = ( - f"{tile_sizes['tile_m']}x{tile_sizes['tile_n']}x{tile_sizes['tile_k']}" - ) - parts.append(tile_str) - - # Add warp config if available - warp_config = info.get("warp_config", {}) - if warp_config.get("warp_m", 0) > 0: - warp_str = f"w{warp_config['warp_m']}x{warp_config['warp_n']}x{warp_config['warp_k']}" - parts.append(warp_str) - - # Add warp tile if available - warp_tile = info.get("warp_tile", {}) - if warp_tile.get("warp_tile_m", 0) > 0: - warp_tile_str = f"wt{warp_tile['warp_tile_m']}x{warp_tile['warp_tile_n']}x{warp_tile['warp_tile_k']}" - parts.append(warp_tile_str) - - return 
"_".join(parts) - - def run_kernel(self, kernel_path: Path, params: Dict[str, str]) -> Optional[Dict]: - """Run a single kernel with given parameters and save output to individual JSON file""" - # Create results directory - results_dir = self.build_dir / "results" - results_dir.mkdir(exist_ok=True) - - # Generate unique JSON filename for this kernel - json_file = results_dir / f"{kernel_path.stem}.json" - - cmd = [str(kernel_path)] - - # Add parameters - for key, value in params.items(): - cmd.append(f"-{key}={value}") - - # Add JSON output flag for clean JSON output - cmd.append("-json_output=true") - - if self.verbose: - print(f"Running: {' '.join(cmd)}") - - try: - result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - - if result.returncode != 0: - print(f"Error running {kernel_path.name}: {result.stderr}") - return None - - # Save raw output to individual JSON file - output = result.stdout.strip() - if output: - with open(json_file, "w") as f: - f.write(output) - - # Parse the JSON file - return self.parse_json_file(json_file) - else: - print(f"No output from {kernel_path.name}") - return None - - except subprocess.TimeoutExpired: - print(f"Timeout running {kernel_path.name}") - return None - except Exception as e: - print(f"Error running {kernel_path.name}: {e}") - return None - - def parse_json_file(self, json_file: Path) -> Optional[Dict]: - """Parse JSON data from individual kernel output file""" - try: - with open(json_file, "r") as f: - content = f.read().strip() - - # Parse the JSON directly since executables produce clean JSON - data = json.loads(content) - - # Return the complete JSON data as-is, just add some convenience fields - result = data.copy() - if "perf_result" in data: - perf = data["perf_result"] - # Add convenience fields for backward compatibility - result["time_ms"] = perf.get("latency(ms)", 0) - result["tflops"] = perf.get("tflops(TFlops)", 0) - result["bandwidth_gb_s"] = perf.get("bandwidth(GB/s)", 0) - - return 
result - - except json.JSONDecodeError as e: - if self.verbose: - print(f"Failed to parse JSON from {json_file}: {e}") - return None - except Exception as e: - if self.verbose: - print(f"Error reading JSON file {json_file}: {e}") - return None - - def benchmark_problem_size( - self, - kernels: List[Path], - m: int, - n: int, - k: int, - split_k: int = 1, - verify: int = 0, - warmup: int = 50, - repeat: int = 100, - flush_cache: bool = True, - rotating_count: int = 1000, - ) -> List[Dict]: - """Benchmark all kernels for a specific problem size""" - results = [] - - params = { - "m": m, - "n": n, - "k": k, - "split_k": split_k, - "verify": verify, - "warmup": warmup, - "repeat": repeat, - "flush_cache": str(flush_cache).lower(), - "rotating_count": rotating_count, - } - - print(f"\nBenchmarking M={m}, N={n}, K={k}, split_k={split_k}") - - for kernel_path in kernels: - kernel_info = self.extract_kernel_info(kernel_path) - result = self.run_kernel(kernel_path, params) - - if result: - # Create new structured result format - structured_result = { - "name": kernel_info["name"], # Add name field for compatibility - "config_id": kernel_info["config_id"], - "problem": result.get("problem", {}), - "perf_result": result.get("perf_result", {}), - "config": { - "data_type": kernel_info["data_type"], - "layout": kernel_info["layout"], - "pipeline": kernel_info["pipeline"], - "scheduler": kernel_info["scheduler"], - "epilogue": kernel_info["epilogue"], - "tile_sizes": kernel_info.get("tile_sizes", {}), - "warp_config": kernel_info.get("warp_config", {}), - "warp_tile": kernel_info.get("warp_tile", {}), - "optimization_flags": kernel_info.get("optimization_flags", {}), - }, - "executable": kernel_info["executable"], - # Keep backward compatibility fields - "time_ms": result.get("time_ms", 0), - "tflops": result.get("tflops", 0), - "bandwidth_gb_s": result.get("bandwidth_gb_s", 0), - } - - results.append(structured_result) - - if self.verbose: - print( - f" 
{kernel_info['config_id']}: {structured_result['tflops']:.2f} TFLOPS, {structured_result['bandwidth_gb_s']:.2f} GB/s, {structured_result['time_ms']:.2f}ms" - ) - - return results - - def find_best_kernel( - self, results: List[Dict], metric: str = "tflops" - ) -> Optional[Dict]: - """Find the best performing kernel based on metric""" - if not results: - return None - - if metric == "tflops": - return max(results, key=lambda x: x.get("tflops", 0)) - elif metric == "time_ms": - return min(results, key=lambda x: x.get("time_ms", float("inf"))) - elif metric == "bandwidth_gb_s": - return max(results, key=lambda x: x.get("bandwidth_gb_s", 0)) - else: - raise ValueError(f"Unknown metric: {metric}") - - def benchmark_sweep( - self, - problem_sizes: List[Tuple[int, int, int]], - split_k_values: List[int] = [1], - verify: bool = False, - warmup: int = 50, - repeat: int = 100, - flush_cache: bool = True, - rotating_count: int = 1000, - ) -> Dict: - """Run comprehensive benchmark sweep""" - kernels = self.discover_kernels() - if not kernels: - print("No kernels found!") - return {} - - all_results = [] - best_kernels = {} - - for m, n, k in problem_sizes: - for split_k in split_k_values: - results = self.benchmark_problem_size( - kernels, - m, - n, - k, - split_k, - verify=2 if verify else 0, - warmup=warmup, - repeat=repeat, - flush_cache=flush_cache, - rotating_count=rotating_count, - ) - - all_results.extend(results) - - # Find best kernel for this configuration - best = self.find_best_kernel(results) - if best: - key = f"m{m}_n{n}_k{k}_splitk{split_k}" - best_kernels[key] = best - print( - f"Best for {key}: {best['name']} ({best['tflops']:.2f} TFLOPS, {best['bandwidth_gb_s']:.2f} GB/s, {best['time_ms']:.2f}ms)" - ) - - self.results = all_results - return best_kernels - - def export_csv(self, filename: str): - """Export all results to CSV""" - if not self.results: - print("No results to export") - return - - # Get all unique keys from results - all_keys = set() - for 
result in self.results: - all_keys.update(result.keys()) - - # Sort keys for consistent output - fieldnames = sorted(all_keys) - - with open(filename, "w", newline="") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(self.results) - - print(f"Results exported to {filename}") - - def export_best_kernels(self, best_kernels: Dict, filename: str): - """Export best kernel selections to file""" - with open(filename, "w") as f: - f.write("# Best kernel selections\n") - f.write( - "# Format: problem_size -> kernel_name (TFLOPS, bandwidth, latency)\n\n" - ) - - for key, kernel in sorted(best_kernels.items()): - f.write( - f"{key}: {kernel['name']} ({kernel['tflops']:.2f} TFLOPS, {kernel['bandwidth_gb_s']:.2f} GB/s, {kernel['time_ms']:.2f}ms)\n" - ) - - print(f"Best kernels exported to {filename}") - - def export_json(self, filename: str, best_kernels: Dict = None): - """Export all results and best kernels to JSON with comprehensive metadata""" - from datetime import datetime - - # Calculate comprehensive summary statistics for all metrics - successful_results = [r for r in self.results if r.get("tflops", 0) > 0] - - tflops_values = [r.get("tflops", 0) for r in successful_results] - bandwidth_values = [r.get("bandwidth_gb_s", 0) for r in successful_results] - latency_values = [ - r.get("time_ms", 0) for r in successful_results if r.get("time_ms", 0) > 0 - ] - - # Performance breakdown by kernel type - pipeline_stats = {} - scheduler_stats = {} - data_type_stats = {} - - for result in successful_results: - # Get config info from the new structure - config = result.get("config", {}) - - # Pipeline statistics - pipeline = config.get("pipeline", "unknown") - if pipeline not in pipeline_stats: - pipeline_stats[pipeline] = { - "count": 0, - "avg_tflops": 0, - "best_tflops": 0, - } - pipeline_stats[pipeline]["count"] += 1 - pipeline_stats[pipeline]["best_tflops"] = max( - pipeline_stats[pipeline]["best_tflops"], 
result.get("tflops", 0) - ) - - # Scheduler statistics - scheduler = config.get("scheduler", "unknown") - if scheduler not in scheduler_stats: - scheduler_stats[scheduler] = { - "count": 0, - "avg_tflops": 0, - "best_tflops": 0, - } - scheduler_stats[scheduler]["count"] += 1 - scheduler_stats[scheduler]["best_tflops"] = max( - scheduler_stats[scheduler]["best_tflops"], result.get("tflops", 0) - ) - - # Data type statistics - data_type = config.get("data_type", "unknown") - if data_type not in data_type_stats: - data_type_stats[data_type] = { - "count": 0, - "avg_tflops": 0, - "best_tflops": 0, - } - data_type_stats[data_type]["count"] += 1 - data_type_stats[data_type]["best_tflops"] = max( - data_type_stats[data_type]["best_tflops"], result.get("tflops", 0) - ) - - # Calculate averages for breakdown stats - for stats_dict, field_name in [ - (pipeline_stats, "pipeline"), - (scheduler_stats, "scheduler"), - (data_type_stats, "data_type"), - ]: - for key in stats_dict: - relevant_results = [ - r - for r in successful_results - if r.get("config", {}).get(field_name, "unknown") == key - ] - if relevant_results: - stats_dict[key]["avg_tflops"] = sum( - r.get("tflops", 0) for r in relevant_results - ) / len(relevant_results) - - output_data = { - "benchmark_metadata": { - "timestamp": datetime.now().isoformat(), - "total_kernels_tested": len(self.results), - "unique_kernels": len( - set(r.get("name", "unknown") for r in self.results) - ), - "successful_runs": len(successful_results), - "failed_runs": len(self.results) - len(successful_results), - }, - "performance_summary": { - "tflops_stats": { - "best": max(tflops_values, default=0), - "average": sum(tflops_values) / len(tflops_values) - if tflops_values - else 0, - "min": min(tflops_values, default=0), - "median": sorted(tflops_values)[len(tflops_values) // 2] - if tflops_values - else 0, - }, - "bandwidth_stats": { - "best_gb_s": max(bandwidth_values, default=0), - "average_gb_s": sum(bandwidth_values) / 
len(bandwidth_values) - if bandwidth_values - else 0, - "min_gb_s": min(bandwidth_values, default=0), - "median_gb_s": sorted(bandwidth_values)[len(bandwidth_values) // 2] - if bandwidth_values - else 0, - }, - "latency_stats": { - "best_ms": min(latency_values, default=0), - "average_ms": sum(latency_values) / len(latency_values) - if latency_values - else 0, - "max_ms": max(latency_values, default=0), - "median_ms": sorted(latency_values)[len(latency_values) // 2] - if latency_values - else 0, - }, - "kernel_type_breakdown": { - "by_pipeline": pipeline_stats, - "by_scheduler": scheduler_stats, - "by_data_type": data_type_stats, - }, - "total_problem_configurations": len(best_kernels) - if best_kernels - else 0, - }, - "kernel_results": self.results, - "best_kernels_by_problem": best_kernels or {}, - } - - with open(filename, "w") as f: - json.dump(output_data, f, indent=2) - - print(f"JSON results exported to {filename}") - print(f" - Total kernels: {len(self.results)}") - print(f" - Successful runs: {len(successful_results)}") - print(f" - Best TFLOPS: {max(tflops_values, default=0):.2f}") - print(f" - Best bandwidth: {max(bandwidth_values, default=0):.2f} GB/s") - print(f" - Best latency: {min(latency_values, default=0):.2f}ms") - + super().__init__(build_dir, verbose, name="benchmark_gemm_multi_d_") def main(): parser = argparse.ArgumentParser( @@ -668,12 +134,12 @@ def main(): print(f"\nBenchmark completed in {elapsed_time:.2f} seconds") # Export results - benchmark.export_csv(args.csv) - benchmark.export_best_kernels(best_kernels, args.best) + benchmark_utils.export_csv(benchmark.results, args.csv) + benchmark_utils.export_best_kernels(best_kernels, args.best) # Export JSON if requested if args.json: - benchmark.export_json(args.json, best_kernels) + benchmark_utils.export_json(benchmark.results, args.json, best_kernels) return 0 diff --git a/tile_engine/ops/gemm/gemm_preshuffle/gemm_preshuffle_benchmark.py 
b/tile_engine/ops/gemm/gemm_preshuffle/gemm_preshuffle_benchmark.py index 53ae6336fa..1ea33834d7 100644 --- a/tile_engine/ops/gemm/gemm_preshuffle/gemm_preshuffle_benchmark.py +++ b/tile_engine/ops/gemm/gemm_preshuffle/gemm_preshuffle_benchmark.py @@ -1,588 +1,54 @@ # Copyright (c) Advanced Micro Devices, Inc., or its affiliates. # SPDX-License-Identifier: MIT +import os import sys import json import subprocess import argparse import csv import time +import importlib.util from pathlib import Path from typing import List, Dict, Tuple, Optional -class GemmPreshuffleBenchmark: +def _import_gemm_benchmark(): + """Import validation utilities from commons directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + parent_dir = os.path.dirname(current_dir) + + # Load the module dynamically + spec = importlib.util.spec_from_file_location( + "gemm_benchmark", + os.path.join(parent_dir, "gemm_benchmark.py"), + ) + gemm_benchmark_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(gemm_benchmark_module) + + return gemm_benchmark_module.GemmBenchmark + +def _import_benchmark_utils(): + """Import benchmark utilities from commons directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + parent_dir = os.path.dirname(os.path.dirname(current_dir)) + + # Load the module dynamically + spec = importlib.util.spec_from_file_location( + "benchmark_utils", + os.path.join(parent_dir, "commons", "benchmark_utils.py"), + ) + benchmark_utils = importlib.util.module_from_spec(spec) + spec.loader.exec_module(benchmark_utils) + + return benchmark_utils + +GemmBenchmark = _import_gemm_benchmark() +benchmark_utils = _import_benchmark_utils() + +class GemmPreshuffleBenchmark(GemmBenchmark): def __init__(self, build_dir: str, verbose: bool = False): - self.build_dir = Path(build_dir) - self.verbose = verbose - self.results = [] - - def discover_kernels(self) -> List[Path]: - """Find all benchmark_gemm_preshuffle* executables in the build 
directory""" - bin_dir = self.build_dir / "bin" - if not bin_dir.exists(): - print(f"Error: Binary directory {bin_dir} does not exist") - return [] - - kernels = list(bin_dir.glob("benchmark_gemm_preshuffle*")) - if self.verbose: - print(f"Found {len(kernels)} kernel executables") - for k in kernels: - print(f" - {k.name}") - return kernels - - def extract_kernel_info(self, kernel_path: Path) -> Dict[str, str]: - """Extract comprehensive kernel information from filename""" - name = kernel_path.stem - - # Initialize with basic info - info = { - "executable": str(kernel_path), - "name": name, - "data_type": "unknown", - "layout": "unknown", - "pipeline": "unknown", - "scheduler": "unknown", - "epilogue": "unknown", - } - - # Parse the kernel name pattern: - # benchmark_gemm_preshuffle_fp16_rcr_mem_default_intrawave_False_False_False_False_False_256x256x32_2x2x1_4x64x16 - parts = name.split("_") - - if len(parts) >= 4: - # Extract data type (4rd part after benchmark_gemm_preshuffle_) - info["data_type"] = parts[3] if len(parts) > 2 else "unknown" - - # Extract layout (5th part) - info["layout"] = parts[4] if len(parts) > 3 else "unknown" - - # Extract pipeline (6th part) - info["pipeline"] = parts[5] if len(parts) > 4 else "unknown" - - # Extract epilogue (7th part) - info["epilogue"] = parts[6] if len(parts) > 5 else "unknown" - - # Extract scheduler (8th part) - info["scheduler"] = parts[7] if len(parts) > 6 else "unknown" - - # Extract detailed configuration from the end of the name - config_info = self.parse_detailed_config(name) - info.update(config_info) - - # Generate config ID - info["config_id"] = self.generate_config_id(info) - - return info - - def parse_detailed_config(self, kernel_name: str) -> Dict: - """Parse detailed configuration from kernel name""" - config = { - "tile_sizes": {"tile_m": 0, "tile_n": 0, "tile_k": 0}, - "warp_config": {"warp_m": 0, "warp_n": 0, "warp_k": 0}, - "warp_tile": {"warp_tile_m": 0, "warp_tile_n": 0, "warp_tile_k": 0}, - 
"optimization_flags": { - "pad_m": False, - "pad_n": False, - "pad_k": False, - "persistent": False, - }, - } - - # Split by underscore and look for patterns - parts = kernel_name.split("_") - - # Look for boolean flags (sequence of True/False values) - bool_sequence = [] - for i, part in enumerate(parts): - if part in ["True", "False"]: - bool_sequence.append(part == "True") - # Continue collecting consecutive boolean values - j = i + 1 - while j < len(parts) and parts[j] in ["True", "False"]: - bool_sequence.append(parts[j] == "True") - j += 1 - break - - # Assign boolean flags if we found them - # Order: pad_m, pad_n, pad_k, persistent (4 flags total) - if len(bool_sequence) >= 4: - config["optimization_flags"]["pad_m"] = bool_sequence[0] - config["optimization_flags"]["pad_n"] = bool_sequence[1] - config["optimization_flags"]["pad_k"] = bool_sequence[2] - config["optimization_flags"]["persistent"] = bool_sequence[3] - - # Look for tile size patterns (e.g., 256x256x32_2x2x1_4x64x16) - # The pattern is: tile_sizes_warp_config_warp_tile - dimension_groups = [] - for part in parts: - if "x" in part and len(part.split("x")) == 3: - try: - dims = [int(x) for x in part.split("x")] - if all(d > 0 for d in dims): - dimension_groups.append(dims) - except ValueError: - continue - - # Assign dimensions based on order and magnitude - if len(dimension_groups) >= 3: - # Sort by magnitude to identify: largest=tile_sizes, smallest=warp_config, middle=warp_tile - sorted_groups = sorted(dimension_groups, key=lambda x: max(x), reverse=True) - - # Largest dimensions = tile sizes - config["tile_sizes"]["tile_m"] = sorted_groups[0][0] - config["tile_sizes"]["tile_n"] = sorted_groups[0][1] - config["tile_sizes"]["tile_k"] = sorted_groups[0][2] - - # Smallest dimensions = warp config - config["warp_config"]["warp_m"] = sorted_groups[2][0] - config["warp_config"]["warp_n"] = sorted_groups[2][1] - config["warp_config"]["warp_k"] = sorted_groups[2][2] - - # Middle dimensions = warp tile - 
config["warp_tile"]["warp_tile_m"] = sorted_groups[1][0] - config["warp_tile"]["warp_tile_n"] = sorted_groups[1][1] - config["warp_tile"]["warp_tile_k"] = sorted_groups[1][2] - elif len(dimension_groups) == 2: - # If only 2 groups, assign based on magnitude - sorted_groups = sorted(dimension_groups, key=lambda x: max(x), reverse=True) - - # Larger = tile sizes - config["tile_sizes"]["tile_m"] = sorted_groups[0][0] - config["tile_sizes"]["tile_n"] = sorted_groups[0][1] - config["tile_sizes"]["tile_k"] = sorted_groups[0][2] - - # Smaller = warp config - config["warp_config"]["warp_m"] = sorted_groups[1][0] - config["warp_config"]["warp_n"] = sorted_groups[1][1] - config["warp_config"]["warp_k"] = sorted_groups[1][2] - elif len(dimension_groups) == 1: - # Only one group - assume it's tile sizes - config["tile_sizes"]["tile_m"] = dimension_groups[0][0] - config["tile_sizes"]["tile_n"] = dimension_groups[0][1] - config["tile_sizes"]["tile_k"] = dimension_groups[0][2] - - return config - - def generate_config_id(self, info: Dict) -> str: - """Generate a compact config ID from kernel info""" - # Create a compact identifier - parts = [ - info.get("data_type", "unk"), - info.get("layout", "unk"), - info.get("pipeline", "unk"), - info.get("scheduler", "unk"), - ] - - # Add tile configuration if available - tile_sizes = info.get("tile_sizes", {}) - if tile_sizes.get("tile_m", 0) > 0: - tile_str = ( - f"{tile_sizes['tile_m']}x{tile_sizes['tile_n']}x{tile_sizes['tile_k']}" - ) - parts.append(tile_str) - - # Add warp config if available - warp_config = info.get("warp_config", {}) - if warp_config.get("warp_m", 0) > 0: - warp_str = f"w{warp_config['warp_m']}x{warp_config['warp_n']}x{warp_config['warp_k']}" - parts.append(warp_str) - - # Add warp tile if available - warp_tile = info.get("warp_tile", {}) - if warp_tile.get("warp_tile_m", 0) > 0: - warp_tile_str = f"wt{warp_tile['warp_tile_m']}x{warp_tile['warp_tile_n']}x{warp_tile['warp_tile_k']}" - parts.append(warp_tile_str) - - 
return "_".join(parts) - - def run_kernel(self, kernel_path: Path, params: Dict[str, str]) -> Optional[Dict]: - """Run a single kernel with given parameters and save output to individual JSON file""" - # Create results directory - results_dir = self.build_dir / "results" - results_dir.mkdir(exist_ok=True) - - # Generate unique JSON filename for this kernel - json_file = results_dir / f"{kernel_path.stem}.json" - - cmd = [str(kernel_path)] - - # Add parameters - for key, value in params.items(): - cmd.append(f"-{key}={value}") - - # Add JSON output flag for clean JSON output - cmd.append("-json_output=true") - - if self.verbose: - print(f"Running: {' '.join(cmd)}") - - try: - result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - - if result.returncode != 0: - print(f"Error running {kernel_path.name}: {result.stderr}") - return None - - # Save raw output to individual JSON file - output = result.stdout.strip() - - if output: - with open(json_file, "w") as f: - f.write(output) - - # Parse the JSON file - return self.parse_json_file(json_file) - else: - print(f"No output from {kernel_path.name}") - return None - - except subprocess.TimeoutExpired: - print(f"Timeout running {kernel_path.name}") - return None - except Exception as e: - print(f"Error running {kernel_path.name}: {e}") - return None - - def parse_json_file(self, json_file: Path) -> Optional[Dict]: - """Parse JSON data from individual kernel output file""" - try: - with open(json_file, "r") as f: - content = f.read().strip() - - # Parse the JSON directly since executables produce clean JSON - data = json.loads(content) - - # Return the complete JSON data as-is, just add some convenience fields - result = data.copy() - if "perf_result" in data: - perf = data["perf_result"] - # Add convenience fields for backward compatibility - result["time_ms"] = perf.get("latency(ms)", 0) - result["tflops"] = perf.get("tflops(TFlops)", 0) - result["bandwidth_gb_s"] = perf.get("bandwidth(GB/s)", 0) - - 
return result - - except json.JSONDecodeError as e: - if self.verbose: - print(f"Failed to parse JSON from {json_file}: {e}") - return None - except Exception as e: - if self.verbose: - print(f"Error reading JSON file {json_file}: {e}") - return None - - def benchmark_problem_size( - self, - kernels: List[Path], - m: int, - n: int, - k: int, - split_k: int = 1, - verify: int = 0, - warmup: int = 50, - repeat: int = 100, - flush_cache: bool = True, - rotating_count: int = 1000, - ) -> List[Dict]: - """Benchmark all kernels for a specific problem size""" - results = [] - - params = { - "m": m, - "n": n, - "k": k, - "split_k": split_k, - "verify": verify, - "warmup": warmup, - "repeat": repeat, - "flush_cache": str(flush_cache).lower(), - "rotating_count": rotating_count, - } - - print(f"\nBenchmarking M={m}, N={n}, K={k}, split_k={split_k}") - - for kernel_path in kernels: - kernel_info = self.extract_kernel_info(kernel_path) - result = self.run_kernel(kernel_path, params) - - if result: - # Create new structured result format - structured_result = { - "name": kernel_info["name"], # Add name field for compatibility - "config_id": kernel_info["config_id"], - "problem": result.get("problem", {}), - "perf_result": result.get("perf_result", {}), - "config": { - "data_type": kernel_info["data_type"], - "layout": kernel_info["layout"], - "pipeline": kernel_info["pipeline"], - "scheduler": kernel_info["scheduler"], - "epilogue": kernel_info["epilogue"], - "tile_sizes": kernel_info.get("tile_sizes", {}), - "warp_config": kernel_info.get("warp_config", {}), - "warp_tile": kernel_info.get("warp_tile", {}), - "optimization_flags": kernel_info.get("optimization_flags", {}), - }, - "executable": kernel_info["executable"], - # Keep backward compatibility fields - "time_ms": result.get("time_ms", 0), - "tflops": result.get("tflops", 0), - "bandwidth_gb_s": result.get("bandwidth_gb_s", 0), - } - - results.append(structured_result) - - if self.verbose: - print( - f" 
{kernel_info['config_id']}: {structured_result['tflops']:.2f} TFLOPS, {structured_result['bandwidth_gb_s']:.2f} GB/s, {structured_result['time_ms']:.2f}ms" - ) - - return results - - def find_best_kernel( - self, results: List[Dict], metric: str = "tflops" - ) -> Optional[Dict]: - """Find the best performing kernel based on metric""" - if not results: - return None - - if metric == "tflops": - return max(results, key=lambda x: x.get("tflops", 0)) - elif metric == "time_ms": - return min(results, key=lambda x: x.get("time_ms", float("inf"))) - elif metric == "bandwidth_gb_s": - return max(results, key=lambda x: x.get("bandwidth_gb_s", 0)) - else: - raise ValueError(f"Unknown metric: {metric}") - - def benchmark_sweep( - self, - problem_sizes: List[Tuple[int, int, int]], - split_k_values: List[int] = [1], - verify: bool = False, - warmup: int = 50, - repeat: int = 100, - flush_cache: bool = True, - rotating_count: int = 1000, - ) -> Dict: - """Run comprehensive benchmark sweep""" - kernels = self.discover_kernels() - if not kernels: - print("No kernels found!") - return {} - - all_results = [] - best_kernels = {} - - for m, n, k in problem_sizes: - for split_k in split_k_values: - results = self.benchmark_problem_size( - kernels, - m, - n, - k, - split_k, - verify=2 if verify else 0, - warmup=warmup, - repeat=repeat, - flush_cache=flush_cache, - rotating_count=rotating_count, - ) - - all_results.extend(results) - - # Find best kernel for this configuration - best = self.find_best_kernel(results) - if best: - key = f"m{m}_n{n}_k{k}_splitk{split_k}" - best_kernels[key] = best - print( - f"Best for {key}: {best['name']} ({best['tflops']:.2f} TFLOPS, {best['bandwidth_gb_s']:.2f} GB/s, {best['time_ms']:.2f}ms)" - ) - - self.results = all_results - return best_kernels - - def export_csv(self, filename: str): - """Export all results to CSV""" - if not self.results: - print("No results to export") - return - - # Get all unique keys from results - all_keys = set() - for 
result in self.results: - all_keys.update(result.keys()) - - # Sort keys for consistent output - fieldnames = sorted(all_keys) - - with open(filename, "w", newline="") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(self.results) - - print(f"Results exported to {filename}") - - def export_best_kernels(self, best_kernels: Dict, filename: str): - """Export best kernel selections to file""" - with open(filename, "w") as f: - f.write("# Best kernel selections\n") - f.write( - "# Format: problem_size -> kernel_name (TFLOPS, bandwidth, latency)\n\n" - ) - - for key, kernel in sorted(best_kernels.items()): - f.write( - f"{key}: {kernel['name']} ({kernel['tflops']:.2f} TFLOPS, {kernel['bandwidth_gb_s']:.2f} GB/s, {kernel['time_ms']:.2f}ms)\n" - ) - - print(f"Best kernels exported to {filename}") - - def export_json(self, filename: str, best_kernels: Dict = None): - """Export all results and best kernels to JSON with comprehensive metadata""" - from datetime import datetime - - # Calculate comprehensive summary statistics for all metrics - successful_results = [r for r in self.results if r.get("tflops", 0) > 0] - - tflops_values = [r.get("tflops", 0) for r in successful_results] - bandwidth_values = [r.get("bandwidth_gb_s", 0) for r in successful_results] - latency_values = [ - r.get("time_ms", 0) for r in successful_results if r.get("time_ms", 0) > 0 - ] - - # Performance breakdown by kernel type - pipeline_stats = {} - scheduler_stats = {} - data_type_stats = {} - - for result in successful_results: - # Get config info from the new structure - config = result.get("config", {}) - - # Pipeline statistics - pipeline = config.get("pipeline", "unknown") - if pipeline not in pipeline_stats: - pipeline_stats[pipeline] = { - "count": 0, - "avg_tflops": 0, - "best_tflops": 0, - } - pipeline_stats[pipeline]["count"] += 1 - pipeline_stats[pipeline]["best_tflops"] = max( - pipeline_stats[pipeline]["best_tflops"], 
result.get("tflops", 0) - ) - - # Scheduler statistics - scheduler = config.get("scheduler", "unknown") - if scheduler not in scheduler_stats: - scheduler_stats[scheduler] = { - "count": 0, - "avg_tflops": 0, - "best_tflops": 0, - } - scheduler_stats[scheduler]["count"] += 1 - scheduler_stats[scheduler]["best_tflops"] = max( - scheduler_stats[scheduler]["best_tflops"], result.get("tflops", 0) - ) - - # Data type statistics - data_type = config.get("data_type", "unknown") - if data_type not in data_type_stats: - data_type_stats[data_type] = { - "count": 0, - "avg_tflops": 0, - "best_tflops": 0, - } - data_type_stats[data_type]["count"] += 1 - data_type_stats[data_type]["best_tflops"] = max( - data_type_stats[data_type]["best_tflops"], result.get("tflops", 0) - ) - - # Calculate averages for breakdown stats - for stats_dict, field_name in [ - (pipeline_stats, "pipeline"), - (scheduler_stats, "scheduler"), - (data_type_stats, "data_type"), - ]: - for key in stats_dict: - relevant_results = [ - r - for r in successful_results - if r.get("config", {}).get(field_name, "unknown") == key - ] - if relevant_results: - stats_dict[key]["avg_tflops"] = sum( - r.get("tflops", 0) for r in relevant_results - ) / len(relevant_results) - - output_data = { - "benchmark_metadata": { - "timestamp": datetime.now().isoformat(), - "total_kernels_tested": len(self.results), - "unique_kernels": len( - set(r.get("name", "unknown") for r in self.results) - ), - "successful_runs": len(successful_results), - "failed_runs": len(self.results) - len(successful_results), - }, - "performance_summary": { - "tflops_stats": { - "best": max(tflops_values, default=0), - "average": sum(tflops_values) / len(tflops_values) - if tflops_values - else 0, - "min": min(tflops_values, default=0), - "median": sorted(tflops_values)[len(tflops_values) // 2] - if tflops_values - else 0, - }, - "bandwidth_stats": { - "best_gb_s": max(bandwidth_values, default=0), - "average_gb_s": sum(bandwidth_values) / 
len(bandwidth_values) - if bandwidth_values - else 0, - "min_gb_s": min(bandwidth_values, default=0), - "median_gb_s": sorted(bandwidth_values)[len(bandwidth_values) // 2] - if bandwidth_values - else 0, - }, - "latency_stats": { - "best_ms": min(latency_values, default=0), - "average_ms": sum(latency_values) / len(latency_values) - if latency_values - else 0, - "max_ms": max(latency_values, default=0), - "median_ms": sorted(latency_values)[len(latency_values) // 2] - if latency_values - else 0, - }, - "kernel_type_breakdown": { - "by_pipeline": pipeline_stats, - "by_scheduler": scheduler_stats, - "by_data_type": data_type_stats, - }, - "total_problem_configurations": len(best_kernels) - if best_kernels - else 0, - }, - "kernel_results": self.results, - "best_kernels_by_problem": best_kernels or {}, - } - - with open(filename, "w") as f: - json.dump(output_data, f, indent=2) - - print(f"JSON results exported to {filename}") - print(f" - Total kernels: {len(self.results)}") - print(f" - Successful runs: {len(successful_results)}") - print(f" - Best TFLOPS: {max(tflops_values, default=0):.2f}") - print(f" - Best bandwidth: {max(bandwidth_values, default=0):.2f} GB/s") - print(f" - Best latency: {min(latency_values, default=0):.2f}ms") - + super().__init__(build_dir, verbose, name="benchmark_gemm_preshuffle_") def main(): parser = argparse.ArgumentParser( @@ -669,12 +135,12 @@ def main(): print(f"\nBenchmark completed in {elapsed_time:.2f} seconds") # Export results - benchmark.export_csv(args.csv) - benchmark.export_best_kernels(best_kernels, args.best) + benchmark_utils.export_csv(benchmark.results, args.csv) + benchmark_utils.export_best_kernels(best_kernels, args.best) # Export JSON if requested if args.json: - benchmark.export_json(args.json, best_kernels) + benchmark_utils.export_json(benchmark.results, args.json, best_kernels) return 0 diff --git a/tile_engine/ops/gemm/gemm_universal/CMakeLists.txt b/tile_engine/ops/gemm/gemm_universal/CMakeLists.txt index 
7505fcd6d0..7f8048b594 100644 --- a/tile_engine/ops/gemm/gemm_universal/CMakeLists.txt +++ b/tile_engine/ops/gemm/gemm_universal/CMakeLists.txt @@ -68,7 +68,7 @@ function(create_individual_gemm_universal_target datatype layout trait tile_conf # Create the executable add_executable(${target_name} EXCLUDE_FROM_ALL - ${GEMM_UNIVERSAL_SOURCE_DIR}/gemm_benchmark_single.cpp + ${GEMM_UNIVERSAL_SOURCE_DIR}/gemm_universal_benchmark_single.cpp ${instance_header} ) diff --git a/tile_engine/ops/gemm/gemm_universal/gemm_benchmark.py b/tile_engine/ops/gemm/gemm_universal/gemm_benchmark.py deleted file mode 100644 index b7424c6d1d..0000000000 --- a/tile_engine/ops/gemm/gemm_universal/gemm_benchmark.py +++ /dev/null @@ -1,678 +0,0 @@ -# Copyright (c) Advanced Micro Devices, Inc., or its affiliates. -# SPDX-License-Identifier: MIT - -import sys -import json -import subprocess -import argparse -import csv -import time -from pathlib import Path -from typing import List, Dict, Tuple, Optional - - -class GemmBenchmark: - def __init__(self, build_dir: str, verbose: bool = False): - self.build_dir = Path(build_dir) - self.verbose = verbose - self.results = [] - - def discover_kernels(self) -> List[Path]: - """Find all benchmark_gemm_* executables in the build directory""" - bin_dir = self.build_dir / "bin" - if not bin_dir.exists(): - print(f"Error: Binary directory {bin_dir} does not exist") - return [] - - kernels = list(bin_dir.glob("benchmark_gemm_*")) - if self.verbose: - print(f"Found {len(kernels)} kernel executables") - for k in kernels: - print(f" - {k.name}") - return kernels - - def extract_kernel_info(self, kernel_path: Path) -> Dict[str, str]: - """Extract comprehensive kernel information from filename""" - name = kernel_path.stem - - # Initialize with basic info - info = { - "executable": str(kernel_path), - "name": name, - "data_type": "unknown", - "layout": "unknown", - "pipeline": "unknown", - "scheduler": "unknown", - "epilogue": "unknown", - } - - # Parse the kernel 
name pattern: - # benchmark_gemm_fp16_rcr_mem_default_intrawave_False_False_False_False_False_256x256x32_2x2x1_4x64x16 - parts = name.split("_") - - if len(parts) >= 3: - # Extract data type (3rd part after benchmark_gemm_) - info["data_type"] = parts[2] if len(parts) > 2 else "unknown" - - # Extract layout (4th part) - info["layout"] = parts[3] if len(parts) > 3 else "unknown" - - # Extract pipeline (5th part) - info["pipeline"] = parts[4] if len(parts) > 4 else "unknown" - - # Extract epilogue (6th part) - info["epilogue"] = parts[5] if len(parts) > 5 else "unknown" - - # Extract scheduler (7th part) - info["scheduler"] = parts[6] if len(parts) > 6 else "unknown" - - # Extract detailed configuration from the end of the name - config_info = self.parse_detailed_config(name) - info.update(config_info) - - # Generate config ID - info["config_id"] = self.generate_config_id(info) - - return info - - def parse_detailed_config(self, kernel_name: str) -> Dict: - """Parse detailed configuration from kernel name""" - config = { - "tile_sizes": {"tile_m": 0, "tile_n": 0, "tile_k": 0}, - "warp_config": {"warp_m": 0, "warp_n": 0, "warp_k": 0}, - "warp_tile": {"warp_tile_m": 0, "warp_tile_n": 0, "warp_tile_k": 0}, - "optimization_flags": { - "pad_m": False, - "pad_n": False, - "pad_k": False, - "persistent": False, - }, - } - - # Split by underscore and look for patterns - parts = kernel_name.split("_") - - # Look for boolean flags (sequence of True/False values) - bool_sequence = [] - for i, part in enumerate(parts): - if part in ["True", "False"]: - bool_sequence.append(part == "True") - # Continue collecting consecutive boolean values - j = i + 1 - while j < len(parts) and parts[j] in ["True", "False"]: - bool_sequence.append(parts[j] == "True") - j += 1 - break - - # Assign boolean flags if we found them - # Order: pad_m, pad_n, pad_k, persistent (4 flags total) - if len(bool_sequence) >= 4: - config["optimization_flags"]["pad_m"] = bool_sequence[0] - 
config["optimization_flags"]["pad_n"] = bool_sequence[1] - config["optimization_flags"]["pad_k"] = bool_sequence[2] - config["optimization_flags"]["persistent"] = bool_sequence[3] - - # Look for tile size patterns (e.g., 256x256x32_2x2x1_4x64x16) - # The pattern is: tile_sizes_warp_config_warp_tile - dimension_groups = [] - for part in parts: - if "x" in part and len(part.split("x")) == 3: - try: - dims = [int(x) for x in part.split("x")] - if all(d > 0 for d in dims): - dimension_groups.append(dims) - except ValueError: - continue - - # Assign dimensions based on order and magnitude - if len(dimension_groups) >= 3: - # Sort by magnitude to identify: largest=tile_sizes, smallest=warp_config, middle=warp_tile - sorted_groups = sorted(dimension_groups, key=lambda x: max(x), reverse=True) - - # Largest dimensions = tile sizes - config["tile_sizes"]["tile_m"] = sorted_groups[0][0] - config["tile_sizes"]["tile_n"] = sorted_groups[0][1] - config["tile_sizes"]["tile_k"] = sorted_groups[0][2] - - # Smallest dimensions = warp config - config["warp_config"]["warp_m"] = sorted_groups[2][0] - config["warp_config"]["warp_n"] = sorted_groups[2][1] - config["warp_config"]["warp_k"] = sorted_groups[2][2] - - # Middle dimensions = warp tile - config["warp_tile"]["warp_tile_m"] = sorted_groups[1][0] - config["warp_tile"]["warp_tile_n"] = sorted_groups[1][1] - config["warp_tile"]["warp_tile_k"] = sorted_groups[1][2] - elif len(dimension_groups) == 2: - # If only 2 groups, assign based on magnitude - sorted_groups = sorted(dimension_groups, key=lambda x: max(x), reverse=True) - - # Larger = tile sizes - config["tile_sizes"]["tile_m"] = sorted_groups[0][0] - config["tile_sizes"]["tile_n"] = sorted_groups[0][1] - config["tile_sizes"]["tile_k"] = sorted_groups[0][2] - - # Smaller = warp config - config["warp_config"]["warp_m"] = sorted_groups[1][0] - config["warp_config"]["warp_n"] = sorted_groups[1][1] - config["warp_config"]["warp_k"] = sorted_groups[1][2] - elif len(dimension_groups) 
def generate_config_id(self, info: Dict) -> str:
    """Build a compact, underscore-separated identifier from kernel info.

    The identifier always starts with the ``data_type``, ``layout``,
    ``pipeline`` and ``scheduler`` fields (``"unk"`` when a field is missing
    or ``None``). A tile-size segment (``MxNxK``), a warp-config segment
    (``wMxNxK``) and a warp-tile segment (``wtMxNxK``) follow, each only when
    the group's ``*_m`` entry is positive.

    Args:
        info: Kernel description with optional ``tile_sizes``,
            ``warp_config`` and ``warp_tile`` sub-dictionaries.

    Returns:
        The joined identifier string.
    """
    parts = []
    for field in ("data_type", "layout", "pipeline", "scheduler"):
        value = info.get(field)
        # str() keeps the join from failing on non-string values.
        parts.append("unk" if value is None else str(value))

    # (group key, dimension keys in output order, segment prefix)
    segments = (
        ("tile_sizes", ("tile_m", "tile_n", "tile_k"), ""),
        ("warp_config", ("warp_m", "warp_n", "warp_k"), "w"),
        ("warp_tile", ("warp_tile_m", "warp_tile_n", "warp_tile_k"), "wt"),
    )
    for group, keys, prefix in segments:
        dims = info.get(group) or {}
        # The first ("m") dimension decides whether the group was parsed.
        if (dims.get(keys[0]) or 0) > 0:
            parts.append(prefix + "x".join(str(dims.get(key, 0)) for key in keys))

    return "_".join(parts)


def run_kernel(
    self, kernel_path: Path, params: Dict[str, str], timeout: float = 60
) -> Optional[Dict]:
    """Run one kernel executable and return its parsed JSON result.

    The executable is invoked with one ``-key=value`` argument per entry of
    ``params`` plus ``-json_output=true``. Its stdout is written to
    ``<build_dir>/results/<kernel stem>.json`` and then parsed with
    ``self.parse_json_file``.

    Args:
        kernel_path: Path of the kernel executable.
        params: Command-line parameters for the kernel.
        timeout: Seconds to wait for the kernel before giving up.

    Returns:
        The parsed result, or ``None`` when the kernel fails, times out,
        prints nothing, or cannot be started.
    """
    json_file = self.build_dir / "results" / f"{kernel_path.stem}.json"

    cmd = [str(kernel_path)]
    cmd.extend(f"-{key}={value}" for key, value in params.items())
    # Ask the executable for clean JSON on stdout.
    cmd.append("-json_output=true")

    if self.verbose:
        print(f"Running: {' '.join(cmd)}")

    try:
        # parents=True: the build directory itself may not exist yet.
        json_file.parent.mkdir(parents=True, exist_ok=True)

        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
        if result.returncode != 0:
            print(f"Error running {kernel_path.name}: {result.stderr}")
            return None

        output = result.stdout.strip()
        if not output:
            print(f"No output from {kernel_path.name}")
            return None

        # Keep the raw output on disk, one file per kernel.
        with open(json_file, "w") as f:
            f.write(output)
        return self.parse_json_file(json_file)

    except subprocess.TimeoutExpired:
        print(f"Timeout running {kernel_path.name}")
        return None
    except Exception as e:
        # Deliberately broad: one broken kernel must not abort a whole sweep.
        print(f"Error running {kernel_path.name}: {e}")
        return None
def parse_json_file(self, json_file: Path) -> Optional[Dict]:
    """Load one kernel's JSON output file and add convenience fields.

    When the payload has a ``perf_result`` object, its ``latency(ms)``,
    ``tflops(TFlops)`` and ``bandwidth(GB/s)`` entries are copied to the
    top-level keys ``time_ms``, ``tflops`` and ``bandwidth_gb_s`` (``0`` when
    absent).

    Args:
        json_file: Path of the JSON file written for a kernel run.

    Returns:
        A shallow copy of the parsed object with the extra fields, or
        ``None`` when the file cannot be read, is not valid JSON, or does not
        hold a JSON object.
    """
    try:
        with open(json_file, "r") as f:
            data = json.loads(f.read().strip())
    except json.JSONDecodeError as e:
        if self.verbose:
            print(f"Failed to parse JSON from {json_file}: {e}")
        return None
    except Exception as e:
        if self.verbose:
            print(f"Error reading JSON file {json_file}: {e}")
        return None

    # Callers index the result by key, so anything but an object is unusable.
    if not isinstance(data, dict):
        if self.verbose:
            print(f"Error reading JSON file {json_file}: top-level value is not an object")
        return None

    result = dict(data)
    if "perf_result" in data:
        perf = data["perf_result"]
        if not isinstance(perf, dict):
            if self.verbose:
                print(f"Error reading JSON file {json_file}: perf_result is not an object")
            return None
        # Flat aliases kept for backward compatibility.
        result["time_ms"] = perf.get("latency(ms)", 0)
        result["tflops"] = perf.get("tflops(TFlops)", 0)
        result["bandwidth_gb_s"] = perf.get("bandwidth(GB/s)", 0)

    return result


def benchmark_problem_size(
    self,
    kernels: List[Path],
    m: int,
    n: int,
    k: int,
    split_k: int = 1,
    verify: int = 0,
    warmup: int = 50,
    repeat: int = 100,
    flush_cache: bool = True,
    rotating_count: int = 1000,
) -> List[Dict]:
    """Run every kernel on one problem size and collect structured results.

    Each kernel is described with ``self.extract_kernel_info`` and executed
    with ``self.run_kernel``; kernels whose run yields no result are left out.

    Args:
        kernels: Kernel executables to run.
        m, n, k: Problem dimensions passed to each kernel.
        split_k: Split-K value passed to each kernel.
        verify: Verification mode passed to each kernel.
        warmup: Warmup iteration count.
        repeat: Timed iteration count.
        flush_cache: Passed to the kernel as ``"true"``/``"false"``.
        rotating_count: Rotating-buffer count passed to each kernel.

    Returns:
        One dictionary per successful kernel holding its name, config id,
        reported ``problem`` and ``perf_result``, a ``config`` summary, the
        executable, and the flat ``time_ms``/``tflops``/``bandwidth_gb_s``
        fields.
    """
    params = {
        "m": m,
        "n": n,
        "k": k,
        "split_k": split_k,
        "verify": verify,
        "warmup": warmup,
        "repeat": repeat,
        "flush_cache": str(flush_cache).lower(),
        "rotating_count": rotating_count,
    }

    print(f"\nBenchmarking M={m}, N={n}, K={k}, split_k={split_k}")

    results = []
    for kernel_path in kernels:
        kernel_info = self.extract_kernel_info(kernel_path)
        result = self.run_kernel(kernel_path, params)
        if not result:
            continue

        structured_result = {
            "name": kernel_info["name"],  # kept for compatibility
            "config_id": kernel_info["config_id"],
            "problem": result.get("problem", {}),
            "perf_result": result.get("perf_result", {}),
            "config": {
                "data_type": kernel_info["data_type"],
                "layout": kernel_info["layout"],
                "pipeline": kernel_info["pipeline"],
                "scheduler": kernel_info["scheduler"],
                "epilogue": kernel_info["epilogue"],
                "tile_sizes": kernel_info.get("tile_sizes", {}),
                "warp_config": kernel_info.get("warp_config", {}),
                "warp_tile": kernel_info.get("warp_tile", {}),
                "optimization_flags": kernel_info.get("optimization_flags", {}),
            },
            "executable": kernel_info["executable"],
            # Flat aliases kept for backward compatibility.
            "time_ms": result.get("time_ms", 0),
            "tflops": result.get("tflops", 0),
            "bandwidth_gb_s": result.get("bandwidth_gb_s", 0),
        }
        results.append(structured_result)

        if self.verbose:
            print(
                f" {kernel_info['config_id']}: {structured_result['tflops']:.2f} TFLOPS, {structured_result['bandwidth_gb_s']:.2f} GB/s, {structured_result['time_ms']:.2f}ms"
            )

    return results
def find_best_kernel(
    self, results: List[Dict], metric: str = "tflops"
) -> Optional[Dict]:
    """Pick the best result for ``metric``.

    ``tflops`` and ``bandwidth_gb_s`` are maximised; ``time_ms`` is minimised.

    Args:
        results: Result dictionaries to choose from.
        metric: One of ``"tflops"``, ``"time_ms"`` or ``"bandwidth_gb_s"``.

    Returns:
        The best result, or ``None`` when ``results`` is empty.

    Raises:
        ValueError: If ``metric`` is unknown and ``results`` is not empty.
    """
    if not results:
        return None

    if metric in ("tflops", "bandwidth_gb_s"):
        return max(results, key=lambda r: r.get(metric, 0))

    if metric == "time_ms":

        def latency(result):
            # A missing latency is stored as 0; it must never win the minimum.
            value = result.get("time_ms", 0)
            return value if value > 0 else float("inf")

        return min(results, key=latency)

    raise ValueError(f"Unknown metric: {metric}")


def benchmark_sweep(
    self,
    problem_sizes: List[Tuple[int, int, int]],
    split_k_values: Optional[List[int]] = None,
    verify: bool = False,
    warmup: int = 50,
    repeat: int = 100,
    flush_cache: bool = True,
    rotating_count: int = 1000,
) -> Dict:
    """Benchmark every discovered kernel over all problem/split-K pairs.

    All per-kernel results are stored on ``self.results``.

    Args:
        problem_sizes: ``(m, n, k)`` tuples to benchmark.
        split_k_values: Split-K values to try; ``[1]`` when omitted.
        verify: When true, kernels are run with ``verify=2``, else ``0``.
        warmup: Warmup iteration count.
        repeat: Timed iteration count.
        flush_cache: Whether kernels flush the cache between runs.
        rotating_count: Rotating-buffer count passed to each kernel.

    Returns:
        A mapping from ``"m{m}_n{n}_k{k}_splitk{split_k}"`` to the best
        result for that configuration; empty when no kernels are found.
    """
    if split_k_values is None:
        split_k_values = [1]

    kernels = self.discover_kernels()
    if not kernels:
        print("No kernels found!")
        # Do not leave results from an earlier sweep behind.
        self.results = []
        return {}

    all_results = []
    best_kernels = {}

    for m, n, k in problem_sizes:
        for split_k in split_k_values:
            results = self.benchmark_problem_size(
                kernels,
                m,
                n,
                k,
                split_k,
                verify=2 if verify else 0,
                warmup=warmup,
                repeat=repeat,
                flush_cache=flush_cache,
                rotating_count=rotating_count,
            )
            all_results.extend(results)

            best = self.find_best_kernel(results)
            if not best:
                continue
            key = f"m{m}_n{n}_k{k}_splitk{split_k}"
            best_kernels[key] = best
            print(
                f"Best for {key}: {best['name']} ({best['tflops']:.2f} TFLOPS, {best['bandwidth_gb_s']:.2f} GB/s, {best['time_ms']:.2f}ms)"
            )

    self.results = all_results
    return best_kernels
def export_csv(self, filename: str):
    """Write ``self.results`` to ``filename`` as CSV.

    The header is the sorted union of the keys of all result rows; a row that
    lacks a column gets an empty cell. Nothing is written when there are no
    results.
    """
    rows = self.results
    if not rows:
        print("No results to export")
        return

    # Union of every key seen, sorted so the column order is stable.
    columns = sorted({column for row in rows for column in row.keys()})

    with open(filename, "w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)

    print(f"Results exported to {filename}")


def export_best_kernels(self, best_kernels: Dict, filename: str):
    """Write the best kernel per problem configuration to a text file.

    After a two-line comment header, one line is written per entry of
    ``best_kernels`` in sorted key order, giving the kernel name, TFLOPS,
    bandwidth and latency.
    """
    with open(filename, "w") as out:
        out.write("# Best kernel selections\n")
        out.write(
            "# Format: problem_size -> kernel_name (TFLOPS, bandwidth, latency)\n\n"
        )
        for key, kernel in sorted(best_kernels.items()):
            line = f"{key}: {kernel['name']} ({kernel['tflops']:.2f} TFLOPS, {kernel['bandwidth_gb_s']:.2f} GB/s, {kernel['time_ms']:.2f}ms)\n"
            out.write(line)

    print(f"Best kernels exported to {filename}")
def export_json(self, filename: str, best_kernels: Optional[Dict] = None):
    """Write all results, summary statistics and best kernels to a JSON file.

    A result counts as successful when its ``tflops`` is positive. The file
    holds run metadata, best/average/min-or-max/median statistics for
    TFLOPS, bandwidth and latency, a per-pipeline / per-scheduler /
    per-data-type breakdown, the raw ``self.results`` and ``best_kernels``.

    Args:
        filename: Destination path of the JSON file.
        best_kernels: Optional mapping of problem key to best result; an
            empty mapping is written when omitted.
    """
    from datetime import datetime
    from statistics import median

    def mean_or_zero(values):
        return sum(values) / len(values) if values else 0

    def median_or_zero(values):
        # True median: averages the two middle values for even counts.
        return median(values) if values else 0

    def breakdown(field_name):
        """Count, average and best TFLOPS per value of a config field."""
        groups = {}
        totals = {}
        for row in successful_results:
            key = row.get("config", {}).get(field_name, "unknown")
            tflops = row.get("tflops", 0)
            entry = groups.setdefault(
                key, {"count": 0, "avg_tflops": 0, "best_tflops": 0}
            )
            entry["count"] += 1
            entry["best_tflops"] = max(entry["best_tflops"], tflops)
            totals[key] = totals.get(key, 0) + tflops
        # Single pass above; the average needs only the running totals.
        for key, entry in groups.items():
            entry["avg_tflops"] = totals[key] / entry["count"]
        return groups

    successful_results = [r for r in self.results if r.get("tflops", 0) > 0]

    tflops_values = [r.get("tflops", 0) for r in successful_results]
    bandwidth_values = [r.get("bandwidth_gb_s", 0) for r in successful_results]
    # Only positive latencies are meaningful for min/max/average.
    latency_values = [
        r.get("time_ms", 0) for r in successful_results if r.get("time_ms", 0) > 0
    ]

    output_data = {
        "benchmark_metadata": {
            "timestamp": datetime.now().isoformat(),
            "total_kernels_tested": len(self.results),
            "unique_kernels": len(
                set(r.get("name", "unknown") for r in self.results)
            ),
            "successful_runs": len(successful_results),
            "failed_runs": len(self.results) - len(successful_results),
        },
        "performance_summary": {
            "tflops_stats": {
                "best": max(tflops_values, default=0),
                "average": mean_or_zero(tflops_values),
                "min": min(tflops_values, default=0),
                "median": median_or_zero(tflops_values),
            },
            "bandwidth_stats": {
                "best_gb_s": max(bandwidth_values, default=0),
                "average_gb_s": mean_or_zero(bandwidth_values),
                "min_gb_s": min(bandwidth_values, default=0),
                "median_gb_s": median_or_zero(bandwidth_values),
            },
            "latency_stats": {
                "best_ms": min(latency_values, default=0),
                "average_ms": mean_or_zero(latency_values),
                "max_ms": max(latency_values, default=0),
                "median_ms": median_or_zero(latency_values),
            },
            "kernel_type_breakdown": {
                "by_pipeline": breakdown("pipeline"),
                "by_scheduler": breakdown("scheduler"),
                "by_data_type": breakdown("data_type"),
            },
            "total_problem_configurations": len(best_kernels)
            if best_kernels
            else 0,
        },
        "kernel_results": self.results,
        "best_kernels_by_problem": best_kernels or {},
    }

    with open(filename, "w") as f:
        json.dump(output_data, f, indent=2)

    print(f"JSON results exported to {filename}")
    print(f" - Total kernels: {len(self.results)}")
    print(f" - Successful runs: {len(successful_results)}")
    print(f" - Best TFLOPS: {max(tflops_values, default=0):.2f}")
    print(f" - Best bandwidth: {max(bandwidth_values, default=0):.2f} GB/s")
    print(f" - Best latency: {min(latency_values, default=0):.2f}ms")
= [] - for size_str in args.problem_sizes: - try: - m, n, k = map(int, size_str.split(",")) - problem_sizes.append((m, n, k)) - except ValueError: - print(f"Invalid problem size: {size_str}") - return 1 - - # Create benchmark instance - benchmark = GemmBenchmark(args.build_dir, verbose=args.verbose) - - # Run benchmark sweep - print("Starting GEMM kernel benchmark sweep...") - start_time = time.time() - - best_kernels = benchmark.benchmark_sweep( - problem_sizes=problem_sizes, - split_k_values=args.split_k, - verify=args.verify, - warmup=args.warmup, - repeat=args.repeat, - flush_cache=args.flush_cache, - rotating_count=args.rotating_count, - ) - - elapsed_time = time.time() - start_time - print(f"\nBenchmark completed in {elapsed_time:.2f} seconds") - - # Export results - benchmark.export_csv(args.csv) - benchmark.export_best_kernels(best_kernels, args.best) - - # Export JSON if requested - if args.json: - benchmark.export_json(args.json, best_kernels) - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/tile_engine/ops/gemm/gemm_universal/gemm_benchmark.hpp b/tile_engine/ops/gemm/gemm_universal/gemm_universal_benchmark.hpp similarity index 99% rename from tile_engine/ops/gemm/gemm_universal/gemm_benchmark.hpp rename to tile_engine/ops/gemm/gemm_universal/gemm_universal_benchmark.hpp index 7c8df32ad8..c7f4f470b0 100644 --- a/tile_engine/ops/gemm/gemm_universal/gemm_benchmark.hpp +++ b/tile_engine/ops/gemm/gemm_universal/gemm_universal_benchmark.hpp @@ -11,7 +11,7 @@ #include "ck_tile/core.hpp" #include "ck_tile/host.hpp" -#include "gemm_common.hpp" +#include "gemm_universal_common.hpp" // Data types and Layouts are defined by the generated kernel headers // No hardcoded type definitions here to avoid conflicts diff --git a/tile_engine/ops/gemm/gemm_universal/gemm_universal_benchmark.py b/tile_engine/ops/gemm/gemm_universal/gemm_universal_benchmark.py new file mode 100755 index 0000000000..88ed4465af --- /dev/null +++ 
#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT

import os
import sys
import json
import subprocess
import argparse
import csv
import time
import importlib.util
from pathlib import Path
from typing import List, Dict, Tuple, Optional


def _load_module_from_file(module_name: str, module_path: str):
    """Load and execute the Python file at ``module_path`` as ``module_name``.

    Raises:
        ImportError: If the file is missing or no loader can be created.
    """
    if not os.path.isfile(module_path):
        raise ImportError(f"Cannot import {module_name}: {module_path} not found")
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot import {module_name} from {module_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def _import_gemm_benchmark():
    """Import the ``GemmBenchmark`` base class from the parent gemm directory."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    parent_dir = os.path.dirname(current_dir)

    gemm_benchmark_module = _load_module_from_file(
        "gemm_benchmark",
        os.path.join(parent_dir, "gemm_benchmark.py"),
    )
    return gemm_benchmark_module.GemmBenchmark


def _import_benchmark_utils():
    """Import benchmark utilities from the commons directory."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    # Two levels up from this file's directory, then into "commons".
    parent_dir = os.path.dirname(os.path.dirname(current_dir))

    return _load_module_from_file(
        "benchmark_utils",
        os.path.join(parent_dir, "commons", "benchmark_utils.py"),
    )


GemmBenchmark = _import_gemm_benchmark()
benchmark_utils = _import_benchmark_utils()


class GemmUniversalBenchmark(GemmBenchmark):
    """``GemmBenchmark`` configured with the ``benchmark_gemm_`` name."""

    def __init__(self, build_dir: str, verbose: bool = False):
        super().__init__(build_dir, verbose, name="benchmark_gemm_")


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the benchmarking tool."""
    parser = argparse.ArgumentParser(description="GEMM Kernel Benchmarking Tool")
    parser.add_argument(
        "build_dir", help="Build directory containing kernel executables"
    )
    parser.add_argument(
        "--problem-sizes",
        nargs="+",
        default=["1024,1024,1024", "2048,2048,2048", "4096,4096,4096"],
        help="Problem sizes as M,N,K tuples",
    )
    parser.add_argument(
        "--split-k", nargs="+", type=int, default=[1], help="Split-K values to test"
    )
    parser.add_argument("--verify", action="store_true", help="Enable verification")
    parser.add_argument(
        "--csv", default="gemm_benchmark_results.csv", help="CSV output filename"
    )
    parser.add_argument(
        "--best", default="best_kernels.txt", help="Best kernels output filename"
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    parser.add_argument(
        "--warmup",
        type=int,
        default=50,
        help="Number of warmup iterations (default: 50)",
    )
    parser.add_argument(
        "--repeat",
        type=int,
        default=100,
        help="Number of benchmark iterations (default: 100)",
    )
    # Flushing is on by default, so a store_true flag alone could never turn
    # it off; --no-flush-cache is the matching off switch.
    parser.add_argument(
        "--flush-cache",
        dest="flush_cache",
        action="store_true",
        default=True,
        help="Enable cache flushing (default: True)",
    )
    parser.add_argument(
        "--no-flush-cache",
        dest="flush_cache",
        action="store_false",
        help="Disable cache flushing",
    )
    parser.add_argument(
        "--rotating-count",
        type=int,
        default=1000,
        help="Number of iterations to rotate cache (default: 1000)",
    )
    parser.add_argument("--json", help="JSON output filename (optional)")
    return parser


def _parse_problem_sizes(
    size_strings: List[str],
) -> Optional[List[Tuple[int, int, int]]]:
    """Convert ``"M,N,K"`` strings to integer tuples.

    Returns ``None`` after reporting the first entry that is not exactly
    three positive integers.
    """
    problem_sizes = []
    for size_str in size_strings:
        try:
            # Unpacking also raises ValueError when the count is not 3.
            m, n, k = map(int, size_str.split(","))
        except ValueError:
            print(f"Invalid problem size: {size_str}")
            return None
        if min(m, n, k) <= 0:
            print(f"Invalid problem size: {size_str}")
            return None
        problem_sizes.append((m, n, k))
    return problem_sizes


def main(argv: Optional[List[str]] = None):
    """Run the benchmark sweep and export the results.

    Args:
        argv: Command-line arguments; ``sys.argv[1:]`` is used when omitted.

    Returns:
        ``0`` on success, ``1`` when a problem size is invalid.
    """
    args = _build_arg_parser().parse_args(argv)

    problem_sizes = _parse_problem_sizes(args.problem_sizes)
    if problem_sizes is None:
        return 1

    benchmark = GemmUniversalBenchmark(args.build_dir, verbose=args.verbose)

    print("Starting GEMM kernel benchmark sweep...")
    start_time = time.time()

    best_kernels = benchmark.benchmark_sweep(
        problem_sizes=problem_sizes,
        split_k_values=args.split_k,
        verify=args.verify,
        warmup=args.warmup,
        repeat=args.repeat,
        flush_cache=args.flush_cache,
        rotating_count=args.rotating_count,
    )

    elapsed_time = time.time() - start_time
    print(f"\nBenchmark completed in {elapsed_time:.2f} seconds")

    # CSV and best-kernel files are always written; JSON only on request.
    benchmark_utils.export_csv(benchmark.results, args.csv)
    benchmark_utils.export_best_kernels(best_kernels, args.best)

    if args.json:
        benchmark_utils.export_json(benchmark.results, args.json, best_kernels)

    return 0


if __name__ == "__main__":
    sys.exit(main())
Default is 2, GPU validation.") - .insert("log", - "false", - "Whether output kernel instance information or not. Possible values are true or " - "false. Default is false") - .insert( - "warmup", "50", "The number of iterations before benchmark the kernel. Default is 50.") - .insert( - "repeat", "100", "The number of iterations to benchmark the kernel. Default is 100.") - .insert("timer", - "true", - "Whether if the timer is gpu timer or not. Possible values are false or true. " - "Default is true.") - .insert("init", - "0", - "The method of tensor initialization. Set to 0 for random, to 1 for linear, or 2 " - "for constant(1). Default is 0, random.") - .insert("flush_cache", - "true", - "To flush cache, possible values are true or false. " - "Default is false.") - .insert("rotating_count", "1000", "number of iterations to rotate the cache. default is 5.") - .insert("metric", - "0", - "Metric with which to measure kernel performance. Set to 0 for latency, 1 for " - "tflops, or 2 for bandwidth. Default is 0, latency.") - .insert("csv_filename", - "", - "The filename of benchmark result. Default is empty (no CSV output).") - .insert("structured_sparsity", - "false", - "Whether use sparsity kernel or not. Possible values are true or false. Default is " - "false") - .insert("json_output", - "false", - "Whether to output results in JSON format only. Possible values are true or false. 
" - "Default is " - "false"); - - bool result = arg_parser.parse(argc, argv); - return std::make_tuple(result, arg_parser); -} - void benchmark_single(const ck_tile::ArgParser& arg_parser) { // Use DataTypeTraits to get the actual type names from the generated header diff --git a/tile_engine/ops/gemm/gemm_universal/gemm_common.hpp b/tile_engine/ops/gemm/gemm_universal/gemm_universal_common.hpp similarity index 100% rename from tile_engine/ops/gemm/gemm_universal/gemm_common.hpp rename to tile_engine/ops/gemm/gemm_universal/gemm_universal_common.hpp diff --git a/tile_engine/ops/gemm/gemm_universal/gemm_profiler.hpp b/tile_engine/ops/gemm/gemm_universal/gemm_universal_profiler.hpp similarity index 99% rename from tile_engine/ops/gemm/gemm_universal/gemm_profiler.hpp rename to tile_engine/ops/gemm/gemm_universal/gemm_universal_profiler.hpp index 3c6bbc34d3..9b728c52d6 100644 --- a/tile_engine/ops/gemm/gemm_universal/gemm_profiler.hpp +++ b/tile_engine/ops/gemm/gemm_universal/gemm_universal_profiler.hpp @@ -9,7 +9,7 @@ #include "ck_tile/host/device_prop.hpp" #include "ck_tile/ops/gemm.hpp" -#include "gemm_benchmark.hpp" +#include "gemm_universal_benchmark.hpp" class GemmProfiler {