mirror of
https://github.com/ROCm/composable_kernel.git
synced 2026-03-28 11:07:39 +00:00
625 lines
21 KiB
Python
625 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
import argparse
|
|
import csv
|
|
import time
|
|
from pathlib import Path
|
|
from typing import List, Dict, Tuple, Optional
|
|
|
|
|
|
class PoolBenchmark:
|
|
def __init__(self, build_dir: str, verbose: bool = False):
|
|
self.build_dir = Path(build_dir)
|
|
self.verbose = verbose
|
|
self.results = []
|
|
|
|
def discover_kernels(self) -> List[Path]:
|
|
"""Find all benchmark_pool_* executables in the build directory"""
|
|
bin_dir = self.build_dir / "bin"
|
|
if not bin_dir.exists():
|
|
print(f"Error: Binary directory {bin_dir} does not exist")
|
|
return []
|
|
|
|
kernels = list(bin_dir.glob("benchmark_pool*"))
|
|
if self.verbose:
|
|
print(f"Found {len(kernels)} kernel executables")
|
|
for k in kernels:
|
|
print(f" - {k.name}")
|
|
return kernels
|
|
|
|
def extract_kernel_info(self, kernel_path: Path) -> Dict[str, str]:
|
|
"""Extract comprehensive kernel information from filename"""
|
|
name = kernel_path.stem
|
|
|
|
# Initialize with basic info
|
|
info = {
|
|
"executable": str(kernel_path),
|
|
"name": name,
|
|
"data_type": "unknown",
|
|
"reduce_op": "unknown",
|
|
"pool_dim": 0,
|
|
"output_index": False,
|
|
"propagate_nan": False,
|
|
}
|
|
|
|
# Parse the kernel name pattern:
|
|
# benchmark_pool3d_fp16_max_True_False_128x1_1x1_2x1
|
|
parts = name.split("_")
|
|
|
|
if len(parts) >= 3:
|
|
# Extract pool dimension (e.g., pool3d -> 3)
|
|
if "pool2d" in parts[1]:
|
|
info["pool_dim"] = 2
|
|
elif "pool3d" in parts[1]:
|
|
info["pool_dim"] = 3
|
|
|
|
# Extract data type
|
|
info["data_type"] = parts[2] if len(parts) > 2 else "unknown"
|
|
|
|
# Extract reduce op
|
|
info["reduce_op"] = parts[3] if len(parts) > 3 else "unknown"
|
|
|
|
# Extract flags
|
|
if len(parts) > 4:
|
|
info["output_index"] = parts[4] == "True"
|
|
if len(parts) > 5:
|
|
info["propagate_nan"] = parts[5] == "True"
|
|
|
|
# Extract block configuration
|
|
config_info = self.parse_block_config(name)
|
|
info.update(config_info)
|
|
|
|
# Generate config ID
|
|
info["config_id"] = self.generate_config_id(info)
|
|
|
|
return info
|
|
|
|
def parse_block_config(self, kernel_name: str) -> Dict:
|
|
"""Parse block configuration from kernel name"""
|
|
config = {
|
|
"block_sizes": {"block_m": 0, "block_n": 0},
|
|
"warp_config": {"warp_m": 0, "warp_n": 0},
|
|
"thread_tile": {"thread_tile_m": 0, "thread_tile_n": 0},
|
|
}
|
|
|
|
parts = kernel_name.split("_")
|
|
|
|
# Look for dimension patterns (e.g., 128x1)
|
|
dimension_groups = []
|
|
for part in parts:
|
|
if "x" in part and len(part.split("x")) == 2:
|
|
try:
|
|
dims = [int(x) for x in part.split("x")]
|
|
if all(d >= 0 for d in dims):
|
|
dimension_groups.append(dims)
|
|
except ValueError:
|
|
continue
|
|
|
|
# Assign dimensions based on order
|
|
if len(dimension_groups) >= 3:
|
|
config["block_sizes"]["block_m"] = dimension_groups[0][0]
|
|
config["block_sizes"]["block_n"] = dimension_groups[0][1]
|
|
config["warp_config"]["warp_m"] = dimension_groups[1][0]
|
|
config["warp_config"]["warp_n"] = dimension_groups[1][1]
|
|
config["thread_tile"]["thread_tile_m"] = dimension_groups[2][0]
|
|
config["thread_tile"]["thread_tile_n"] = dimension_groups[2][1]
|
|
elif len(dimension_groups) == 2:
|
|
config["block_sizes"]["block_m"] = dimension_groups[0][0]
|
|
config["block_sizes"]["block_n"] = dimension_groups[0][1]
|
|
config["warp_config"]["warp_m"] = dimension_groups[1][0]
|
|
config["warp_config"]["warp_n"] = dimension_groups[1][1]
|
|
elif len(dimension_groups) == 1:
|
|
config["block_sizes"]["block_m"] = dimension_groups[0][0]
|
|
config["block_sizes"]["block_n"] = dimension_groups[0][1]
|
|
|
|
return config
|
|
|
|
def generate_config_id(self, info: Dict) -> str:
|
|
"""Generate a compact config ID from kernel info"""
|
|
parts = [
|
|
f"pool{info.get('pool_dim', 0)}d",
|
|
info.get("data_type", "unk"),
|
|
info.get("reduce_op", "unk"),
|
|
]
|
|
|
|
block_sizes = info.get("block_sizes", {})
|
|
if block_sizes.get("block_m", 0) > 0:
|
|
block_str = f"{block_sizes['block_m']}x{block_sizes['block_n']}"
|
|
parts.append(block_str)
|
|
|
|
return "_".join(parts)
|
|
|
|
def run_kernel(self, kernel_path: Path, params: Dict[str, str]) -> Optional[Dict]:
|
|
"""Run a single kernel with given parameters"""
|
|
results_dir = self.build_dir / "results"
|
|
results_dir.mkdir(exist_ok=True)
|
|
|
|
json_file = results_dir / f"{kernel_path.stem}.json"
|
|
|
|
cmd = [str(kernel_path)]
|
|
|
|
for key, value in params.items():
|
|
cmd.append(f"-{key}={value}")
|
|
|
|
cmd.append("-json_output=true")
|
|
|
|
if self.verbose:
|
|
print(f"Running: {' '.join(cmd)}")
|
|
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
|
|
|
|
if result.returncode != 0:
|
|
print(f"Error running {kernel_path.name}: {result.stderr}")
|
|
return None
|
|
|
|
output = result.stdout.strip()
|
|
if output:
|
|
with open(json_file, "w") as f:
|
|
f.write(output)
|
|
|
|
return self.parse_json_file(json_file)
|
|
else:
|
|
print(f"No output from {kernel_path.name}")
|
|
return None
|
|
|
|
except subprocess.TimeoutExpired:
|
|
print(f"Timeout running {kernel_path.name}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Error running {kernel_path.name}: {e}")
|
|
return None
|
|
|
|
def parse_json_file(self, json_file: Path) -> Optional[Dict]:
|
|
"""Parse JSON data from individual kernel output file"""
|
|
try:
|
|
with open(json_file, "r") as f:
|
|
content = f.read().strip()
|
|
|
|
data = json.loads(content)
|
|
|
|
result = data.copy()
|
|
if "perf_result" in data:
|
|
perf = data["perf_result"]
|
|
result["time_ms"] = perf.get("latency(ms)", 0)
|
|
result["tflops"] = perf.get("tflops(TFlops)", 0)
|
|
result["bandwidth_gb_s"] = perf.get("bandwidth(GB/s)", 0)
|
|
|
|
return result
|
|
|
|
except json.JSONDecodeError as e:
|
|
if self.verbose:
|
|
print(f"Failed to parse JSON from {json_file}: {e}")
|
|
return None
|
|
except Exception as e:
|
|
if self.verbose:
|
|
print(f"Error reading JSON file {json_file}: {e}")
|
|
return None
|
|
|
|
def benchmark_problem_size(
|
|
self,
|
|
kernels: List[Path],
|
|
N: int,
|
|
D: int,
|
|
H: int,
|
|
W: int,
|
|
C: int,
|
|
window_z: int = 2,
|
|
window_y: int = 2,
|
|
window_x: int = 2,
|
|
stride_z: int = 2,
|
|
stride_y: int = 2,
|
|
stride_x: int = 2,
|
|
pool_dim: int = 3,
|
|
verify: int = 0,
|
|
warmup: int = 20,
|
|
repeat: int = 100,
|
|
flush_cache: bool = True,
|
|
rotating_count: int = 1000,
|
|
) -> List[Dict]:
|
|
"""Benchmark all kernels for a specific problem size"""
|
|
results = []
|
|
|
|
params = {
|
|
"N": N,
|
|
"D": D,
|
|
"H": H,
|
|
"W": W,
|
|
"C": C,
|
|
"Z": window_z,
|
|
"Y": window_y,
|
|
"X": window_x,
|
|
"Sz": stride_z,
|
|
"Sy": stride_y,
|
|
"Sx": stride_x,
|
|
"pool_dim": pool_dim,
|
|
"verify": verify,
|
|
"warmup": warmup,
|
|
"repeat": repeat,
|
|
"flush_cache": str(flush_cache).lower(),
|
|
"rotating_count": rotating_count,
|
|
}
|
|
|
|
print(f"\nBenchmarking N={N}, D={D}, H={H}, W={W}, C={C}")
|
|
print(
|
|
f" Window: {window_z}x{window_y}x{window_x}, Stride: {stride_z}x{stride_y}x{stride_x}"
|
|
)
|
|
|
|
for kernel_path in kernels:
|
|
kernel_info = self.extract_kernel_info(kernel_path)
|
|
result = self.run_kernel(kernel_path, params)
|
|
|
|
if result:
|
|
structured_result = {
|
|
"name": kernel_info["name"],
|
|
"config_id": kernel_info["config_id"],
|
|
"problem": result.get("problem", {}),
|
|
"perf_result": result.get("perf_result", {}),
|
|
"config": {
|
|
"data_type": kernel_info["data_type"],
|
|
"reduce_op": kernel_info["reduce_op"],
|
|
"pool_dim": kernel_info["pool_dim"],
|
|
"output_index": kernel_info["output_index"],
|
|
"propagate_nan": kernel_info["propagate_nan"],
|
|
"block_sizes": kernel_info.get("block_sizes", {}),
|
|
"warp_config": kernel_info.get("warp_config", {}),
|
|
"thread_tile": kernel_info.get("thread_tile", {}),
|
|
},
|
|
"executable": kernel_info["executable"],
|
|
"time_ms": result.get("time_ms", 0),
|
|
"tflops": result.get("tflops", 0),
|
|
"bandwidth_gb_s": result.get("bandwidth_gb_s", 0),
|
|
}
|
|
|
|
results.append(structured_result)
|
|
|
|
if self.verbose:
|
|
print(
|
|
f" {kernel_info['config_id']}: {structured_result['bandwidth_gb_s']:.2f} GB/s, {structured_result['time_ms']:.2f}ms"
|
|
)
|
|
|
|
return results
|
|
|
|
def find_best_kernel(
|
|
self, results: List[Dict], metric: str = "bandwidth_gb_s"
|
|
) -> Optional[Dict]:
|
|
"""Find the best performing kernel based on metric"""
|
|
if not results:
|
|
return None
|
|
|
|
if metric == "bandwidth_gb_s":
|
|
return max(results, key=lambda x: x.get("bandwidth_gb_s", 0))
|
|
elif metric == "time_ms":
|
|
return min(results, key=lambda x: x.get("time_ms", float("inf")))
|
|
elif metric == "tflops":
|
|
return max(results, key=lambda x: x.get("tflops", 0))
|
|
else:
|
|
raise ValueError(f"Unknown metric: {metric}")
|
|
|
|
def benchmark_sweep(
|
|
self,
|
|
problem_sizes: List[Tuple[int, int, int, int, int]], # N, D, H, W, C
|
|
window_sizes: List[Tuple[int, int, int]] = [(2, 2, 2)],
|
|
stride_sizes: List[Tuple[int, int, int]] = [(2, 2, 2)],
|
|
pool_dim: int = 3,
|
|
verify: bool = False,
|
|
warmup: int = 20,
|
|
repeat: int = 100,
|
|
flush_cache: bool = True,
|
|
rotating_count: int = 1000,
|
|
) -> Dict:
|
|
"""Run comprehensive benchmark sweep"""
|
|
kernels = self.discover_kernels()
|
|
if not kernels:
|
|
print("No kernels found!")
|
|
return {}
|
|
|
|
all_results = []
|
|
best_kernels = {}
|
|
|
|
for N, D, H, W, C in problem_sizes:
|
|
for wz, wy, wx in window_sizes:
|
|
for sz, sy, sx in stride_sizes:
|
|
results = self.benchmark_problem_size(
|
|
kernels,
|
|
N,
|
|
D,
|
|
H,
|
|
W,
|
|
C,
|
|
window_z=wz,
|
|
window_y=wy,
|
|
window_x=wx,
|
|
stride_z=sz,
|
|
stride_y=sy,
|
|
stride_x=sx,
|
|
pool_dim=pool_dim,
|
|
verify=1 if verify else 0,
|
|
warmup=warmup,
|
|
repeat=repeat,
|
|
flush_cache=flush_cache,
|
|
rotating_count=rotating_count,
|
|
)
|
|
|
|
all_results.extend(results)
|
|
|
|
best = self.find_best_kernel(results)
|
|
if best:
|
|
key = (
|
|
f"N{N}_D{D}_H{H}_W{W}_C{C}_w{wz}x{wy}x{wx}_s{sz}x{sy}x{sx}"
|
|
)
|
|
best_kernels[key] = best
|
|
print(
|
|
f"Best for {key}: {best['name']} ({best['bandwidth_gb_s']:.2f} GB/s, {best['time_ms']:.2f}ms)"
|
|
)
|
|
|
|
self.results = all_results
|
|
return best_kernels
|
|
|
|
def export_csv(self, filename: str):
|
|
"""Export all results to CSV"""
|
|
if not self.results:
|
|
print("No results to export")
|
|
return
|
|
|
|
all_keys = set()
|
|
for result in self.results:
|
|
all_keys.update(result.keys())
|
|
|
|
fieldnames = sorted(all_keys)
|
|
|
|
with open(filename, "w", newline="") as csvfile:
|
|
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(self.results)
|
|
|
|
print(f"Results exported to {filename}")
|
|
|
|
def export_best_kernels(self, best_kernels: Dict, filename: str):
|
|
"""Export best kernel selections to file"""
|
|
with open(filename, "w") as f:
|
|
f.write("# Best kernel selections for pooling\n")
|
|
f.write("# Format: problem_size -> kernel_name (bandwidth, latency)\n\n")
|
|
|
|
for key, kernel in sorted(best_kernels.items()):
|
|
f.write(
|
|
f"{key}: {kernel['name']} ({kernel['bandwidth_gb_s']:.2f} GB/s, {kernel['time_ms']:.2f}ms)\n"
|
|
)
|
|
|
|
print(f"Best kernels exported to {filename}")
|
|
|
|
def export_json(self, filename: str, best_kernels: Dict = None):
|
|
"""Export all results and best kernels to JSON"""
|
|
from datetime import datetime
|
|
|
|
successful_results = [r for r in self.results if r.get("bandwidth_gb_s", 0) > 0]
|
|
|
|
bandwidth_values = [r.get("bandwidth_gb_s", 0) for r in successful_results]
|
|
latency_values = [
|
|
r.get("time_ms", 0) for r in successful_results if r.get("time_ms", 0) > 0
|
|
]
|
|
|
|
# Performance breakdown by kernel type
|
|
reduce_op_stats = {}
|
|
data_type_stats = {}
|
|
|
|
for result in successful_results:
|
|
config = result.get("config", {})
|
|
|
|
reduce_op = config.get("reduce_op", "unknown")
|
|
if reduce_op not in reduce_op_stats:
|
|
reduce_op_stats[reduce_op] = {
|
|
"count": 0,
|
|
"avg_bandwidth": 0,
|
|
"best_bandwidth": 0,
|
|
}
|
|
reduce_op_stats[reduce_op]["count"] += 1
|
|
reduce_op_stats[reduce_op]["best_bandwidth"] = max(
|
|
reduce_op_stats[reduce_op]["best_bandwidth"],
|
|
result.get("bandwidth_gb_s", 0),
|
|
)
|
|
|
|
data_type = config.get("data_type", "unknown")
|
|
if data_type not in data_type_stats:
|
|
data_type_stats[data_type] = {
|
|
"count": 0,
|
|
"avg_bandwidth": 0,
|
|
"best_bandwidth": 0,
|
|
}
|
|
data_type_stats[data_type]["count"] += 1
|
|
data_type_stats[data_type]["best_bandwidth"] = max(
|
|
data_type_stats[data_type]["best_bandwidth"],
|
|
result.get("bandwidth_gb_s", 0),
|
|
)
|
|
|
|
output_data = {
|
|
"benchmark_metadata": {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"total_kernels_tested": len(self.results),
|
|
"unique_kernels": len(
|
|
set(r.get("name", "unknown") for r in self.results)
|
|
),
|
|
"successful_runs": len(successful_results),
|
|
"failed_runs": len(self.results) - len(successful_results),
|
|
},
|
|
"performance_summary": {
|
|
"bandwidth_stats": {
|
|
"best_gb_s": max(bandwidth_values, default=0),
|
|
"average_gb_s": sum(bandwidth_values) / len(bandwidth_values)
|
|
if bandwidth_values
|
|
else 0,
|
|
"min_gb_s": min(bandwidth_values, default=0),
|
|
},
|
|
"latency_stats": {
|
|
"best_ms": min(latency_values, default=0),
|
|
"average_ms": sum(latency_values) / len(latency_values)
|
|
if latency_values
|
|
else 0,
|
|
"max_ms": max(latency_values, default=0),
|
|
},
|
|
"kernel_type_breakdown": {
|
|
"by_reduce_op": reduce_op_stats,
|
|
"by_data_type": data_type_stats,
|
|
},
|
|
"total_problem_configurations": len(best_kernels)
|
|
if best_kernels
|
|
else 0,
|
|
},
|
|
"kernel_results": self.results,
|
|
"best_kernels_by_problem": best_kernels or {},
|
|
}
|
|
|
|
with open(filename, "w") as f:
|
|
json.dump(output_data, f, indent=2)
|
|
|
|
print(f"JSON results exported to {filename}")
|
|
print(f" - Total kernels: {len(self.results)}")
|
|
print(f" - Successful runs: {len(successful_results)}")
|
|
print(f" - Best bandwidth: {max(bandwidth_values, default=0):.2f} GB/s")
|
|
print(f" - Best latency: {min(latency_values, default=0):.2f}ms")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Pool Kernel Benchmarking Tool")
|
|
parser.add_argument(
|
|
"build_dir", help="Build directory containing kernel executables"
|
|
)
|
|
parser.add_argument(
|
|
"--problem-sizes",
|
|
nargs="+",
|
|
default=["2,30,30,30,32", "4,64,64,64,64", "8,128,128,128,128"],
|
|
help="Problem sizes as N,D,H,W,C tuples",
|
|
)
|
|
parser.add_argument(
|
|
"--window-sizes",
|
|
nargs="+",
|
|
default=["2,2,2", "3,3,3"],
|
|
help="Window sizes as Z,Y,X tuples",
|
|
)
|
|
parser.add_argument(
|
|
"--stride-sizes",
|
|
nargs="+",
|
|
default=["2,2,2"],
|
|
help="Stride sizes as Z,Y,X tuples",
|
|
)
|
|
parser.add_argument(
|
|
"--pool-dim", type=int, default=3, help="Pooling dimension (2 or 3)"
|
|
)
|
|
parser.add_argument("--verify", action="store_true", help="Enable verification")
|
|
parser.add_argument(
|
|
"--csv", default="pool_benchmark_results.csv", help="CSV output filename"
|
|
)
|
|
parser.add_argument(
|
|
"--best", default="best_pool_kernels.txt", help="Best kernels output filename"
|
|
)
|
|
parser.add_argument("--verbose", action="store_true", help="Verbose output")
|
|
parser.add_argument(
|
|
"--warmup",
|
|
type=int,
|
|
default=20,
|
|
help="Number of warmup iterations (default: 20)",
|
|
)
|
|
parser.add_argument(
|
|
"--repeat",
|
|
type=int,
|
|
default=100,
|
|
help="Number of benchmark iterations (default: 100)",
|
|
)
|
|
parser.add_argument(
|
|
"--flush-cache",
|
|
action="store_true",
|
|
default=True,
|
|
help="Enable cache flushing (default: True)",
|
|
)
|
|
parser.add_argument(
|
|
"--rotating-count",
|
|
type=int,
|
|
default=1000,
|
|
help="Number of iterations to rotate cache (default: 1000)",
|
|
)
|
|
parser.add_argument("--json", help="JSON output filename (optional)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Parse problem sizes
|
|
problem_sizes = []
|
|
for size_str in args.problem_sizes:
|
|
try:
|
|
parts = list(map(int, size_str.split(",")))
|
|
if len(parts) == 5:
|
|
problem_sizes.append(tuple(parts))
|
|
else:
|
|
print(f"Invalid problem size: {size_str} (expected N,D,H,W,C)")
|
|
return 1
|
|
except ValueError:
|
|
print(f"Invalid problem size: {size_str}")
|
|
return 1
|
|
|
|
# Parse window sizes
|
|
window_sizes = []
|
|
for size_str in args.window_sizes:
|
|
try:
|
|
parts = list(map(int, size_str.split(",")))
|
|
if len(parts) == 3:
|
|
window_sizes.append(tuple(parts))
|
|
else:
|
|
print(f"Invalid window size: {size_str} (expected Z,Y,X)")
|
|
return 1
|
|
except ValueError:
|
|
print(f"Invalid window size: {size_str}")
|
|
return 1
|
|
|
|
# Parse stride sizes
|
|
stride_sizes = []
|
|
for size_str in args.stride_sizes:
|
|
try:
|
|
parts = list(map(int, size_str.split(",")))
|
|
if len(parts) == 3:
|
|
stride_sizes.append(tuple(parts))
|
|
else:
|
|
print(f"Invalid stride size: {size_str} (expected Z,Y,X)")
|
|
return 1
|
|
except ValueError:
|
|
print(f"Invalid stride size: {size_str}")
|
|
return 1
|
|
|
|
# Create benchmark instance
|
|
benchmark = PoolBenchmark(args.build_dir, verbose=args.verbose)
|
|
|
|
# Run benchmark sweep
|
|
print("Starting Pool kernel benchmark sweep...")
|
|
start_time = time.time()
|
|
|
|
best_kernels = benchmark.benchmark_sweep(
|
|
problem_sizes=problem_sizes,
|
|
window_sizes=window_sizes,
|
|
stride_sizes=stride_sizes,
|
|
pool_dim=args.pool_dim,
|
|
verify=args.verify,
|
|
warmup=args.warmup,
|
|
repeat=args.repeat,
|
|
flush_cache=args.flush_cache,
|
|
rotating_count=args.rotating_count,
|
|
)
|
|
|
|
elapsed_time = time.time() - start_time
|
|
print(f"\nBenchmark completed in {elapsed_time:.2f} seconds")
|
|
|
|
# Export results
|
|
benchmark.export_csv(args.csv)
|
|
benchmark.export_best_kernels(best_kernels, args.best)
|
|
|
|
if args.json:
|
|
benchmark.export_json(args.json, best_kernels)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|