#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
import sys
import json
import subprocess
import argparse
import csv
import time
from pathlib import Path
from typing import List, Dict, Tuple, Optional


class PoolBenchmark:
    def __init__(self, build_dir: str, verbose: bool = False):
        self.build_dir = Path(build_dir)
        self.verbose = verbose
        self.results = []

    def discover_kernels(self) -> List[Path]:
        """Find all benchmark_pool_* executables in the build directory"""
        bin_dir = self.build_dir / "bin"
        if not bin_dir.exists():
            print(f"Error: Binary directory {bin_dir} does not exist")
            return []
        kernels = list(bin_dir.glob("benchmark_pool*"))
        if self.verbose:
            print(f"Found {len(kernels)} kernel executables")
            for k in kernels:
                print(f" - {k.name}")
        return kernels

    def extract_kernel_info(self, kernel_path: Path) -> Dict:
        """Extract comprehensive kernel information from filename"""
        name = kernel_path.stem
        # Initialize with basic info
        info = {
            "executable": str(kernel_path),
            "name": name,
            "data_type": "unknown",
            "reduce_op": "unknown",
            "pool_dim": 0,
            "output_index": False,
            "propagate_nan": False,
        }
        # Parse the kernel name pattern:
        # benchmark_pool3d_fp16_max_True_False_128x1_1x1_2x1
        parts = name.split("_")
        if len(parts) >= 3:
            # Extract pool dimension (e.g., pool3d -> 3)
            if "pool2d" in parts[1]:
                info["pool_dim"] = 2
            elif "pool3d" in parts[1]:
                info["pool_dim"] = 3
            # Extract data type
            info["data_type"] = parts[2] if len(parts) > 2 else "unknown"
            # Extract reduce op
            info["reduce_op"] = parts[3] if len(parts) > 3 else "unknown"
            # Extract flags
            if len(parts) > 4:
                info["output_index"] = parts[4] == "True"
            if len(parts) > 5:
                info["propagate_nan"] = parts[5] == "True"
        # Extract block configuration
        config_info = self.parse_block_config(name)
        info.update(config_info)
        # Generate config ID
        info["config_id"] = self.generate_config_id(info)
        return info
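
    # Worked example for the pattern documented above:
    #   extract_kernel_info(Path("bin/benchmark_pool3d_fp16_max_True_False_128x1_1x1_2x1"))
    #   -> pool_dim=3, data_type="fp16", reduce_op="max",
    #      output_index=True, propagate_nan=False, plus the block
    #      configuration parsed by parse_block_config below.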

    def parse_block_config(self, kernel_name: str) -> Dict:
        """Parse block configuration from kernel name"""
        config = {
            "block_sizes": {"block_m": 0, "block_n": 0},
            "warp_config": {"warp_m": 0, "warp_n": 0},
            "thread_tile": {"thread_tile_m": 0, "thread_tile_n": 0},
        }
        parts = kernel_name.split("_")
        # Look for dimension patterns (e.g., 128x1)
        dimension_groups = []
        for part in parts:
            if "x" in part and len(part.split("x")) == 2:
                try:
                    dims = [int(x) for x in part.split("x")]
                    if all(d >= 0 for d in dims):
                        dimension_groups.append(dims)
                except ValueError:
                    continue
        # Assign dimensions based on order
        if len(dimension_groups) >= 3:
            config["block_sizes"]["block_m"] = dimension_groups[0][0]
            config["block_sizes"]["block_n"] = dimension_groups[0][1]
            config["warp_config"]["warp_m"] = dimension_groups[1][0]
            config["warp_config"]["warp_n"] = dimension_groups[1][1]
            config["thread_tile"]["thread_tile_m"] = dimension_groups[2][0]
            config["thread_tile"]["thread_tile_n"] = dimension_groups[2][1]
        elif len(dimension_groups) == 2:
            config["block_sizes"]["block_m"] = dimension_groups[0][0]
            config["block_sizes"]["block_n"] = dimension_groups[0][1]
            config["warp_config"]["warp_m"] = dimension_groups[1][0]
            config["warp_config"]["warp_n"] = dimension_groups[1][1]
        elif len(dimension_groups) == 1:
            config["block_sizes"]["block_m"] = dimension_groups[0][0]
            config["block_sizes"]["block_n"] = dimension_groups[0][1]
        return config
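
    # For example, a suffix like "...128x1_1x1_2x1" yields, in order of
    # appearance: block_m/block_n = 128/1, warp_m/warp_n = 1/1, and
    # thread_tile_m/thread_tile_n = 2/1. With fewer groups, only the
    # leading entries are filled and the rest stay at 0.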

    def generate_config_id(self, info: Dict) -> str:
        """Generate a compact config ID from kernel info"""
        parts = [
            f"pool{info.get('pool_dim', 0)}d",
            info.get("data_type", "unk"),
            info.get("reduce_op", "unk"),
        ]
        block_sizes = info.get("block_sizes", {})
        if block_sizes.get("block_m", 0) > 0:
            block_str = f"{block_sizes['block_m']}x{block_sizes['block_n']}"
            parts.append(block_str)
        return "_".join(parts)

    def run_kernel(
        self, kernel_path: Path, params: Dict[str, object]
    ) -> Optional[Dict]:
        """Run a single kernel with given parameters"""
        results_dir = self.build_dir / "results"
        results_dir.mkdir(exist_ok=True)
        json_file = results_dir / f"{kernel_path.stem}.json"
        cmd = [str(kernel_path)]
        for key, value in params.items():
            cmd.append(f"-{key}={value}")
        cmd.append("-json_output=true")
        if self.verbose:
            print(f"Running: {' '.join(cmd)}")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
            if result.returncode != 0:
                print(f"Error running {kernel_path.name}: {result.stderr}")
                return None
            output = result.stdout.strip()
            if output:
                with open(json_file, "w") as f:
                    f.write(output)
                return self.parse_json_file(json_file)
            else:
                print(f"No output from {kernel_path.name}")
                return None
        except subprocess.TimeoutExpired:
            print(f"Timeout running {kernel_path.name}")
            return None
        except Exception as e:
            print(f"Error running {kernel_path.name}: {e}")
            return None
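
    # Sketch of the command line this builds (hypothetical kernel, with the
    # default parameters assembled in benchmark_problem_size below):
    #   ./bin/benchmark_pool3d_fp16_max_True_False_128x1_1x1_2x1 \
    #       -N=2 -D=30 -H=30 -W=30 -C=32 -Z=2 -Y=2 -X=2 -Sz=2 -Sy=2 -Sx=2 \
    #       -pool_dim=3 -verify=0 -warmup=20 -repeat=100 -flush_cache=true \
    #       -rotating_count=1000 -json_output=true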

    def parse_json_file(self, json_file: Path) -> Optional[Dict]:
        """Parse JSON data from individual kernel output file"""
        try:
            with open(json_file, "r") as f:
                content = f.read().strip()
            data = json.loads(content)
            result = data.copy()
            if "perf_result" in data:
                perf = data["perf_result"]
                result["time_ms"] = perf.get("latency(ms)", 0)
                result["tflops"] = perf.get("tflops(TFlops)", 0)
                result["bandwidth_gb_s"] = perf.get("bandwidth(GB/s)", 0)
            return result
        except json.JSONDecodeError as e:
            if self.verbose:
                print(f"Failed to parse JSON from {json_file}: {e}")
            return None
        except Exception as e:
            if self.verbose:
                print(f"Error reading JSON file {json_file}: {e}")
            return None
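
    # Rough shape of the per-kernel JSON this expects (field names are the
    # ones read above; the full schema is whatever the kernel binary emits):
    #   {"problem": {...},
    #    "perf_result": {"latency(ms)": ..., "tflops(TFlops)": ...,
    #                    "bandwidth(GB/s)": ...}}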

    def benchmark_problem_size(
        self,
        kernels: List[Path],
        N: int,
        D: int,
        H: int,
        W: int,
        C: int,
        window_z: int = 2,
        window_y: int = 2,
        window_x: int = 2,
        stride_z: int = 2,
        stride_y: int = 2,
        stride_x: int = 2,
        pool_dim: int = 3,
        verify: int = 0,
        warmup: int = 20,
        repeat: int = 100,
        flush_cache: bool = True,
        rotating_count: int = 1000,
    ) -> List[Dict]:
        """Benchmark all kernels for a specific problem size"""
        results = []
        params = {
            "N": N,
            "D": D,
            "H": H,
            "W": W,
            "C": C,
            "Z": window_z,
            "Y": window_y,
            "X": window_x,
            "Sz": stride_z,
            "Sy": stride_y,
            "Sx": stride_x,
            "pool_dim": pool_dim,
            "verify": verify,
            "warmup": warmup,
            "repeat": repeat,
            "flush_cache": str(flush_cache).lower(),
            "rotating_count": rotating_count,
        }
        print(f"\nBenchmarking N={N}, D={D}, H={H}, W={W}, C={C}")
        print(
            f" Window: {window_z}x{window_y}x{window_x}, Stride: {stride_z}x{stride_y}x{stride_x}"
        )
        for kernel_path in kernels:
            kernel_info = self.extract_kernel_info(kernel_path)
            result = self.run_kernel(kernel_path, params)
            if result:
                structured_result = {
                    "name": kernel_info["name"],
                    "config_id": kernel_info["config_id"],
                    "problem": result.get("problem", {}),
                    "perf_result": result.get("perf_result", {}),
                    "config": {
                        "data_type": kernel_info["data_type"],
                        "reduce_op": kernel_info["reduce_op"],
                        "pool_dim": kernel_info["pool_dim"],
                        "output_index": kernel_info["output_index"],
                        "propagate_nan": kernel_info["propagate_nan"],
                        "block_sizes": kernel_info.get("block_sizes", {}),
                        "warp_config": kernel_info.get("warp_config", {}),
                        "thread_tile": kernel_info.get("thread_tile", {}),
                    },
                    "executable": kernel_info["executable"],
                    "time_ms": result.get("time_ms", 0),
                    "tflops": result.get("tflops", 0),
                    "bandwidth_gb_s": result.get("bandwidth_gb_s", 0),
                }
                results.append(structured_result)
                if self.verbose:
                    print(
                        f" {kernel_info['config_id']}: {structured_result['bandwidth_gb_s']:.2f} GB/s, {structured_result['time_ms']:.2f}ms"
                    )
        return results

    def find_best_kernel(
        self, results: List[Dict], metric: str = "bandwidth_gb_s"
    ) -> Optional[Dict]:
        """Find the best performing kernel based on metric"""
        if not results:
            return None
        if metric == "bandwidth_gb_s":
            return max(results, key=lambda x: x.get("bandwidth_gb_s", 0))
        elif metric == "time_ms":
            return min(results, key=lambda x: x.get("time_ms", float("inf")))
        elif metric == "tflops":
            return max(results, key=lambda x: x.get("tflops", 0))
        else:
            raise ValueError(f"Unknown metric: {metric}")
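
    # Usage sketch (assuming `results` came from benchmark_problem_size):
    #   best = self.find_best_kernel(results)                       # peak bandwidth
    #   fastest = self.find_best_kernel(results, metric="time_ms")  # lowest latency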

    def benchmark_sweep(
        self,
        problem_sizes: List[Tuple[int, int, int, int, int]],  # N, D, H, W, C
        window_sizes: List[Tuple[int, int, int]] = [(2, 2, 2)],
        stride_sizes: List[Tuple[int, int, int]] = [(2, 2, 2)],
        pool_dim: int = 3,
        verify: bool = False,
        warmup: int = 20,
        repeat: int = 100,
        flush_cache: bool = True,
        rotating_count: int = 1000,
    ) -> Dict:
        """Run comprehensive benchmark sweep"""
        kernels = self.discover_kernels()
        if not kernels:
            print("No kernels found!")
            return {}
        all_results = []
        best_kernels = {}
        for N, D, H, W, C in problem_sizes:
            for wz, wy, wx in window_sizes:
                for sz, sy, sx in stride_sizes:
                    results = self.benchmark_problem_size(
                        kernels,
                        N,
                        D,
                        H,
                        W,
                        C,
                        window_z=wz,
                        window_y=wy,
                        window_x=wx,
                        stride_z=sz,
                        stride_y=sy,
                        stride_x=sx,
                        pool_dim=pool_dim,
                        verify=1 if verify else 0,
                        warmup=warmup,
                        repeat=repeat,
                        flush_cache=flush_cache,
                        rotating_count=rotating_count,
                    )
                    all_results.extend(results)
                    best = self.find_best_kernel(results)
                    if best:
                        key = (
                            f"N{N}_D{D}_H{H}_W{W}_C{C}_w{wz}x{wy}x{wx}_s{sz}x{sy}x{sx}"
                        )
                        best_kernels[key] = best
                        print(
                            f"Best for {key}: {best['name']} ({best['bandwidth_gb_s']:.2f} GB/s, {best['time_ms']:.2f}ms)"
                        )
        self.results = all_results
        return best_kernels
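
    # Minimal usage sketch (hypothetical build directory; mirrors main()):
    #   bench = PoolBenchmark("/path/to/build", verbose=True)
    #   best = bench.benchmark_sweep(
    #       problem_sizes=[(2, 30, 30, 30, 32)],
    #       window_sizes=[(2, 2, 2)],
    #       stride_sizes=[(2, 2, 2)],
    #   )
    #   bench.export_csv("pool_benchmark_results.csv")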

    def export_csv(self, filename: str):
        """Export all results to CSV"""
        if not self.results:
            print("No results to export")
            return
        all_keys = set()
        for result in self.results:
            all_keys.update(result.keys())
        fieldnames = sorted(all_keys)
        with open(filename, "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.results)
        print(f"Results exported to {filename}")

    def export_best_kernels(self, best_kernels: Dict, filename: str):
        """Export best kernel selections to file"""
        with open(filename, "w") as f:
            f.write("# Best kernel selections for pooling\n")
            f.write("# Format: problem_size -> kernel_name (bandwidth, latency)\n\n")
            for key, kernel in sorted(best_kernels.items()):
                f.write(
                    f"{key}: {kernel['name']} ({kernel['bandwidth_gb_s']:.2f} GB/s, {kernel['time_ms']:.2f}ms)\n"
                )
        print(f"Best kernels exported to {filename}")

    def export_json(self, filename: str, best_kernels: Dict = None):
        """Export all results and best kernels to JSON"""
        from datetime import datetime

        successful_results = [
            r for r in self.results if r.get("bandwidth_gb_s", 0) > 0
        ]
        bandwidth_values = [r.get("bandwidth_gb_s", 0) for r in successful_results]
        latency_values = [
            r.get("time_ms", 0) for r in successful_results if r.get("time_ms", 0) > 0
        ]
        # Performance breakdown by kernel type
        reduce_op_stats = {}
        data_type_stats = {}
        for result in successful_results:
            config = result.get("config", {})
            bandwidth = result.get("bandwidth_gb_s", 0)
            reduce_op = config.get("reduce_op", "unknown")
            if reduce_op not in reduce_op_stats:
                reduce_op_stats[reduce_op] = {
                    "count": 0,
                    "avg_bandwidth": 0,
                    "best_bandwidth": 0,
                }
            reduce_op_stats[reduce_op]["count"] += 1
            reduce_op_stats[reduce_op]["avg_bandwidth"] += bandwidth
            reduce_op_stats[reduce_op]["best_bandwidth"] = max(
                reduce_op_stats[reduce_op]["best_bandwidth"], bandwidth
            )
            data_type = config.get("data_type", "unknown")
            if data_type not in data_type_stats:
                data_type_stats[data_type] = {
                    "count": 0,
                    "avg_bandwidth": 0,
                    "best_bandwidth": 0,
                }
            data_type_stats[data_type]["count"] += 1
            data_type_stats[data_type]["avg_bandwidth"] += bandwidth
            data_type_stats[data_type]["best_bandwidth"] = max(
                data_type_stats[data_type]["best_bandwidth"], bandwidth
            )
        # Convert the accumulated bandwidth sums into averages (count >= 1 here)
        for stats in list(reduce_op_stats.values()) + list(data_type_stats.values()):
            stats["avg_bandwidth"] /= stats["count"]
        output_data = {
            "benchmark_metadata": {
                "timestamp": datetime.now().isoformat(),
                "total_kernels_tested": len(self.results),
                "unique_kernels": len(
                    set(r.get("name", "unknown") for r in self.results)
                ),
                "successful_runs": len(successful_results),
                "failed_runs": len(self.results) - len(successful_results),
            },
            "performance_summary": {
                "bandwidth_stats": {
                    "best_gb_s": max(bandwidth_values, default=0),
                    "average_gb_s": sum(bandwidth_values) / len(bandwidth_values)
                    if bandwidth_values
                    else 0,
                    "min_gb_s": min(bandwidth_values, default=0),
                },
                "latency_stats": {
                    "best_ms": min(latency_values, default=0),
                    "average_ms": sum(latency_values) / len(latency_values)
                    if latency_values
                    else 0,
                    "max_ms": max(latency_values, default=0),
                },
                "kernel_type_breakdown": {
                    "by_reduce_op": reduce_op_stats,
                    "by_data_type": data_type_stats,
                },
                "total_problem_configurations": len(best_kernels)
                if best_kernels
                else 0,
            },
            "kernel_results": self.results,
            "best_kernels_by_problem": best_kernels or {},
        }
        with open(filename, "w") as f:
            json.dump(output_data, f, indent=2)
        print(f"JSON results exported to {filename}")
        print(f" - Total kernels: {len(self.results)}")
        print(f" - Successful runs: {len(successful_results)}")
        print(f" - Best bandwidth: {max(bandwidth_values, default=0):.2f} GB/s")
        print(f" - Best latency: {min(latency_values, default=0):.2f}ms")


def parse_size_tuples(
    values: List[str], expected_len: int, label: str, fmt: str
) -> List[Tuple[int, ...]]:
    """Parse comma-separated integer tuples from the CLI, exiting on bad input"""
    sizes = []
    for size_str in values:
        try:
            parts = tuple(map(int, size_str.split(",")))
        except ValueError:
            print(f"Invalid {label}: {size_str}")
            sys.exit(1)
        if len(parts) != expected_len:
            print(f"Invalid {label}: {size_str} (expected {fmt})")
            sys.exit(1)
        sizes.append(parts)
    return sizes


def main():
    parser = argparse.ArgumentParser(description="Pool Kernel Benchmarking Tool")
    parser.add_argument(
        "build_dir", help="Build directory containing kernel executables"
    )
    parser.add_argument(
        "--problem-sizes",
        nargs="+",
        default=["2,30,30,30,32", "4,64,64,64,64", "8,128,128,128,128"],
        help="Problem sizes as N,D,H,W,C tuples",
    )
    parser.add_argument(
        "--window-sizes",
        nargs="+",
        default=["2,2,2", "3,3,3"],
        help="Window sizes as Z,Y,X tuples",
    )
    parser.add_argument(
        "--stride-sizes",
        nargs="+",
        default=["2,2,2"],
        help="Stride sizes as Z,Y,X tuples",
    )
    parser.add_argument(
        "--pool-dim", type=int, default=3, help="Pooling dimension (2 or 3)"
    )
    parser.add_argument("--verify", action="store_true", help="Enable verification")
    parser.add_argument(
        "--csv", default="pool_benchmark_results.csv", help="CSV output filename"
    )
    parser.add_argument(
        "--best", default="best_pool_kernels.txt", help="Best kernels output filename"
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    parser.add_argument(
        "--warmup",
        type=int,
        default=20,
        help="Number of warmup iterations (default: 20)",
    )
    parser.add_argument(
        "--repeat",
        type=int,
        default=100,
        help="Number of benchmark iterations (default: 100)",
    )
    # BooleanOptionalAction (Python 3.9+) also generates --no-flush-cache,
    # so the True default can actually be turned off from the command line.
    parser.add_argument(
        "--flush-cache",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Enable cache flushing (default: True; use --no-flush-cache to disable)",
    )
    parser.add_argument(
        "--rotating-count",
        type=int,
        default=1000,
        help="Number of iterations to rotate cache (default: 1000)",
    )
    parser.add_argument("--json", help="JSON output filename (optional)")
    args = parser.parse_args()
    # Parse problem, window, and stride sizes (same validation for each)
    problem_sizes = parse_size_tuples(
        args.problem_sizes, 5, "problem size", "N,D,H,W,C"
    )
    window_sizes = parse_size_tuples(args.window_sizes, 3, "window size", "Z,Y,X")
    stride_sizes = parse_size_tuples(args.stride_sizes, 3, "stride size", "Z,Y,X")
    # Create benchmark instance
    benchmark = PoolBenchmark(args.build_dir, verbose=args.verbose)
    # Run benchmark sweep
    print("Starting Pool kernel benchmark sweep...")
    start_time = time.time()
    best_kernels = benchmark.benchmark_sweep(
        problem_sizes=problem_sizes,
        window_sizes=window_sizes,
        stride_sizes=stride_sizes,
        pool_dim=args.pool_dim,
        verify=args.verify,
        warmup=args.warmup,
        repeat=args.repeat,
        flush_cache=args.flush_cache,
        rotating_count=args.rotating_count,
    )
    elapsed_time = time.time() - start_time
    print(f"\nBenchmark completed in {elapsed_time:.2f} seconds")
    # Export results
    benchmark.export_csv(args.csv)
    benchmark.export_best_kernels(best_kernels, args.best)
    if args.json:
        benchmark.export_json(args.json, best_kernels)
    return 0


if __name__ == "__main__":
    sys.exit(main())
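
# Example invocation (hypothetical paths; defaults match the --help text):
#   python3 pool_benchmark.py /path/to/build \
#       --problem-sizes 2,30,30,30,32 4,64,64,64,64 \
#       --window-sizes 2,2,2 3,3,3 --stride-sizes 2,2,2 \
#       --csv results.csv --json results.json --verbose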