Yaswanth Raparti 6989cf800c [rocm-libraries] ROCm/rocm-libraries#6327 (commit 1e7a12e)
[CK][CK TILE] Dispatcher kernel selection heuristic for grouped conv (#6327)

## Motivation
The ML heuristic in the dispatcher does not yet support the grouped-conv
operator. This PR adds support for fwd, bwd-data, and bwd-weight
grouped-conv kernels. A tile_engine utility has also been added to
compile and run any selected kernel configuration through the dispatcher
infrastructure.

## Technical Details

1. A tile_engine utility is added to benchmark each shape with all
possible kernel+tile_size combinations:
https://github.com/ROCm/rocm-libraries/blob/users/yraparti/ck/dispatcher-grouped-conv-heuristics/projects/composablekernel/tile_engine/ops/grouped_conv/grouped_conv_full_benchmark.py
2. New LGBM regressor models for grouped conv are added to the models
directory, with three separate models for fwd, bwd-data, and bwd-weight:
https://github.com/ROCm/rocm-libraries/tree/users/yraparti/ck/dispatcher-grouped-conv-heuristics/projects/composablekernel/dispatcher/heuristics/models
3. Implemented lazy GPU initialization (dispatcher/python)
   - **Issue**: ProcessPoolExecutor fork() + GPU context caused memory
     access faults
   - **Solution**: Mirror the FMHA pattern - defer GPU initialization
     until the first run()
   - **Changes**:
     - setup_multiple_grouped_conv_dispatchers() returns List[Path], not
       loaded libs
     - GpuGroupedConvRunner.__init__() no longer calls ctypes.CDLL
     - Added _ensure_initialized() method for lazy GPU loading
     - GPU context created only on first run() call
   - **Benefit**: Parallel compilation now works without GPU conflicts
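The lazy-initialization pattern described above can be sketched as follows. This is a minimal illustration, not the actual dispatcher code; `LazyGpuRunner` and its members are hypothetical names standing in for `GpuGroupedConvRunner`:

```python
import ctypes
from pathlib import Path


class LazyGpuRunner:
    """Defer loading the compiled kernel library until the first run().

    Because __init__ only records the path, a parent process can fork
    workers (e.g. via ProcessPoolExecutor) before any GPU context
    exists, avoiding fork()+GPU memory access faults.
    """

    def __init__(self, lib_path: Path):
        self._lib_path = lib_path  # just remember the path
        self._lib = None           # no ctypes.CDLL call here

    def _ensure_initialized(self):
        # First touch: loading the shared library is what creates
        # the GPU context, so it happens lazily, post-fork.
        if self._lib is None:
            self._lib = ctypes.CDLL(str(self._lib_path))
        return self._lib

    def run(self, *args):
        lib = self._ensure_initialized()
        # A real runner would invoke a kernel entry point on `lib` here.
        return lib
```

The key property is that constructing the runner is side-effect free; the GPU is only touched inside `run()`.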
4. Addressed a few miscellaneous issues:
   - Fixed a BF16->FP16 naming bug in the dispatcher wrapper
   - Added new tile sizes and the comp_v5 pipeline to the arch spec to
     expand the kernel selection space
   - Added automatic padding support for unsupported shapes in the
     dispatcher runner
   - Created a single source of truth between tile_engine and the
     dispatcher for architecture and tile_size details
   - Built a validation script to compare oracle_best vs. ml_heuristic
     selections

## Test Plan

1. Validated fwd, bwd-data, and bwd-weight kernels on both known and
unseen data sets of up to 300 problems.
2. Ensured that test cases are added in both dispatcher and tile_engine
to validate the heuristic.

## Test Result
Results on unseen shapes, validated on gfx950.
#### Forward Pass Model
- **Training Data**: 48,845 measurements across 1,372 unique problem
shapes
- **Validation Set**: 300 unseen problems from model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **93.05%**
  - Median Efficiency: **96.8%**
  - P10 Efficiency: **79.9%**

#### Backward Data Gradient (bwd_data) Model
- **Training Data**: 18,773 measurements across 891 unique problem
shapes
- **Validation Set**: 300 unseen problems from model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **93.8%**
  - Median Efficiency: **96.5%**
  - P10 Efficiency: **82.9%**

#### Backward Weight Gradient (bwd_weight) Model
- **Training Data**: 34,900 measurements across 1,508 unique problem
shapes
- **Validation Set**: 300 unseen problems from model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **96.1%**
  - Median Efficiency: **99.2%**
  - P10 Efficiency: **89.4%**
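The efficiency figures above are per-problem ratios of the ML-selected kernel's measured TFLOPS to the oracle-best kernel's TFLOPS, summarized by mean, median, and 10th percentile. A minimal sketch of such a summary (the function name is illustrative, not from the PR):

```python
import numpy as np


def efficiency_summary(heuristic_tflops, oracle_tflops):
    """Summarize per-problem efficiency of the ML pick vs. the oracle-best kernel."""
    eff = 100.0 * np.asarray(heuristic_tflops) / np.asarray(oracle_tflops)
    return {
        "mean": float(np.mean(eff)),
        "median": float(np.median(eff)),
        # 10th percentile captures the worst-case tail of the selections
        "p10": float(np.percentile(eff, 10)),
    }
```

An efficiency of 100% on a problem means the heuristic picked the same kernel the exhaustive oracle search would have.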

## Submission Checklist

- [x] Look over the contributing guidelines at
https://github.com/ROCm/ROCm/blob/develop/CONTRIBUTING.md#pull-requests.
2026-05-08 20:48:42 +00:00


#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
Predictor for CK Tile kernel performance.
Loads trained LightGBM models and provides:
- predict_tflops(): predicted TFLOPS for a single (problem, kernel) pair
- predict_latency(): predicted latency in ms
- predict_bandwidth(): predicted bandwidth in GB/s
- predict_all(): all three predictions at once
- rank_kernels(): rank all candidate kernels by predicted TFLOPS
- select_best(): return the best kernel ID
Usage:
predictor = Predictor("models/gemm_universal_fp8_gfx950")
best_kernel = predictor.select_best(
problem={"m": 128, "n": 1536, "k": 7168, "dtype": "fp8", "layout": "rcr"},
kernel_configs=[...],
)
"""
import gzip
import json
from pathlib import Path
from typing import Optional
import lightgbm as lgb
import numpy as np
import pandas as pd
from feature_engine import GemmUniversalFeatureEngine
class Predictor:
    """Loads trained models and feature spec for kernel performance prediction.

    Parameters
    ----------
    model_dir : str or Path
        Directory containing model artifacts:
        - model_tflops.lgbm (required)
        - model_latency.lgbm (optional)
        - model_bandwidth.lgbm (optional)
        - feature_spec.json (required)
    feature_engine : FeatureEngine, optional
        Override the feature engine. If None, constructs one from feature_spec.json.
    """

    def __init__(self, model_dir: str | Path, feature_engine=None):
        self._model_dir = Path(model_dir)
        self._models: dict[str, lgb.Booster] = {}
        spec_path = self._model_dir / "feature_spec.json"
        if spec_path.exists():
            with open(spec_path) as f:
                self._spec = json.load(f)
        else:
            self._spec = {}
        self._log_targets = set(self._spec.get("log_targets", []))
        if feature_engine is not None:
            self._feature_engine = feature_engine
        else:
            self._feature_engine = GemmUniversalFeatureEngine()
        # Build a column index map so models trained with an older (smaller)
        # feature set still work with a feature engine that has since been
        # extended. The model's feature_spec.json["feature_names"] is the
        # ground truth of what columns the booster expects, in order.
        self._feature_indices: Optional[np.ndarray] = None
        spec_names = self._spec.get("feature_names")
        if spec_names:
            engine_names = self._feature_engine.get_feature_names()
            if list(spec_names) != list(engine_names):
                idx_map = {n: i for i, n in enumerate(engine_names)}
                missing = [n for n in spec_names if n not in idx_map]
                if missing:
                    raise ValueError(
                        f"{self._feature_engine.__class__.__name__} cannot "
                        f"supply features required by model {self._model_dir.name}: "
                        f"{missing[:5]}{'...' if len(missing) > 5 else ''}"
                    )
                self._feature_indices = np.array(
                    [idx_map[n] for n in spec_names], dtype=np.intp
                )

    def _select_features(self, X: np.ndarray) -> np.ndarray:
        """Subset/reorder engine output to match the loaded model's spec."""
        if self._feature_indices is None:
            return X
        return X[:, self._feature_indices]

    def _load_model(self, target: str) -> Optional[lgb.Booster]:
        """Lazy-load a model for the given target.

        Automatically decompresses .lgbm.gz files if the .lgbm file doesn't exist.
        The decompressed file is cached to disk for subsequent loads.
        """
        if target in self._models:
            return self._models[target]
        path = self._model_dir / f"model_{target}.lgbm"
        gz_path = self._model_dir / f"model_{target}.lgbm.gz"
        # Auto-decompress if needed
        if not path.exists() and gz_path.exists():
            with gzip.open(gz_path, "rb") as f_in:
                with open(path, "wb") as f_out:
                    f_out.write(f_in.read())
        if not path.exists():
            return None
        model = lgb.Booster(model_file=str(path))
        self._models[target] = model
        return model

    def _predict_single(self, target: str, problem: dict, kernel_config: dict) -> float:
        """Predict a single target value, applying inverse log transform if needed."""
        model = self._load_model(target)
        if model is None:
            raise FileNotFoundError(f"No model_{target}.lgbm in {self._model_dir}")
        features = self._feature_engine.extract(problem, kernel_config).reshape(1, -1)
        features = self._select_features(features)
        raw = float(model.predict(features)[0])
        if target in self._log_targets:
            return float(np.expm1(raw))
        # Clamp to non-negative even for non-log models
        return float(max(0.0, raw))

    def predict_tflops(self, problem: dict, kernel_config: dict) -> float:
        """Predict TFLOPS for a single (problem, kernel) pair.

        Returns a real TFLOPS estimate (interpretable, usable as DE surrogate).
        If the model was trained in log-space, the inverse transform is applied
        automatically.
        """
        return self._predict_single("tflops", problem, kernel_config)

    def predict_latency(self, problem: dict, kernel_config: dict) -> float:
        """Predict latency in milliseconds for a single (problem, kernel) pair."""
        return self._predict_single("latency", problem, kernel_config)

    def predict_bandwidth(self, problem: dict, kernel_config: dict) -> float:
        """Predict bandwidth in GB/s for a single (problem, kernel) pair."""
        return self._predict_single("bandwidth", problem, kernel_config)

    def predict_all(self, problem: dict, kernel_config: dict) -> dict[str, float]:
        """Predict all available targets for a single (problem, kernel) pair.

        Returns dict with keys 'tflops', 'latency_ms', 'bandwidth_gb_s' (if models exist).
        Note: Applies inverse log transform for targets in log_targets and clamps
        negatives to 0.0, consistent with _predict_single().
        """
        features = self._feature_engine.extract(problem, kernel_config).reshape(1, -1)
        features = self._select_features(features)
        result = {}
        for target, key in [
            ("tflops", "tflops"),
            ("latency", "latency_ms"),
            ("bandwidth", "bandwidth_gb_s"),
        ]:
            model = self._load_model(target)
            if model is not None:
                raw = float(model.predict(features)[0])
                # Apply inverse log transform if model was trained in log-space
                if target in self._log_targets:
                    result[key] = float(np.expm1(raw))
                else:
                    # Clamp to non-negative even for non-log models
                    result[key] = float(max(0.0, raw))
        return result

    def rank_kernels(
        self, problem: dict, kernel_configs: list[dict]
    ) -> list[tuple[str, float]]:
        """Rank candidate kernels by predicted TFLOPS (descending).

        Parameters
        ----------
        problem : dict
            Problem specification with keys: m, n, k, dtype, layout, split_k.
        kernel_configs : list of dict
            Each dict must have a 'kernel_name' key plus kernel parameters.

        Returns
        -------
        list of (kernel_name, predicted_tflops) tuples, sorted descending.
        """
        if not kernel_configs:
            return []
        model = self._load_model("tflops")
        if model is None:
            raise FileNotFoundError(f"No model_tflops.lgbm in {self._model_dir}")
        rows = []
        for kc in kernel_configs:
            merged = {**problem, **kc}
            rows.append(merged)
        df = pd.DataFrame(rows)
        X = self._feature_engine.extract_batch(df)
        X = self._select_features(X)
        preds = model.predict(X)
        if "tflops" in self._log_targets:
            preds = np.expm1(preds)
        results = []
        for i, kc in enumerate(kernel_configs):
            name = kc.get("kernel_name", f"kernel_{i}")
            results.append((name, float(preds[i])))
        results.sort(key=lambda x: -x[1])
        return results

    def select_best(self, problem: dict, kernel_configs: list[dict]) -> str:
        """Return the kernel_name of the best predicted kernel."""
        ranked = self.rank_kernels(problem, kernel_configs)
        if not ranked:
            raise ValueError("No kernel configs provided")
        return ranked[0][0]
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Predict kernel performance")
    parser.add_argument(
        "--model_dir", required=True, help="Directory with trained models"
    )
    parser.add_argument("--m", type=int, required=True)
    parser.add_argument("--n", type=int, required=True)
    parser.add_argument("--k", type=int, required=True)
    parser.add_argument("--layout", default="rcr")
    parser.add_argument("--dtype", default="fp8")
    args = parser.parse_args()

    predictor = Predictor(args.model_dir)
    problem = {
        "m": args.m,
        "n": args.n,
        "k": args.k,
        "dtype": args.dtype,
        "layout": args.layout,
        "split_k": 1,
    }
    print(f"Loading models from {args.model_dir}...")
    print(
        f"Problem: M={args.m} N={args.n} K={args.k} dtype={args.dtype} layout={args.layout}"
    )
    data_dir = Path(args.model_dir).parent.parent / "data"
    if data_dir.exists():
        for pq in data_dir.glob("*.parquet"):
            df = pd.read_parquet(pq)
            kernel_names = df["kernel_name"].unique()
            configs = []
            for kn in kernel_names[:10]:
                row = df[df["kernel_name"] == kn].iloc[0]
                configs.append(row.to_dict())
            if configs:
                ranked = predictor.rank_kernels(problem, configs)
                print(f"\nTop 5 kernels (from {len(configs)} candidates):")
                for name, tflops in ranked[:5]:
                    print(f"  {tflops:8.2f} TFLOPS  {name}")
            break
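The column-remapping logic behind `_select_features` can be demonstrated in isolation. This standalone sketch uses made-up feature names to show how a model trained on an older, smaller feature set selects and reorders columns from an extended feature engine:

```python
import numpy as np

# Hypothetical names: the engine now emits six features, the model
# (per its feature_spec.json) was trained on three of them, in this order.
engine_names = ["m", "n", "k", "tile_m", "tile_n", "new_feat"]
spec_names = ["m", "k", "tile_n"]

# Map each engine column name to its index, then gather the spec's
# columns in the spec's order - exactly what __init__ precomputes.
idx_map = {name: i for i, name in enumerate(engine_names)}
indices = np.array([idx_map[n] for n in spec_names], dtype=np.intp)

# Two rows of engine output; the remap keeps columns 0, 2, and 4.
X = np.arange(12, dtype=float).reshape(2, 6)
X_model = X[:, indices]
```

Because the spec's `feature_names` are the ground truth, the booster always sees the columns it was trained on, regardless of how the engine has since grown.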