Files
nvbench/python/examples/benchmark_result_autotune.py
Oleksandr Pavlyk f885d4cb69 Add python/examples/benchmark_result_autotune.py
This example demonstrates using cuda.bench and cuda.bench.results
to implement simple auto-tuning, demonstrated on selecting of
tile shape hyperparameter for naive stencil kernel implemented
in numba-cuda.
2026-05-12 14:12:24 -05:00

375 lines
12 KiB
Python

# Copyright 2026 NVIDIA Corporation
#
# Licensed under the Apache License, Version 2.0 with the LLVM exception
# (the "License"); you may not use this file except in compliance with
# the License.
#
# You may obtain a copy of the License at
#
# http://llvm.org/foundation/relicensing/LICENSE.txt
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import argparse
import statistics
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
from cuda.bench.results import BenchmarkResult, BenchmarkResultSummary
from tabulate import tabulate
TILE_SHAPES = ("4x32", "8x16", "16x16", "32x8", "16x8", "8x8")
BENCHMARK_NAME = "stencil_autotune"
MEDIAN_TIE_RELATIVE_TOLERANCE = 0.01
MIN_RECOMMENDED_INTERIOR_PIXELS = 1_000_000
def parse_tile_shape(tile_shape: str) -> tuple[int, int]:
block_x, block_y = tile_shape.split("x", maxsplit=1)
return int(block_x), int(block_y)
def format_duration(seconds: float) -> str:
if seconds >= 1.0:
return f"{seconds:.3f} s"
if seconds >= 1e-3:
return f"{seconds * 1e3:.3f} ms"
if seconds >= 1e-6:
return f"{seconds * 1e6:.3f} us"
return f"{seconds * 1e9:.3f} ns"
def format_optional_duration(seconds: float | None) -> str:
if seconds is None:
return "-"
return format_duration(seconds)
def format_byte_rate(summary: BenchmarkResultSummary | None) -> str:
if summary is None or summary.value is None:
return "-"
bytes_per_second = float(summary.value)
if bytes_per_second >= 1e12:
return f"{bytes_per_second * 1e-12:.3f} TB/s"
if bytes_per_second >= 1e9:
return f"{bytes_per_second * 1e-9:.3f} GB/s"
if bytes_per_second >= 1e6:
return f"{bytes_per_second * 1e-6:.3f} MB/s"
if bytes_per_second >= 1e3:
return f"{bytes_per_second * 1e-3:.3f} KB/s"
return f"{bytes_per_second:.3f} B/s"
def state_tile_shape(state_name: str) -> str:
prefix = "TileShape="
for field in state_name.split():
if field.startswith(prefix):
return field.removeprefix(prefix)
return state_name
def interior_pixel_count(width: int, height: int) -> int:
return max(width - 2, 0) * max(height - 2, 0)
def median_ties_best(row: dict[str, Any], best_median_seconds: float) -> bool:
tolerance = abs(best_median_seconds) * MEDIAN_TIE_RELATIVE_TOLERANCE
return abs(row["median_seconds"] - best_median_seconds) <= tolerance
def summarize_result(result: BenchmarkResult) -> list[dict[str, Any]]:
subbenchmark = result[BENCHMARK_NAME]
medians = subbenchmark.centers(statistics.median)
metadata = result.metadata if isinstance(result.metadata, dict) else {}
rows = []
for state in subbenchmark:
median_seconds = medians[state.name()]
if median_seconds is None:
continue
bandwidth = state.summaries.get("nv/cold/bw/global/bytes_per_second")
mean_summary = state.summaries.get("nv/cold/time/gpu/mean")
mean_seconds = (
None
if mean_summary is None or mean_summary.value is None
else float(mean_summary.value)
)
rows.append(
{
"tile_shape": state_tile_shape(state.name()),
"median_seconds": median_seconds,
"mean_seconds": mean_seconds,
"sample_count": len(state.samples) if state.samples is not None else 0,
"bandwidth": format_byte_rate(bandwidth),
"subprocess_seconds": metadata.get("elapsed_seconds", 0.0),
}
)
return sorted(rows, key=lambda row: row["median_seconds"])
def print_summary(rows: list[dict[str, Any]]) -> None:
if not rows:
raise RuntimeError("No benchmark states with sample data were found.")
total_subprocess_seconds = sum(row["subprocess_seconds"] for row in rows)
print()
print(f"Total benchmark subprocess wall time: {total_subprocess_seconds:.3f} s")
print()
best_median_seconds = rows[0]["median_seconds"]
tied_rows = [row for row in rows if median_ties_best(row, best_median_seconds)]
table = [
[
"*" if row in tied_rows else "",
row["tile_shape"],
format_duration(row["median_seconds"]),
format_optional_duration(row["mean_seconds"]),
row["sample_count"],
row["bandwidth"],
f"{row['subprocess_seconds']:.3f} s",
]
for row in rows
]
print(
tabulate(
table,
headers=[
"Best",
"TileShape",
"Median GPU Time",
"Mean GPU Time",
"Samples",
"GlobalMem BW",
"Subprocess",
],
tablefmt="simple",
disable_numparse=True,
)
)
print()
if len(tied_rows) == 1:
best = tied_rows[0]
print(
"Best tile shape by median isolated GPU time: "
f"{best['tile_shape']} ({format_duration(best['median_seconds'])})"
)
else:
tile_shapes = ", ".join(row["tile_shape"] for row in tied_rows)
print(
"No unique best tile shape by median isolated GPU time: "
f"{len(tied_rows)} states are within "
f"{MEDIAN_TIE_RELATIVE_TOLERANCE:.1%} of "
f"{format_duration(best_median_seconds)} ({tile_shapes})."
)
def run_driver(args: argparse.Namespace, nvbench_args: list[str]) -> int:
with tempfile.TemporaryDirectory(prefix="nvbench-autotune-") as tmp_dir:
rows = []
total = len(TILE_SHAPES)
interior_pixels = interior_pixel_count(args.image_width, args.image_height)
print(
f"Image size: {args.image_width}x{args.image_height} "
f"({interior_pixels} interior stencil points)"
)
print(f"Sampling {total} tile shapes:")
if interior_pixels < MIN_RECOMMENDED_INTERIOR_PIXELS:
print(
"Warning: this problem has only "
f"{interior_pixels} interior stencil points. "
"Small problems are usually dominated by kernel launch overhead, "
"so median timings may tie across tile shapes."
)
for index, tile_shape in enumerate(TILE_SHAPES, start=1):
json_path = Path(tmp_dir) / f"stencil_autotune_{tile_shape}.json"
command = [
sys.executable,
str(Path(__file__).resolve()),
"--run-benchmark",
"--stopping-criterion",
"entropy",
"--tile-shape",
tile_shape,
"--image-width",
str(args.image_width),
"--image-height",
str(args.image_height),
"--jsonbin",
str(json_path),
]
if nvbench_args:
command.extend(["--", *nvbench_args])
print(f"[{index}/{total}] TileShape={tile_shape} ... ", end="", flush=True)
start = time.perf_counter()
completed = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
check=False,
)
elapsed_seconds = time.perf_counter() - start
if completed.returncode != 0:
print(f"failed after {elapsed_seconds:.3f} s")
print(completed.stdout, end="")
return completed.returncode
metadata = {
"command": command,
"returncode": completed.returncode,
"elapsed_seconds": elapsed_seconds,
"tile_shape": tile_shape,
}
result = BenchmarkResult.from_json(json_path, metadata=metadata)
tile_rows = summarize_result(result)
rows.extend(tile_rows)
if tile_rows:
row = tile_rows[0]
print(
f"done in {elapsed_seconds:.3f} s, "
f"median {format_duration(row['median_seconds'])}, "
f"{row['bandwidth']}"
)
else:
print(f"done in {elapsed_seconds:.3f} s, no samples")
print_summary(sorted(rows, key=lambda row: row["median_seconds"]))
return 0
def run_benchmark(args: argparse.Namespace, nvbench_args: list[str]) -> None:
import cuda.bench as bench
import numpy as np
from numba import cuda
def as_cuda_stream(cs: bench.CudaStream) -> cuda.cudadrv.driver.Stream:
return cuda.external_stream(cs.addressof())
@cuda.jit
def stencil_kernel(inp, out, width, height):
x, y = cuda.grid(2)
if 0 < x < width - 1 and 0 < y < height - 1:
idx = y * width + x
out[idx] = 0.2 * (
inp[idx]
+ inp[idx - 1]
+ inp[idx + 1]
+ inp[idx - width]
+ inp[idx + width]
)
def stencil_autotune(state: bench.State) -> None:
tile_shape = state.get_string("TileShape")
block_x, block_y = parse_tile_shape(tile_shape)
width = args.image_width
height = args.image_height
interior_pixels = (width - 2) * (height - 2)
state.add_element_count(interior_pixels, column_name="Pixels")
state.add_global_memory_reads(
interior_pixels * 5 * np.dtype(np.float32).itemsize
)
state.add_global_memory_writes(interior_pixels * np.dtype(np.float32).itemsize)
host_input = np.ones(width * height, dtype=np.float32)
dev_input = cuda.to_device(host_input)
dev_output = cuda.device_array_like(dev_input)
block_shape = (block_x, block_y)
grid_shape = (
(width + block_x - 1) // block_x,
(height + block_y - 1) // block_y,
)
# Compile the Numba kernel outside NVBench measurement.
stencil_kernel[grid_shape, block_shape](dev_input, dev_output, width, height)
cuda.synchronize()
def launcher(launch: bench.Launch) -> None:
stream = as_cuda_stream(launch.get_stream())
stencil_kernel[grid_shape, block_shape, stream, 0](
dev_input,
dev_output,
width,
height,
)
state.exec(launcher)
benchmark = bench.register(stencil_autotune)
benchmark.set_name(BENCHMARK_NAME)
tile_shapes = [args.tile_shape] if args.tile_shape is not None else TILE_SHAPES
benchmark.add_string_axis("TileShape", tile_shapes)
bench.run_all_benchmarks([sys.argv[0], *nvbench_args])
def parse_args(argv: list[str] | None = None) -> tuple[argparse.Namespace, list[str]]:
parser = argparse.ArgumentParser(
description=(
"Autotune a simple stencil benchmark and select the best state "
"from NVBench JSON-bin output."
),
epilog=(
"Additional NVBench options may be passed after '--'. "
"For example: benchmark_result_autotune.py -- --timeout 30"
),
)
parser.add_argument(
"--run-benchmark",
action="store_true",
help=argparse.SUPPRESS,
)
parser.add_argument(
"--tile-shape",
choices=TILE_SHAPES,
default=None,
help=argparse.SUPPRESS,
)
parser.add_argument(
"--image-width",
type=int,
default=4096,
help="Stencil input width used by the subprocess benchmark.",
)
parser.add_argument(
"--image-height",
type=int,
default=4096,
help="Stencil input height used by the subprocess benchmark.",
)
args, nvbench_args = parser.parse_known_args(argv)
if args.image_width < 3 or args.image_height < 3:
parser.error("--image-width and --image-height must both be at least 3")
nvbench_args = [arg for arg in nvbench_args if arg != "--"]
return args, nvbench_args
def main(argv: list[str] | None = None) -> int:
args, nvbench_args = parse_args(argv)
if args.run_benchmark:
run_benchmark(args, nvbench_args)
return 0
return run_driver(args, nvbench_args)
if __name__ == "__main__":
sys.exit(main())