Provide BenchmarkResult class for parsing JSON output of NVBench-instrumented benchmarks (#356)

Implements `cuda.bench.results.BenchmarkResult` class to represent data from JSON output of benchmark execution.

The contains implements two class methods `BenchmarkResult.from_json(filename : str | os.PathLike, *, metadata : Any = None)` which expects well-formed JSON filename and `BenchmarkResult.empty(*, metadata : Any = None)` intended to represent failed result with reasons that can be recorded in metadata at user's discretion.

The `BenchmarkResult` implements mapping interface, supporting `.keys()`, `.values()`, `.items()` methods, `__len__`, `__contains__`, `__getitem__` and `__iter__` special methods. 

Values in `BenchmarkResult` has type `cuda.bench.results.SubBenchmarkResult` which implements a list-like interface, i.e. implements `__len__`, `__getitem__`, and `__iter__` special methods. Values in this list-like structure correspond to measurements of individual states of a particular benchmark (the key in `BenchmarkResult`).

Elements of `SubBenchmarkResult` structure have type `SubBenchmarkState` that supports mapping protocol with axis_values as a key and represent data corresponding to measurements for a particular state (combination of settings for each axis). 

The state provides `.samples` and `.frequencies` attributes storing raw execution duration values and estimates for average GPU frequencies. 

Example usage:

```
import array, numpy as np, cuda.bench.results

r = cuda.bench.results.BenchmarkResult("perf_data/axes_run1.json")

r["copy_sweep_grid_shape"].centers_with_frequencies(
     lambda t, f: np.median(np.asarray(t)*np.asarray(f)))

```

```
In [1]: import array, numpy as np, cuda.bench.results

In [2]: r = cuda.bench.results.BenchmarkResult("temp_data/axes_run1.json")

In [3]: list(r)
Out[3]:
['simple',
 'single_float64_axis',
 'copy_sweep_grid_shape',
 'copy_type_sweep',
 'copy_type_conversion_sweep',
 'copy_type_and_block_size_sweep']

In [4]: r["simple"].centers(lambda t: np.percentile(t, [25,75]))
Out[4]: {'Device=0': array([0.00100966, 0.00101299])}

In [5]: r.centers(lambda t: np.percentile(t, [25,75]))["simple"]
Out[5]: {'Device=0': array([0.00100966, 0.00101299])}

In [6]: len(r)
Out[6]: 6

In [7]: "fake" in r
Out[7]: False
```

Each `SubBenchmarkState` implements 
`.summaries` attribute - rich object that retains tag/name/hint/hide/description metadata.

* Add nvbench-json-summary to render NVBench JSON output as an NVBench-style
markdown summary table, including axis formatting, device sections, hidden
summary filtering, and summary hint formatting.

Update packaging, type stubs, and tests for the new namespace, renamed
classes, Python 3.10-compatible annotations, and summary-table generation.

* Split tests in test_benchmark_result into smaller tests

* Fix break due to file name change

* Add python/examples/benchmark_result_autotune.py

This example demonstrates using cuda.bench and cuda.bench.results
to implement simple auto-tuning, demonstrated on selecting of
tile shape hyperparameter for naive stencil kernel implemented
in numba-cuda.

* Resolve ruff PLE0604

* Fix for format_axis_value in json format script to handle None value

Add tests to cover such input.

* Address code rabbit review feedback

* Fix license header, add validation

* Addressed both issues raised in review

Malformed values are now represented in result as None.

Skipped benchmarks are no longer dropped, i.e., they are present
in BenchmarkResult data, but they are not reflected in summary
table in line with what NVBench-instrumented benchmarks do.
This commit is contained in:
Oleksandr Pavlyk
2026-05-13 13:23:58 -05:00
committed by GitHub
parent 6df6dc8d89
commit 338936b6fe
12 changed files with 2480 additions and 45 deletions

View File

@@ -29,6 +29,26 @@ except Exception as e:
)
_NVBENCH_EXPORTS = (
"Benchmark",
"CudaStream",
"Launch",
"NVBenchRuntimeError",
"State",
"register",
"run_all_benchmarks",
)
_NVBENCH_TEST_EXPORTS = (
"_test_cpp_exception",
"_test_py_exception",
)
__all__ = list(_NVBENCH_EXPORTS)
_nvbench_module = None
# Detect CUDA runtime version and load appropriate extension
def _get_cuda_major_version():
"""Detect the CUDA runtime major version."""
@@ -47,51 +67,63 @@ def _get_cuda_major_version():
)
_cuda_major = _get_cuda_major_version()
_extra_name = f"cu{_cuda_major}"
_module_fullname = f"cuda.bench.{_extra_name}._nvbench"
def _bind_nvbench_module(module):
for name in _NVBENCH_EXPORTS:
globals()[name] = getattr(module, name)
# Set module of exposed objects
globals()[name].__module__ = __name__
try:
_nvbench_module = importlib.import_module(_module_fullname)
except ImportError as e:
raise ImportError(
f"No cuda-bench extension found for CUDA {_cuda_major}.x. "
f"This wheel may not include support for your CUDA version. "
f"Supported CUDA versions: 12, 13. "
f"Original error: {e}"
for name in _NVBENCH_TEST_EXPORTS:
globals()[name] = getattr(module, name)
# Expose the module as _nvbench for backward compatibility (e.g., for tests)
globals()["_nvbench"] = module
def _load_nvbench_module():
global _nvbench_module
if _nvbench_module is not None:
return _nvbench_module
cuda_major = _get_cuda_major_version()
extra_name = f"cu{cuda_major}"
module_fullname = f"cuda.bench.{extra_name}._nvbench"
try:
module = importlib.import_module(module_fullname)
except ImportError as e:
raise ImportError(
f"No cuda-bench extension found for CUDA {cuda_major}.x. "
f"This wheel may not include support for your CUDA version. "
f"Supported CUDA versions: 12, 13. "
f"Original error: {e}"
) from e
_bind_nvbench_module(module)
_nvbench_module = module
return module
def __getattr__(name):
if name == "_nvbench":
return _load_nvbench_module()
if name in _NVBENCH_EXPORTS + _NVBENCH_TEST_EXPORTS:
_load_nvbench_module()
return globals()[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
def __dir__():
return sorted(
set(globals())
| set(_NVBENCH_EXPORTS)
| set(_NVBENCH_TEST_EXPORTS)
| {"_nvbench"}
)
# Import and expose all public symbols from the CUDA-specific extension
Benchmark = _nvbench_module.Benchmark
CudaStream = _nvbench_module.CudaStream
Launch = _nvbench_module.Launch
NVBenchRuntimeError = _nvbench_module.NVBenchRuntimeError
State = _nvbench_module.State
register = _nvbench_module.register
run_all_benchmarks = _nvbench_module.run_all_benchmarks
_test_cpp_exception = _nvbench_module._test_cpp_exception
_test_py_exception = _nvbench_module._test_py_exception
# Expose the module as _nvbench for backward compatibility (e.g., for tests)
_nvbench = _nvbench_module
# Set module of exposed objects
Benchmark.__module__ = __name__
CudaStream.__module__ = __name__
Launch.__module__ = __name__
NVBenchRuntimeError.__module__ = __name__
State.__module__ = __name__
register.__module__ = __name__
run_all_benchmarks.__module__ = __name__
# Clean up internal symbols
del (
_nvbench_module,
_cuda_major,
_extra_name,
_module_fullname,
_get_cuda_major_version,
)
__doc__ = """
CUDA Kernel Benchmarking Library Python API

View File

@@ -25,8 +25,17 @@
# stubs in generated out/cuda/nvbench/_nvbench.pyi
# with definitions given here.
from collections.abc import Callable, Sequence
from typing import Optional, Self, SupportsFloat, SupportsInt, Union
from collections.abc import (
Callable,
Sequence,
)
from typing import (
Optional,
Self,
SupportsFloat,
SupportsInt,
Union,
)
class CudaStream:
def __cuda_stream__(self) -> tuple[int, int]: ...

View File

@@ -0,0 +1,26 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Utilities for reading NVBench JSON benchmark result files."""
from ._benchmark_result import (
BenchmarkResult,
BenchmarkResultDevice,
BenchmarkResultSummary,
SubBenchmarkResult,
SubBenchmarkState,
)
BenchmarkResult.__module__ = __name__
BenchmarkResultDevice.__module__ = __name__
BenchmarkResultSummary.__module__ = __name__
SubBenchmarkResult.__module__ = __name__
SubBenchmarkState.__module__ = __name__
__all__ = [
"BenchmarkResult",
"BenchmarkResultDevice",
"BenchmarkResultSummary",
"SubBenchmarkResult",
"SubBenchmarkState",
]

View File

@@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
from array import array
from collections.abc import Callable, ItemsView, Iterator, KeysView, ValuesView
from os import PathLike
from typing import Any, TypeVar, overload
ResultT = TypeVar("ResultT")
BenchmarkResultT = TypeVar("BenchmarkResultT", bound="BenchmarkResult")
_SummaryValue = int | float | str | None
class BenchmarkResultDevice:
id: int
name: str
data: dict[str, Any]
class BenchmarkResultSummary:
tag: str
name: str | None
hint: str | None
hide: str | None
description: str | None
data: dict[str, _SummaryValue]
@property
def value(self) -> _SummaryValue | None: ...
def __getitem__(self, key: str) -> _SummaryValue: ...
def get(
self, key: str, default: _SummaryValue | None = None
) -> _SummaryValue | None: ...
class SubBenchmarkState:
state_name: str
device: int | None
type_config_index: int | None
axis_values: list[dict[str, Any]]
is_skipped: bool
skip_reason: str | None
summaries: dict[str, BenchmarkResultSummary]
samples: array | None
frequencies: array | None
bw: float | None
point: dict[str, str]
def name(self) -> str: ...
def center(self, estimator: Callable[[array], ResultT]) -> ResultT | None: ...
def center_with_frequencies(
self, estimator: Callable[[array, array], ResultT]
) -> ResultT | None: ...
class SubBenchmarkResult:
name: str
devices: list[int]
axes: list[dict[str, Any]]
states: list[SubBenchmarkState]
def __len__(self) -> int: ...
@overload
def __getitem__(self, state_index: int) -> SubBenchmarkState: ...
@overload
def __getitem__(self, state_index: slice) -> list[SubBenchmarkState]: ...
def __iter__(self) -> Iterator[SubBenchmarkState]: ...
def centers(
self, estimator: Callable[[array], ResultT]
) -> dict[str, ResultT | None]: ...
def centers_with_frequencies(
self, estimator: Callable[[array, array], ResultT]
) -> dict[str, ResultT | None]: ...
class BenchmarkResult:
metadata: Any
devices: dict[int, BenchmarkResultDevice]
subbenches: dict[str, SubBenchmarkResult]
def __init__(self, token: object | None = None) -> None: ...
@classmethod
def empty(
cls: type[BenchmarkResultT], *, metadata: Any = None
) -> BenchmarkResultT: ...
@classmethod
def from_json(
cls: type[BenchmarkResultT],
json_path: str | PathLike[str],
*,
metadata: Any = None,
) -> BenchmarkResultT: ...
def __len__(self) -> int: ...
def __iter__(self) -> Iterator[str]: ...
def __contains__(self, subbench_name: object) -> bool: ...
def __getitem__(self, subbench_name: str) -> SubBenchmarkResult: ...
def keys(self) -> KeysView[str]: ...
def values(self) -> ValuesView[SubBenchmarkResult]: ...
def items(self) -> ItemsView[str, SubBenchmarkResult]: ...
def centers(
self, estimator: Callable[[array], ResultT]
) -> dict[str, dict[str, ResultT | None]]: ...
def centers_with_frequencies(
self, estimator: Callable[[array, array], ResultT]
) -> dict[str, dict[str, ResultT | None]]: ...

View File

@@ -0,0 +1,511 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
from __future__ import annotations
import array
import json
import os
import sys
from collections.abc import ItemsView, Iterator, KeysView, ValuesView
from dataclasses import dataclass
from typing import Any, Callable, TypeVar
__all__ = [
"BenchmarkResult",
"BenchmarkResultDevice",
"BenchmarkResultSummary",
"SubBenchmarkResult",
"SubBenchmarkState",
]
ResultT = TypeVar("ResultT")
BenchmarkResultT = TypeVar("BenchmarkResultT", bound="BenchmarkResult")
_SummaryValue = int | float | str | None
@dataclass(frozen=True)
class BenchmarkResultDevice:
"""Device metadata parsed from an NVBench JSON result file."""
id: int
name: str
data: dict[str, Any]
def read_json(filename: str | os.PathLike[str]) -> dict:
with open(filename, "r", encoding="utf-8") as f:
file_root = json.load(f)
return file_root
def extract_summary_data_value(summary: dict, name: str, expected_type: str) -> Any:
summary_tag = summary.get("tag", "<unknown>")
for value_data in summary.get("data", []):
if value_data.get("name") != name:
continue
value_type = value_data.get("type")
if value_type != expected_type:
raise ValueError(
f"summary {summary_tag!r} field {name!r} has type "
f"{value_type!r}; expected {expected_type!r}"
)
if "value" not in value_data:
raise ValueError(f"summary {summary_tag!r} field {name!r} is missing value")
return value_data["value"]
raise ValueError(f"summary {summary_tag!r} is missing field {name!r}")
def extract_filename(summary: dict) -> str:
value = extract_summary_data_value(summary, "filename", "string")
if not isinstance(value, str):
raise ValueError(
f"summary {summary.get('tag', '<unknown>')!r} field 'filename' "
"value must be a string"
)
return value
def extract_size(summary: dict) -> int:
value = extract_summary_data_value(summary, "size", "int64")
try:
return int(value)
except (TypeError, ValueError) as e:
raise ValueError(
f"summary {summary.get('tag', '<unknown>')!r} field 'size' "
f"value {value!r} is not an int64"
) from e
def parse_summary_value(
value_data: dict,
*,
summary_tag: str,
field_name: str,
) -> _SummaryValue:
value_type = value_data.get("type")
if "value" not in value_data:
raise ValueError(
f"summary {summary_tag!r} field {field_name!r} is missing value"
)
value = value_data["value"]
if value is None:
return None
if value_type == "int64":
try:
return int(value)
except (TypeError, ValueError) as e:
raise ValueError(
f"summary {summary_tag!r} field {field_name!r} value {value!r} "
"is not an int64"
) from e
if value_type == "float64":
try:
return float(value)
except (TypeError, ValueError) as e:
raise ValueError(
f"summary {summary_tag!r} field {field_name!r} value {value!r} "
"is not a float64"
) from e
if value_type == "string":
if not isinstance(value, str):
raise ValueError(
f"summary {summary_tag!r} field {field_name!r} value {value!r} "
"is not a string"
)
return value
raise ValueError(
f"summary {summary_tag!r} field {field_name!r} has unsupported "
f"value type {value_type!r}"
)
@dataclass(frozen=True)
class BenchmarkResultSummary:
"""Summary record parsed from one NVBench benchmark state."""
tag: str
name: str | None
hint: str | None
hide: str | None
description: str | None
data: dict[str, _SummaryValue]
@property
def value(self) -> _SummaryValue | None:
return self.data.get("value")
def __getitem__(self, key: str) -> _SummaryValue:
return self.data[key]
def get(
self, key: str, default: _SummaryValue | None = None
) -> _SummaryValue | None:
return self.data.get(key, default)
def parse_summary(summary: dict) -> BenchmarkResultSummary:
summary_tag = summary["tag"]
data = {}
for value_data in summary.get("data", []):
field_name = value_data.get("name")
if not isinstance(field_name, str):
raise ValueError(
f"summary {summary_tag!r} has a data entry with a missing "
"or non-string name"
)
data[field_name] = parse_summary_value(
value_data,
summary_tag=summary_tag,
field_name=field_name,
)
return BenchmarkResultSummary(
tag=summary_tag,
name=summary.get("name"),
hint=summary.get("hint"),
hide=summary.get("hide"),
description=summary.get("description"),
data=data,
)
def get_state_summaries(state: dict) -> list[dict]:
return state.get("summaries") or []
def parse_summaries(state: dict) -> dict[str, BenchmarkResultSummary]:
return {
summary["tag"]: parse_summary(summary) for summary in get_state_summaries(state)
}
def parse_binary_meta(state: dict, tag: str) -> tuple[int | None, str | None]:
summaries = get_state_summaries(state)
if not summaries:
return None, None
summary = next(
filter(lambda s: s["tag"] == tag, summaries),
None,
)
if not summary:
return None, None
sample_filename = extract_filename(summary)
sample_count = extract_size(summary)
return sample_count, sample_filename
def parse_samples_meta(state: dict) -> tuple[int | None, str | None]:
return parse_binary_meta(state, "nv/json/bin:nv/cold/sample_times")
def parse_frequencies_meta(state: dict) -> tuple[int | None, str | None]:
return parse_binary_meta(state, "nv/json/freqs-bin:nv/cold/sample_freqs")
def resolve_binary_filename(json_dir: str, binary_filename: str) -> str:
if os.path.isabs(binary_filename):
return binary_filename
json_relative_filename = os.path.join(json_dir, binary_filename)
if os.path.exists(json_relative_filename):
return json_relative_filename
parent_relative_filename = os.path.join(os.path.dirname(json_dir), binary_filename)
if os.path.exists(parent_relative_filename):
return parent_relative_filename
if os.path.exists(binary_filename):
return binary_filename
return json_relative_filename
def parse_float32_binary(
count: int | None, filename: str | None, json_dir: str
) -> array.array | None:
if count is None or filename is None:
return None
values = array.array("f")
if values.itemsize != 4:
raise RuntimeError("array('f') is not a 32-bit float on this platform")
filename = resolve_binary_filename(json_dir, filename)
try:
with open(filename, "rb") as f:
size = os.fstat(f.fileno()).st_size
if size % values.itemsize:
raise ValueError("file size is not a multiple of float size")
values.fromfile(f, size // values.itemsize)
except FileNotFoundError:
return None
# Match np.fromfile(fn, "<f4"): little-endian float32.
if sys.byteorder != "little":
values.byteswap()
if count != len(values):
raise ValueError(f"expected {count} values in {filename}, found {len(values)}")
return values
def parse_samples(state: dict, json_dir: str) -> array.array | None:
"""Return the state's sample times, or None if sample data is unavailable."""
sample_count, samples_filename = parse_samples_meta(state)
return parse_float32_binary(sample_count, samples_filename, json_dir)
def parse_frequencies(state: dict, json_dir: str) -> array.array | None:
"""Return the state's sample frequencies, or None if data is unavailable."""
frequency_count, frequencies_filename = parse_frequencies_meta(state)
return parse_float32_binary(frequency_count, frequencies_filename, json_dir)
def parse_bw(summaries: dict[str, BenchmarkResultSummary]) -> float | None:
bwutil = summaries.get("nv/cold/bw/global/utilization")
if bwutil is None or bwutil.value is None:
return None
return float(bwutil.value)
def get_axis_name(axis: dict) -> str:
name = axis["name"]
if af := axis.get("flags"):
name = name + f"[{af}]"
return name
class SubBenchmarkState:
"""Result data for one executed state of an NVBench benchmark."""
def __init__(self, state: dict, axes_names: dict, axes_values: dict, json_dir: str):
self.state_name = state["name"]
self.device = state.get("device")
self.type_config_index = state.get("type_config_index")
self.axis_values = state.get("axis_values") or []
self.is_skipped = state.get("is_skipped", False)
self.skip_reason = state.get("skip_reason")
self.summaries = parse_summaries(state)
self.samples = parse_samples(state, json_dir)
self.frequencies = parse_frequencies(state, json_dir)
if (
self.samples is not None
and self.frequencies is not None
and len(self.samples) != len(self.frequencies)
):
raise ValueError(
f"sample count ({len(self.samples)}) does not match "
f"frequency count ({len(self.frequencies)})"
)
self.bw = parse_bw(self.summaries)
self.point = {}
for axis in self.axis_values:
axis_name = axis["name"]
name = axes_names[axis_name]
axis_value_map = axes_values[axis_name]
if "value" in axis:
key = str(axis["value"])
value = axis_value_map.get(key, key)
else:
input_string = axis.get("input_string")
value = (
axis_value_map.get(input_string, input_string)
if input_string is not None
else ""
)
self.point[name] = value
def __repr__(self) -> str:
return str(self.__dict__)
def name(self) -> str:
if not self.point:
return self.state_name
return " ".join(f"{k}={v}" for k, v in self.point.items())
def center(self, estimator: Callable[[array.array], ResultT]) -> ResultT | None:
if self.samples is None:
return None
return estimator(self.samples)
def center_with_frequencies(
self, estimator: Callable[[array.array, array.array], ResultT]
) -> ResultT | None:
if self.samples is None or self.frequencies is None:
return None
return estimator(self.samples, self.frequencies)
class SubBenchmarkResult:
"""Result data for one NVBench benchmark and its executed states."""
def __init__(self, bench: dict, json_dir: str):
self.name = bench["name"]
self.devices = bench.get("devices") or []
self.axes = bench.get("axes") or []
axes_names = {}
axes_values = {}
for axis in self.axes:
short_name = axis["name"]
full_name = get_axis_name(axis)
this_axis_values = {}
for value in axis["values"]:
input_string = value["input_string"]
this_axis_values[input_string] = input_string
if "value" in value:
this_axis_values[str(value["value"])] = input_string
axes_names[short_name] = full_name
axes_values[short_name] = this_axis_values
self.states = [
SubBenchmarkState(state, axes_names, axes_values, json_dir)
for state in bench["states"]
]
def __repr__(self) -> str:
return str(self.__dict__)
def __len__(self) -> int:
return len(self.states)
def __getitem__(
self, state_index: int | slice
) -> SubBenchmarkState | list[SubBenchmarkState]:
return self.states[state_index]
def __iter__(self) -> Iterator[SubBenchmarkState]:
return iter(self.states)
def centers(
self, estimator: Callable[[array.array], ResultT]
) -> dict[str, ResultT | None]:
result = {}
for state in self.states:
result[state.name()] = state.center(estimator)
return result
def centers_with_frequencies(
self, estimator: Callable[[array.array, array.array], ResultT]
) -> dict[str, ResultT | None]:
result = {}
for state in self.states:
result[state.name()] = state.center_with_frequencies(estimator)
return result
class BenchmarkResult:
"""Container for benchmark result data parsed from NVBench JSON output.
Instances are created with :meth:`from_json` or :meth:`empty`. Direct
construction is intentionally disabled to keep creation paths explicit.
"""
_construction_token = object()
def __init__(
self,
token=None,
):
"""Initialize an instance created by a BenchmarkResult class method.
Users should call :meth:`from_json` or :meth:`empty` instead. The token
argument is an implementation detail used to prevent direct
construction.
"""
if token is not self._construction_token:
raise TypeError(
"BenchmarkResult cannot be constructed directly; "
"use BenchmarkResult.from_json() or BenchmarkResult.empty()"
)
self.metadata: Any = None
self.devices: dict[int, BenchmarkResultDevice] = {}
self.subbenches: dict[str, SubBenchmarkResult] = {}
@classmethod
def empty(cls: type[BenchmarkResultT], *, metadata: Any = None) -> BenchmarkResultT:
"""Create an empty result container with optional user metadata."""
result = cls(cls._construction_token)
result.metadata = metadata
return result
@classmethod
def from_json(
cls: type[BenchmarkResultT],
json_path: str | os.PathLike[str],
*,
metadata: Any = None,
) -> BenchmarkResultT:
"""Read benchmark result data from an NVBench JSON output file."""
result = cls.empty(metadata=metadata)
result._parse_json(json_path)
return result
def _parse_json(self, json_path: str | os.PathLike[str]) -> None:
"""Populate this instance from an NVBench JSON output file."""
json_path = os.fspath(json_path)
json_dir = os.path.dirname(os.path.abspath(json_path))
result_json = read_json(json_path)
self.devices = {
int(device["id"]): BenchmarkResultDevice(
id=int(device["id"]),
name=device["name"],
data=device,
)
for device in result_json.get("devices", [])
}
for bench in result_json["benchmarks"]:
bench_name: str = bench["name"]
self.subbenches[bench_name] = SubBenchmarkResult(bench, json_dir)
def __repr__(self) -> str:
return str(self.__dict__)
def __len__(self) -> int:
return len(self.subbenches)
def __iter__(self) -> Iterator[str]:
return iter(self.subbenches)
def __contains__(self, subbench_name: object) -> bool:
return subbench_name in self.subbenches
def __getitem__(self, subbench_name: str) -> SubBenchmarkResult:
return self.subbenches[subbench_name]
def keys(self) -> KeysView[str]:
return self.subbenches.keys()
def values(self) -> ValuesView[SubBenchmarkResult]:
return self.subbenches.values()
def items(self) -> ItemsView[str, SubBenchmarkResult]:
return self.subbenches.items()
def centers(
self, estimator: Callable[[array.array], ResultT]
) -> dict[str, dict[str, ResultT | None]]:
result = {}
for subbench in self.subbenches:
result[subbench] = self.subbenches[subbench].centers(estimator)
return result
def centers_with_frequencies(
self, estimator: Callable[[array.array, array.array], ResultT]
) -> dict[str, dict[str, ResultT | None]]:
result = {}
for subbench in self.subbenches:
result[subbench] = self.subbenches[subbench].centers_with_frequencies(
estimator
)
return result

View File

@@ -0,0 +1,361 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
from __future__ import annotations
import argparse
import statistics
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
from cuda.bench.results import BenchmarkResult, BenchmarkResultSummary
from tabulate import tabulate
TILE_SHAPES = ("4x32", "8x16", "16x16", "32x8", "16x8", "8x8")
BENCHMARK_NAME = "stencil_autotune"
MEDIAN_TIE_RELATIVE_TOLERANCE = 0.01
MIN_RECOMMENDED_INTERIOR_PIXELS = 1_000_000
def parse_tile_shape(tile_shape: str) -> tuple[int, int]:
block_x, block_y = tile_shape.split("x", maxsplit=1)
return int(block_x), int(block_y)
def format_duration(seconds: float) -> str:
if seconds >= 1.0:
return f"{seconds:.3f} s"
if seconds >= 1e-3:
return f"{seconds * 1e3:.3f} ms"
if seconds >= 1e-6:
return f"{seconds * 1e6:.3f} us"
return f"{seconds * 1e9:.3f} ns"
def format_optional_duration(seconds: float | None) -> str:
if seconds is None:
return "-"
return format_duration(seconds)
def format_byte_rate(summary: BenchmarkResultSummary | None) -> str:
if summary is None or summary.value is None:
return "-"
bytes_per_second = float(summary.value)
if bytes_per_second >= 1e12:
return f"{bytes_per_second * 1e-12:.3f} TB/s"
if bytes_per_second >= 1e9:
return f"{bytes_per_second * 1e-9:.3f} GB/s"
if bytes_per_second >= 1e6:
return f"{bytes_per_second * 1e-6:.3f} MB/s"
if bytes_per_second >= 1e3:
return f"{bytes_per_second * 1e-3:.3f} KB/s"
return f"{bytes_per_second:.3f} B/s"
def state_tile_shape(state_name: str) -> str:
prefix = "TileShape="
for field in state_name.split():
if field.startswith(prefix):
return field.removeprefix(prefix)
return state_name
def interior_pixel_count(width: int, height: int) -> int:
return max(width - 2, 0) * max(height - 2, 0)
def median_ties_best(row: dict[str, Any], best_median_seconds: float) -> bool:
tolerance = abs(best_median_seconds) * MEDIAN_TIE_RELATIVE_TOLERANCE
return abs(row["median_seconds"] - best_median_seconds) <= tolerance
def summarize_result(result: BenchmarkResult) -> list[dict[str, Any]]:
subbenchmark = result[BENCHMARK_NAME]
medians = subbenchmark.centers(statistics.median)
metadata = result.metadata if isinstance(result.metadata, dict) else {}
rows = []
for state in subbenchmark:
median_seconds = medians[state.name()]
if median_seconds is None:
continue
bandwidth = state.summaries.get("nv/cold/bw/global/bytes_per_second")
mean_summary = state.summaries.get("nv/cold/time/gpu/mean")
mean_seconds = (
None
if mean_summary is None or mean_summary.value is None
else float(mean_summary.value)
)
rows.append(
{
"tile_shape": state_tile_shape(state.name()),
"median_seconds": median_seconds,
"mean_seconds": mean_seconds,
"sample_count": len(state.samples) if state.samples is not None else 0,
"bandwidth": format_byte_rate(bandwidth),
"subprocess_seconds": metadata.get("elapsed_seconds", 0.0),
}
)
return sorted(rows, key=lambda row: row["median_seconds"])
def print_summary(rows: list[dict[str, Any]]) -> None:
if not rows:
raise RuntimeError("No benchmark states with sample data were found.")
total_subprocess_seconds = sum(row["subprocess_seconds"] for row in rows)
print()
print(f"Total benchmark subprocess wall time: {total_subprocess_seconds:.3f} s")
print()
best_median_seconds = rows[0]["median_seconds"]
tied_rows = [row for row in rows if median_ties_best(row, best_median_seconds)]
table = [
[
"*" if row in tied_rows else "",
row["tile_shape"],
format_duration(row["median_seconds"]),
format_optional_duration(row["mean_seconds"]),
row["sample_count"],
row["bandwidth"],
f"{row['subprocess_seconds']:.3f} s",
]
for row in rows
]
print(
tabulate(
table,
headers=[
"Best",
"TileShape",
"Median GPU Time",
"Mean GPU Time",
"Samples",
"GlobalMem BW",
"Subprocess",
],
tablefmt="simple",
disable_numparse=True,
)
)
print()
if len(tied_rows) == 1:
best = tied_rows[0]
print(
"Best tile shape by median isolated GPU time: "
f"{best['tile_shape']} ({format_duration(best['median_seconds'])})"
)
else:
tile_shapes = ", ".join(row["tile_shape"] for row in tied_rows)
print(
"No unique best tile shape by median isolated GPU time: "
f"{len(tied_rows)} states are within "
f"{MEDIAN_TIE_RELATIVE_TOLERANCE:.1%} of "
f"{format_duration(best_median_seconds)} ({tile_shapes})."
)
def run_driver(args: argparse.Namespace, nvbench_args: list[str]) -> int:
with tempfile.TemporaryDirectory(prefix="nvbench-autotune-") as tmp_dir:
rows = []
total = len(TILE_SHAPES)
interior_pixels = interior_pixel_count(args.image_width, args.image_height)
print(
f"Image size: {args.image_width}x{args.image_height} "
f"({interior_pixels} interior stencil points)"
)
print(f"Sampling {total} tile shapes:")
if interior_pixels < MIN_RECOMMENDED_INTERIOR_PIXELS:
print(
"Warning: this problem has only "
f"{interior_pixels} interior stencil points. "
"Small problems are usually dominated by kernel launch overhead, "
"so median timings may tie across tile shapes."
)
for index, tile_shape in enumerate(TILE_SHAPES, start=1):
json_path = Path(tmp_dir) / f"stencil_autotune_{tile_shape}.json"
command = [
sys.executable,
str(Path(__file__).resolve()),
"--run-benchmark",
"--stopping-criterion",
"entropy",
"--tile-shape",
tile_shape,
"--image-width",
str(args.image_width),
"--image-height",
str(args.image_height),
"--jsonbin",
str(json_path),
]
if nvbench_args:
command.extend(["--", *nvbench_args])
print(f"[{index}/{total}] TileShape={tile_shape} ... ", end="", flush=True)
start = time.perf_counter()
completed = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
check=False,
)
elapsed_seconds = time.perf_counter() - start
if completed.returncode != 0:
print(f"failed after {elapsed_seconds:.3f} s")
print(completed.stdout, end="")
return completed.returncode
metadata = {
"command": command,
"returncode": completed.returncode,
"elapsed_seconds": elapsed_seconds,
"tile_shape": tile_shape,
}
result = BenchmarkResult.from_json(json_path, metadata=metadata)
tile_rows = summarize_result(result)
rows.extend(tile_rows)
if tile_rows:
row = tile_rows[0]
print(
f"done in {elapsed_seconds:.3f} s, "
f"median {format_duration(row['median_seconds'])}, "
f"{row['bandwidth']}"
)
else:
print(f"done in {elapsed_seconds:.3f} s, no samples")
print_summary(sorted(rows, key=lambda row: row["median_seconds"]))
return 0
def run_benchmark(args: argparse.Namespace, nvbench_args: list[str]) -> None:
import cuda.bench as bench
import numpy as np
from numba import cuda
def as_cuda_stream(cs: bench.CudaStream) -> cuda.cudadrv.driver.Stream:
return cuda.external_stream(cs.addressof())
@cuda.jit
def stencil_kernel(inp, out, width, height):
x, y = cuda.grid(2)
if 0 < x < width - 1 and 0 < y < height - 1:
idx = y * width + x
out[idx] = 0.2 * (
inp[idx]
+ inp[idx - 1]
+ inp[idx + 1]
+ inp[idx - width]
+ inp[idx + width]
)
def stencil_autotune(state: bench.State) -> None:
tile_shape = state.get_string("TileShape")
block_x, block_y = parse_tile_shape(tile_shape)
width = args.image_width
height = args.image_height
interior_pixels = (width - 2) * (height - 2)
state.add_element_count(interior_pixels, column_name="Pixels")
state.add_global_memory_reads(
interior_pixels * 5 * np.dtype(np.float32).itemsize
)
state.add_global_memory_writes(interior_pixels * np.dtype(np.float32).itemsize)
host_input = np.ones(width * height, dtype=np.float32)
dev_input = cuda.to_device(host_input)
dev_output = cuda.device_array_like(dev_input)
block_shape = (block_x, block_y)
grid_shape = (
(width + block_x - 1) // block_x,
(height + block_y - 1) // block_y,
)
# Compile the Numba kernel outside NVBench measurement.
stencil_kernel[grid_shape, block_shape](dev_input, dev_output, width, height)
cuda.synchronize()
def launcher(launch: bench.Launch) -> None:
stream = as_cuda_stream(launch.get_stream())
stencil_kernel[grid_shape, block_shape, stream, 0](
dev_input,
dev_output,
width,
height,
)
state.exec(launcher)
benchmark = bench.register(stencil_autotune)
benchmark.set_name(BENCHMARK_NAME)
tile_shapes = [args.tile_shape] if args.tile_shape is not None else TILE_SHAPES
benchmark.add_string_axis("TileShape", tile_shapes)
bench.run_all_benchmarks([sys.argv[0], *nvbench_args])
def parse_args(argv: list[str] | None = None) -> tuple[argparse.Namespace, list[str]]:
parser = argparse.ArgumentParser(
description=(
"Autotune a simple stencil benchmark and select the best state "
"from NVBench JSON-bin output."
),
epilog=(
"Additional NVBench options may be passed after '--'. "
"For example: benchmark_result_autotune.py -- --timeout 30"
),
)
parser.add_argument(
"--run-benchmark",
action="store_true",
help=argparse.SUPPRESS,
)
parser.add_argument(
"--tile-shape",
choices=TILE_SHAPES,
default=None,
help=argparse.SUPPRESS,
)
parser.add_argument(
"--image-width",
type=int,
default=4096,
help="Stencil input width used by the subprocess benchmark.",
)
parser.add_argument(
"--image-height",
type=int,
default=4096,
help="Stencil input height used by the subprocess benchmark.",
)
args, nvbench_args = parser.parse_known_args(argv)
if args.image_width < 3 or args.image_height < 3:
parser.error("--image-width and --image-height must both be at least 3")
nvbench_args = [arg for arg in nvbench_args if arg != "--"]
return args, nvbench_args
def main(argv: list[str] | None = None) -> int:
args, nvbench_args = parse_args(argv)
if args.run_benchmark:
run_benchmark(args, nvbench_args)
return 0
return run_driver(args, nvbench_args)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -6,4 +6,5 @@ numba-cuda
cuda-cccl
cupy
nvidia-cute-dsl[cu13]
tabulate
torch[cu13]

View File

@@ -1,3 +1,6 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
[build-system]
requires = ["scikit-build-core>=0.10", "setuptools_scm"]
build-backend = "scikit_build_core.build"
@@ -52,6 +55,7 @@ tools = [
[project.scripts]
nvbench-compare = "scripts.nvbench_compare:main"
nvbench-histogram = "scripts.nvbench_histogram:main"
nvbench-json-summary = "scripts.nvbench_json_summary:main"
nvbench-walltime = "scripts.nvbench_walltime:main"
[project.urls]
@@ -85,4 +89,5 @@ fallback_version = "0.0.0"
[tool.scikit-build.wheel.packages]
"cuda" = "cuda"
"cuda/bench" = "cuda/bench"
"cuda/bench/results" = "cuda/bench/results"
"scripts" = "scripts"

View File

@@ -0,0 +1,268 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from cuda.bench.results import (
BenchmarkResult,
BenchmarkResultSummary,
SubBenchmarkResult,
SubBenchmarkState,
)
class MarkdownTable:
def __init__(self):
self.columns = []
def add_cell(self, row: int, key: str, header: str, value: str) -> None:
column = next((col for col in self.columns if col["key"] == key), None)
if column is None:
column = {
"key": key,
"header": header,
"rows": [],
"max_width": len(header),
}
self.columns.append(column)
column["max_width"] = max(column["max_width"], len(value))
while len(column["rows"]) <= row:
column["rows"].append("")
column["rows"][row] = value
def to_string(self) -> str:
if not self.columns:
return ""
num_rows = max(len(column["rows"]) for column in self.columns)
for column in self.columns:
while len(column["rows"]) < num_rows:
column["rows"].append("")
header = "|"
divider = "|"
for column in self.columns:
width = column["max_width"]
header += f" {column['header']:^{width}} |"
divider += f"{'':-^{width + 2}}|"
rows = []
for row in range(num_rows):
row_text = "|"
for column in self.columns:
row_text += f" {column['rows'][row]:>{column['max_width']}} |"
rows.append(row_text)
return "\n".join([header, divider, *rows]) + "\n"
def format_default(summary: BenchmarkResultSummary) -> str:
value = summary.value
if isinstance(value, float):
return f"{value:.5g}"
if value is None:
return ""
return str(value)
def format_duration(summary: BenchmarkResultSummary) -> str:
seconds = float(summary["value"])
if seconds >= 1.0:
return f"{seconds:0.3f} s"
if seconds >= 1e-3:
return f"{seconds * 1e3:0.3f} ms"
if seconds >= 1e-6:
return f"{seconds * 1e6:0.3f} us"
return f"{seconds * 1e9:0.3f} ns"
def format_item_rate(summary: BenchmarkResultSummary) -> str:
items_per_second = float(summary["value"])
if items_per_second >= 1e15:
return f"{items_per_second * 1e-15:0.3f}P"
if items_per_second >= 1e12:
return f"{items_per_second * 1e-12:0.3f}T"
if items_per_second >= 1e9:
return f"{items_per_second * 1e-9:0.3f}G"
if items_per_second >= 1e6:
return f"{items_per_second * 1e-6:0.3f}M"
if items_per_second >= 1e3:
return f"{items_per_second * 1e-3:0.3f}K"
return f"{items_per_second:0.3f}"
def format_frequency(summary: BenchmarkResultSummary) -> str:
frequency_hz = float(summary["value"])
if frequency_hz >= 1e9:
return f"{frequency_hz * 1e-9:0.3f} GHz"
if frequency_hz >= 1e6:
return f"{frequency_hz * 1e-6:0.3f} MHz"
if frequency_hz >= 1e3:
return f"{frequency_hz * 1e-3:0.3f} KHz"
return f"{frequency_hz:0.3f} Hz"
def format_bytes(summary: BenchmarkResultSummary) -> str:
nbytes = float(summary["value"])
if nbytes >= 1024.0 * 1024.0 * 1024.0:
return f"{nbytes / (1024.0 * 1024.0 * 1024.0):0.3f} GiB"
if nbytes >= 1024.0 * 1024.0:
return f"{nbytes / (1024.0 * 1024.0):0.3f} MiB"
if nbytes >= 1024.0:
return f"{nbytes / 1024.0:0.3f} KiB"
return f"{nbytes:0.3f} B"
def format_byte_rate(summary: BenchmarkResultSummary) -> str:
bytes_per_second = float(summary["value"])
if bytes_per_second >= 1e15:
return f"{bytes_per_second * 1e-15:0.3f} PB/s"
if bytes_per_second >= 1e12:
return f"{bytes_per_second * 1e-12:0.3f} TB/s"
if bytes_per_second >= 1e9:
return f"{bytes_per_second * 1e-9:0.3f} GB/s"
if bytes_per_second >= 1e6:
return f"{bytes_per_second * 1e-6:0.3f} MB/s"
if bytes_per_second >= 1e3:
return f"{bytes_per_second * 1e-3:0.3f} KB/s"
return f"{bytes_per_second:0.3f} B/s"
def format_sample_size(summary: BenchmarkResultSummary) -> str:
return f"{int(summary['value'])}x"
def format_percentage(summary: BenchmarkResultSummary) -> str:
return f"{float(summary['value']) * 100.0:.2f}%"
def format_summary(summary: BenchmarkResultSummary) -> str:
if summary.value is None:
return ""
if summary.hint == "duration":
return format_duration(summary)
if summary.hint == "item_rate":
return format_item_rate(summary)
if summary.hint == "frequency":
return format_frequency(summary)
if summary.hint == "bytes":
return format_bytes(summary)
if summary.hint == "byte_rate":
return format_byte_rate(summary)
if summary.hint == "sample_size":
return format_sample_size(summary)
if summary.hint == "percentage":
return format_percentage(summary)
return format_default(summary)
def format_axis_value(
axis_value: dict, axes_by_name: dict[str, dict]
) -> tuple[str, str]:
name = axis_value["name"]
axis = axes_by_name.get(name, {})
value = axis_value["value"]
if value is None:
return name, ""
if axis.get("type") == "int64" and axis.get("flags") == "pow2":
int_value = int(value)
exponent = int_value.bit_length() - 1
return name, f"2^{exponent} = {int_value}"
value_type = axis_value.get("type", axis.get("type"))
if value_type == "int64":
return name, str(int(value))
if value_type == "float64":
return name, f"{float(value):.5g}"
return name, str(value)
def add_state_row(
table: MarkdownTable,
row: int,
state: SubBenchmarkState,
bench: SubBenchmarkResult,
) -> None:
axes_by_name = {axis["name"]: axis for axis in bench.axes}
for axis_value in state.axis_values:
header, value = format_axis_value(axis_value, axes_by_name)
table.add_cell(row, f"axis:{header}", header, value)
for summary in state.summaries.values():
if summary.hide:
continue
header = summary.name if summary.name is not None else summary.tag
table.add_cell(row, summary.tag, header, format_summary(summary))
def format_benchmark(result: BenchmarkResult, bench: SubBenchmarkResult) -> str:
parts = [f"## {bench.name}\n\n"]
device_ids: list[int | None] = list(bench.devices) if bench.devices else [None]
for device_id in device_ids:
if device_id is not None:
device = result.devices.get(device_id)
device_name = device.name if device is not None else f"Device {device_id}"
parts.append(f"### [{device_id}] {device_name}\n\n")
table = MarkdownTable()
row = 0
for state in bench.states:
if state.is_skipped:
continue
if device_id is not None and state.device != device_id:
continue
add_state_row(table, row, state, bench)
row += 1
table_text = table.to_string()
parts.append(table_text if table_text else "No data -- check log.\n")
return "".join(parts)
def format_result(result: BenchmarkResult) -> str:
parts = ["# Benchmark Results\n"]
for bench in result.values():
parts.append(f"\n{format_benchmark(result, bench)}")
return "".join(parts)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="nvbench-json-summary",
description="Print an NVBench-style markdown summary table from NVBench JSON output.",
)
parser.add_argument("json_path", help="Path to an NVBench JSON output file.")
parser.add_argument(
"-o",
"--output",
type=Path,
help="Write markdown output to this file instead of stdout.",
)
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
result = BenchmarkResult.from_json(args.json_path)
report = format_result(result)
if args.output is not None:
args.output.write_text(report, encoding="utf-8")
else:
print(report)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,750 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
import json
import struct
from dataclasses import dataclass
import cuda.bench
import cuda.bench.results as results
import pytest
def write_json(path, data):
path.write_text(json.dumps(data), encoding="utf-8")
def block_size_axis(*values):
return {
"name": "BlockSize",
"type": "int64",
"flags": "pow2",
"values": [
{
"input_string": str(value),
"description": f"2^{value} = {2**value}",
"value": 2**value,
}
for value in values
],
}
def sample_file_summary(tag, filename, size):
return {
"tag": tag,
"data": [
{
"name": "filename",
"type": "string",
"value": filename,
},
{
"name": "size",
"type": "int64",
"value": str(size),
},
],
}
def sample_times_summary(filename, size):
return sample_file_summary(
"nv/json/bin:nv/cold/sample_times",
filename,
size,
)
def sample_frequencies_summary(filename, size):
return sample_file_summary(
"nv/json/freqs-bin:nv/cold/sample_freqs",
filename,
size,
)
def bwutil_summary(value):
return {
"tag": "nv/cold/bw/global/utilization",
"name": "BWUtil",
"hint": "percentage",
"description": "Global memory utilization",
"data": [
{
"name": "value",
"type": "float64",
"value": str(value),
}
],
}
@pytest.fixture
def sample_result_path(tmp_path):
bin_dir = tmp_path / "result.json-bin"
bin_dir.mkdir()
(bin_dir / "0.bin").write_bytes(struct.pack("<3f", 1.0, 2.0, 4.0))
freq_bin_dir = tmp_path / "result.json-freqs-bin"
freq_bin_dir.mkdir()
(freq_bin_dir / "0.bin").write_bytes(struct.pack("<3f", 100.0, 200.0, 400.0))
json_fn = tmp_path / "result.json"
write_json(
json_fn,
{
"benchmarks": [
{
"name": "copy",
"axes": [block_size_axis(8)],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"summaries": [
sample_times_summary("result.json-bin/0.bin", 3),
bwutil_summary(0.75),
sample_frequencies_summary(
"result.json-freqs-bin/0.bin",
3,
),
],
"is_skipped": False,
}
],
}
]
},
)
return json_fn
@pytest.fixture
def sample_result(sample_result_path):
return results.BenchmarkResult.from_json(sample_result_path)
@pytest.fixture
def sample_subbenchmark(sample_result):
return sample_result["copy"]
@pytest.fixture
def sample_state(sample_subbenchmark):
return sample_subbenchmark[0]
def test_result_classes_are_exposed_from_results_namespace():
assert results.BenchmarkResult.__module__ == results.__name__
assert results.BenchmarkResultSummary.__module__ == results.__name__
assert not hasattr(cuda.bench, "BenchmarkResult")
def test_from_json_preserves_optional_metadata(sample_result_path):
metadata = {"returncode": 0, "elapsed_seconds": 0.25}
default_result = results.BenchmarkResult.from_json(sample_result_path)
result = results.BenchmarkResult.from_json(sample_result_path, metadata=metadata)
assert default_result.metadata is None
assert result.metadata is metadata
def test_benchmark_result_implements_mapping_protocol(sample_result):
subbenchmark = sample_result["copy"]
assert len(sample_result) == 1
assert list(sample_result) == ["copy"]
assert list(sample_result.keys()) == ["copy"]
assert list(sample_result.values()) == [subbenchmark]
assert list(sample_result.items()) == [("copy", subbenchmark)]
assert "copy" in sample_result
assert "missing" not in sample_result
assert subbenchmark is sample_result.subbenches["copy"]
with pytest.raises(KeyError):
sample_result["missing"]
def test_subbenchmark_result_implements_sequence_protocol(sample_subbenchmark):
state = sample_subbenchmark[0]
assert len(sample_subbenchmark) == 1
assert sample_subbenchmark[-1] is state
assert sample_subbenchmark[:] == sample_subbenchmark.states
assert list(sample_subbenchmark) == sample_subbenchmark.states
with pytest.raises(IndexError):
sample_subbenchmark[1]
def test_state_parses_axis_name_and_bandwidth(sample_state):
assert sample_state.name() == "BlockSize[pow2]=8"
assert sample_state.bw == 0.75
def test_state_stores_rich_summary_metadata(sample_state):
bw_summary = sample_state.summaries["nv/cold/bw/global/utilization"]
assert bw_summary.tag == "nv/cold/bw/global/utilization"
assert bw_summary.name == "BWUtil"
assert bw_summary.hint == "percentage"
assert bw_summary.hide is None
assert bw_summary.description == "Global memory utilization"
assert bw_summary.value == pytest.approx(0.75)
assert bw_summary["value"] == pytest.approx(0.75)
assert sample_state.summaries["nv/json/bin:nv/cold/sample_times"].data == {
"filename": "result.json-bin/0.bin",
"size": 3,
}
assert sample_state.summaries["nv/json/freqs-bin:nv/cold/sample_freqs"].data == {
"filename": "result.json-freqs-bin/0.bin",
"size": 3,
}
def test_state_preserves_null_summary_values(tmp_path):
json_fn = tmp_path / "result.json"
write_json(
json_fn,
{
"benchmarks": [
{
"name": "copy",
"axes": [],
"states": [
{
"name": "Device=0",
"axis_values": [],
"summaries": [
{
"tag": "nv/cold/time/gpu/stdev/relative",
"name": "Noise",
"hint": "percentage",
"data": [
{
"name": "value",
"type": "float64",
"value": None,
}
],
}
],
"is_skipped": False,
}
],
}
]
},
)
summary = results.BenchmarkResult.from_json(json_fn)["copy"][0].summaries[
"nv/cold/time/gpu/stdev/relative"
]
assert summary.value is None
assert summary["value"] is None
def test_state_reports_malformed_numeric_summary_values(tmp_path):
json_fn = tmp_path / "result.json"
write_json(
json_fn,
{
"benchmarks": [
{
"name": "copy",
"axes": [],
"states": [
{
"name": "Device=0",
"axis_values": [],
"summaries": [
{
"tag": "nv/cold/time/gpu/mean",
"name": "GPU Time",
"hint": "duration",
"data": [
{
"name": "value",
"type": "float64",
"value": "not-a-number",
}
],
}
],
"is_skipped": False,
}
],
}
]
},
)
with pytest.raises(
ValueError,
match=(
"summary 'nv/cold/time/gpu/mean' field 'value' "
"value 'not-a-number' is not a float64"
),
):
results.BenchmarkResult.from_json(json_fn)
def test_state_loads_samples_and_frequencies(sample_state):
assert sample_state.samples is not None
assert list(sample_state.samples) == pytest.approx([1.0, 2.0, 4.0])
assert sample_state.frequencies is not None
assert list(sample_state.frequencies) == pytest.approx([100.0, 200.0, 400.0])
def test_centers_apply_estimators_to_samples(sample_result):
centers = sample_result.centers(lambda samples: sum(samples) / len(samples))
assert centers == {"copy": {"BlockSize[pow2]=8": pytest.approx(7.0 / 3.0)}}
def test_centers_with_frequencies_apply_estimators(sample_result, sample_subbenchmark):
def weighted_mean(samples, frequencies):
return sum(
sample * frequency for sample, frequency in zip(samples, frequencies)
) / sum(frequencies)
weighted_centers = sample_result.centers_with_frequencies(weighted_mean)
assert weighted_centers == {"copy": {"BlockSize[pow2]=8": pytest.approx(3.0)}}
assert (
sample_subbenchmark.centers_with_frequencies(weighted_mean)
== weighted_centers["copy"]
)
def test_benchmark_result_constructor_is_private():
with pytest.raises(TypeError, match="from_json\\(\\).*empty\\(\\)"):
results.BenchmarkResult()
with pytest.raises(TypeError, match="from_json\\(\\).*empty\\(\\)"):
results.BenchmarkResult("result.json")
with pytest.raises(TypeError):
results.BenchmarkResult(metadata=None)
with pytest.raises(TypeError):
results.BenchmarkResult(json_path="result.json", parse=False)
def test_benchmark_result_empty_does_not_read_json(tmp_path):
@dataclass
class RunMetadata:
returncode: int
elapsed_seconds: float
metadata = RunMetadata(returncode=1, elapsed_seconds=0.25)
missing_json = tmp_path / "missing.json"
result = results.BenchmarkResult.empty(metadata=metadata)
assert result.metadata is metadata
assert result.subbenches == {}
with pytest.raises(FileNotFoundError):
results.BenchmarkResult.from_json(missing_json, metadata=metadata)
with pytest.raises(FileNotFoundError):
results.BenchmarkResult.from_json(json_path=missing_json, metadata=metadata)
def test_benchmark_result_accepts_no_axis_benchmark_with_recorded_binary_path(
tmp_path, monkeypatch
):
data_dir = tmp_path / "temp_data"
data_dir.mkdir()
bin_dir = data_dir / "axes_run1.json-bin"
bin_dir.mkdir()
(bin_dir / "0.bin").write_bytes(struct.pack("<2f", 1.0, 4.0))
freq_bin_dir = data_dir / "axes_run1.json-freqs-bin"
freq_bin_dir.mkdir()
(freq_bin_dir / "0.bin").write_bytes(struct.pack("<2f", 100.0, 400.0))
json_fn = data_dir / "axes_run1.json"
write_json(
json_fn,
{
"benchmarks": [
{
"name": "simple",
"axes": None,
"states": [
{
"name": "Device=0",
"axis_values": None,
"summaries": [
sample_times_summary(
"temp_data/axes_run1.json-bin/0.bin",
2,
),
sample_frequencies_summary(
"temp_data/axes_run1.json-freqs-bin/0.bin",
2,
),
],
"is_skipped": False,
}
],
}
]
},
)
monkeypatch.chdir(tmp_path)
result = results.BenchmarkResult.from_json("temp_data/axes_run1.json")
state = result.subbenches["simple"].states[0]
assert state.name() == "Device=0"
assert state.point == {}
assert state.samples is not None
assert list(state.samples) == pytest.approx([1.0, 4.0])
assert state.frequencies is not None
assert list(state.frequencies) == pytest.approx([100.0, 400.0])
def test_benchmark_result_accepts_axis_value_input_string():
result = results.SubBenchmarkResult(
{
"name": "single_float64_axis",
"axes": [
{
"name": "Duration",
"type": "float64",
"flags": "",
"values": [
{
"input_string": "0",
"description": "",
"value": 0.0,
}
],
}
],
"states": [
{
"name": "Device=0 Duration=0",
"axis_values": [
{
"name": "Duration",
"type": "float64",
"value": "0",
}
],
"summaries": [],
"is_skipped": False,
}
],
},
"",
)
state = result.states[0]
assert state.name() == "Duration=0"
assert state.point == {"Duration": "0"}
def test_benchmark_result_normalizes_axis_value_lookup_key():
result = results.SubBenchmarkResult(
{
"name": "num_blocks",
"axes": [
{
"name": "NumBlocks",
"type": "int64",
"flags": "",
"values": [
{
"input_string": "64",
"description": "",
"value": 64,
},
{
"input_string": "default",
"description": "",
"value": None,
},
],
}
],
"states": [
{
"name": "Device=0 NumBlocks=64",
"axis_values": [
{
"name": "NumBlocks",
"type": "int64",
"value": 64,
}
],
"summaries": [],
"is_skipped": False,
},
{
"name": "Device=0 NumBlocks=default",
"axis_values": [
{
"name": "NumBlocks",
"type": "int64",
"value": None,
}
],
"summaries": [],
"is_skipped": False,
},
{
"name": "Device=0 NumBlocks=64",
"axis_values": [
{
"name": "NumBlocks",
"type": "int64",
"input_string": "64",
}
],
"summaries": [],
"is_skipped": False,
},
],
},
"",
)
assert result.states[0].point == {"NumBlocks": "64"}
assert result.states[1].point == {"NumBlocks": "default"}
assert result.states[2].point == {"NumBlocks": "64"}
def test_benchmark_result_preserves_skipped_state_with_no_summaries():
result = results.SubBenchmarkResult(
{
"name": "copy_sweep_grid_shape",
"axes": [block_size_axis(6, 8)],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"summaries": None,
"is_skipped": True,
},
{
"name": "Device=0 BlockSize=2^6",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "64",
}
],
"summaries": [],
"is_skipped": False,
},
],
},
"",
)
assert len(result.states) == 2
assert result.states[0].name() == "BlockSize[pow2]=8"
assert result.states[0].is_skipped is True
assert result.states[0].summaries == {}
assert result.states[0].samples is None
assert result.states[0].frequencies is None
assert result.states[1].name() == "BlockSize[pow2]=6"
assert result.states[1].is_skipped is False
def test_benchmark_result_uses_empty_summaries_when_field_is_missing():
result = results.SubBenchmarkResult(
{
"name": "copy_sweep_grid_shape",
"axes": [block_size_axis(8)],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"is_skipped": False,
},
],
},
"",
)
state = result.states[0]
assert state.name() == "BlockSize[pow2]=8"
assert state.summaries == {}
assert state.samples is None
assert state.frequencies is None
assert state.bw is None
@pytest.mark.parametrize(
"field_name,bad_type,expected_type",
[
("filename", "int64", "string"),
("size", "string", "int64"),
],
)
def test_benchmark_result_validates_binary_summary_field_types(
field_name, bad_type, expected_type
):
summary = sample_times_summary("result.json-bin/0.bin", 3)
for value_data in summary["data"]:
if value_data["name"] == field_name:
value_data["type"] = bad_type
if field_name == "filename":
value_data["value"] = "123"
with pytest.raises(
ValueError,
match=rf"field '{field_name}' has type '{bad_type}'; expected '{expected_type}'",
):
results.SubBenchmarkResult(
{
"name": "copy",
"axes": [],
"states": [
{
"name": "Device=0",
"axis_values": [],
"summaries": [summary],
"is_skipped": False,
}
],
},
"",
)
def test_benchmark_result_uses_none_for_unavailable_samples(tmp_path):
json_fn = tmp_path / "result.json"
write_json(
json_fn,
{
"benchmarks": [
{
"name": "copy",
"axes": [block_size_axis(8, 9)],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"summaries": [],
"is_skipped": False,
},
{
"name": "Device=0 BlockSize=2^9",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "512",
}
],
"summaries": [
sample_times_summary(
"result.json-bin/missing.bin",
3,
),
sample_frequencies_summary(
"result.json-freqs-bin/missing.bin",
3,
),
],
"is_skipped": False,
},
],
}
]
},
)
result = results.BenchmarkResult.from_json(json_fn)
states = result.subbenches["copy"].states
assert states[0].samples is None
assert states[1].samples is None
assert states[0].frequencies is None
assert states[1].frequencies is None
assert result.centers(lambda samples: pytest.fail("estimator should not run")) == {
"copy": {
"BlockSize[pow2]=8": None,
"BlockSize[pow2]=9": None,
}
}
assert result.centers_with_frequencies(
lambda samples, frequencies: pytest.fail("estimator should not run")
) == {
"copy": {
"BlockSize[pow2]=8": None,
"BlockSize[pow2]=9": None,
}
}
def test_benchmark_result_rejects_mismatched_sample_and_frequency_counts(tmp_path):
bin_dir = tmp_path / "result.json-bin"
bin_dir.mkdir()
(bin_dir / "0.bin").write_bytes(struct.pack("<3f", 1.0, 2.0, 4.0))
freq_bin_dir = tmp_path / "result.json-freqs-bin"
freq_bin_dir.mkdir()
(freq_bin_dir / "0.bin").write_bytes(struct.pack("<2f", 100.0, 200.0))
json_fn = tmp_path / "result.json"
write_json(
json_fn,
{
"benchmarks": [
{
"name": "copy",
"axes": [block_size_axis(8)],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"summaries": [
sample_times_summary("result.json-bin/0.bin", 3),
sample_frequencies_summary(
"result.json-freqs-bin/0.bin",
2,
),
],
"is_skipped": False,
}
],
}
]
},
)
with pytest.raises(ValueError, match="sample count .* frequency count"):
results.BenchmarkResult.from_json(json_fn)

View File

@@ -0,0 +1,376 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
import importlib.util
import json
from pathlib import Path
def load_nvbench_json_summary():
module_path = (
Path(__file__).resolve().parents[1] / "scripts" / "nvbench_json_summary.py"
)
spec = importlib.util.spec_from_file_location("nvbench_json_summary", module_path)
assert spec is not None
assert spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
nvbench_json_summary = load_nvbench_json_summary()
def write_result_json(path):
path.write_text(
json.dumps(
{
"devices": [
{
"id": 0,
"name": "Test GPU",
}
],
"benchmarks": [
{
"name": "copy",
"devices": [0],
"axes": [
{
"name": "BlockSize",
"type": "int64",
"flags": "pow2",
"values": [
{
"input_string": "8",
"description": "2^8 = 256",
"value": 256,
}
],
}
],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"device": 0,
"type_config_index": 0,
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"summaries": [
{
"tag": "nv/cold/time/gpu/sample_size",
"name": "Samples",
"hint": "sample_size",
"data": [
{
"name": "value",
"type": "int64",
"value": "12",
}
],
},
{
"tag": "nv/cold/time/gpu/mean",
"name": "GPU Time",
"hint": "duration",
"data": [
{
"name": "value",
"type": "float64",
"value": "1.25e-6",
}
],
},
{
"tag": "nv/cold/time/gpu/stdev/relative",
"name": "Noise",
"hint": "percentage",
"data": [
{
"name": "value",
"type": "float64",
"value": "0.015",
}
],
},
{
"tag": "nv/cold/bw/global/bytes_per_second",
"name": "GlobalMem BW",
"hint": "byte_rate",
"data": [
{
"name": "value",
"type": "float64",
"value": "2.5e9",
}
],
},
{
"tag": "nv/cold/bw/global/utilization",
"name": "BWUtil",
"hint": "percentage",
"hide": False,
"data": [
{
"name": "value",
"type": "float64",
"value": "0.625",
}
],
},
{
"tag": "nv/cold/time/gpu/min",
"name": "Min GPU Time",
"hint": "duration",
"hide": "Hidden by default.",
"data": [
{
"name": "value",
"type": "float64",
"value": "1.0e-6",
}
],
},
],
"is_skipped": False,
}
],
}
],
}
),
encoding="utf-8",
)
def test_json_summary_formats_nvbench_style_markdown(tmp_path):
json_path = tmp_path / "result.json"
write_result_json(json_path)
result = nvbench_json_summary.BenchmarkResult.from_json(json_path)
report = nvbench_json_summary.format_result(result)
assert "# Benchmark Results" in report
assert "## copy" in report
assert "### [0] Test GPU" in report
assert (
"| BlockSize | Samples | GPU Time | Noise | GlobalMem BW | BWUtil |" in report
)
assert (
"| 2^8 = 256 | 12x | 1.250 us | 1.50% | 2.500 GB/s | 62.50% |" in report
)
assert "Min GPU Time" not in report
def test_json_summary_formats_null_summary_value_as_blank():
summary = nvbench_json_summary.BenchmarkResultSummary(
tag="nv/cold/time/gpu/stdev/relative",
name="Noise",
hint="percentage",
hide=None,
description=None,
data={"value": None},
)
assert nvbench_json_summary.format_summary(summary) == ""
def test_json_summary_formats_axis_values_like_markdown_printer():
axes_by_name = {
"BlockSize": {
"name": "BlockSize",
"type": "int64",
"flags": "pow2",
},
"NumBlocks": {
"name": "NumBlocks",
"type": "int64",
"flags": "",
},
"Duration": {
"name": "Duration",
"type": "float64",
"flags": "",
},
"Nullable": {
"name": "Nullable",
"type": "int64",
"flags": "",
},
}
assert nvbench_json_summary.format_axis_value(
{"name": "BlockSize", "type": "int64", "value": "256"}, axes_by_name
) == ("BlockSize", "2^8 = 256")
assert nvbench_json_summary.format_axis_value(
{"name": "NumBlocks", "type": "int64", "value": "64"}, axes_by_name
) == ("NumBlocks", "64")
assert nvbench_json_summary.format_axis_value(
{"name": "Duration", "type": "float64", "value": "0.123456789"},
axes_by_name,
) == ("Duration", "0.12346")
assert nvbench_json_summary.format_axis_value(
{"name": "Nullable", "type": "int64", "value": None}, axes_by_name
) == ("Nullable", "")
def test_json_summary_formats_state_with_null_axis_values(tmp_path):
json_path = tmp_path / "result.json"
json_path.write_text(
json.dumps(
{
"devices": [
{
"id": 0,
"name": "Test GPU",
}
],
"benchmarks": [
{
"name": "no_axes",
"devices": [0],
"axes": None,
"states": [
{
"name": "Device=0",
"device": 0,
"axis_values": None,
"summaries": [
{
"tag": "nv/cold/time/gpu/sample_size",
"name": "Samples",
"hint": "sample_size",
"data": [
{
"name": "value",
"type": "int64",
"value": "7",
}
],
}
],
"is_skipped": False,
}
],
}
],
}
),
encoding="utf-8",
)
result = nvbench_json_summary.BenchmarkResult.from_json(json_path)
report = nvbench_json_summary.format_result(result)
assert "## no_axes" in report
assert "| Samples |" in report
assert "| 7x |" in report
def test_json_summary_omits_skipped_states(tmp_path):
json_path = tmp_path / "result.json"
json_path.write_text(
json.dumps(
{
"devices": [
{
"id": 0,
"name": "Test GPU",
}
],
"benchmarks": [
{
"name": "copy",
"devices": [0],
"axes": [
{
"name": "BlockSize",
"type": "int64",
"flags": "pow2",
"values": [
{
"input_string": "8",
"description": "2^8 = 256",
"value": 256,
},
{
"input_string": "9",
"description": "2^9 = 512",
"value": 512,
},
],
}
],
"states": [
{
"name": "Device=0 BlockSize=2^8",
"device": 0,
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "256",
}
],
"summaries": None,
"is_skipped": True,
"skip_reason": "Deadlock detected",
},
{
"name": "Device=0 BlockSize=2^9",
"device": 0,
"axis_values": [
{
"name": "BlockSize",
"type": "int64",
"value": "512",
}
],
"summaries": [
{
"tag": "nv/cold/time/gpu/sample_size",
"name": "Samples",
"hint": "sample_size",
"data": [
{
"name": "value",
"type": "int64",
"value": "3",
}
],
}
],
"is_skipped": False,
},
],
}
],
}
),
encoding="utf-8",
)
result = nvbench_json_summary.BenchmarkResult.from_json(json_path)
report = nvbench_json_summary.format_result(result)
assert "Skip Reason" not in report
assert "Deadlock detected" not in report
assert "2^8 = 256" not in report
assert "2^9 = 512" in report
assert "3x" in report
def test_json_summary_cli_writes_output_file(tmp_path):
json_path = tmp_path / "result.json"
output_path = tmp_path / "summary.md"
write_result_json(json_path)
rc = nvbench_json_summary.main([str(json_path), "--output", str(output_path)])
assert rc == 0
assert "GlobalMem BW" in output_path.read_text(encoding="utf-8")