Mirror of https://github.com/ROCm/composable_kernel.git (synced 2026-05-13 17:55:48 +00:00)
# [CK][CK TILE] Dispatcher kernel selection heuristic for grouped conv (#6327)

## Motivation

The ML heuristic in the dispatcher does not yet support the grouped-conv operator. This PR adds support for fwd, bwd-data, and bwd-weight grouped-conv kernels. A tile_engine utility has also been added to compile and run any selected kernel configuration through the dispatcher infrastructure.

## Technical Details

1. A tile_engine utility is added to benchmark each shape with all possible kernel + tile_size combinations: https://github.com/ROCm/rocm-libraries/blob/users/yraparti/ck/dispatcher-grouped-conv-heuristics/projects/composablekernel/tile_engine/ops/grouped_conv/grouped_conv_full_benchmark.py
2. New LGBM regressor models for grouped conv are added to the models directory, with three separate models for fwd, bwd-data, and bwd-weight: https://github.com/ROCm/rocm-libraries/tree/users/yraparti/ck/dispatcher-grouped-conv-heuristics/projects/composablekernel/dispatcher/heuristics/models
3. Implemented lazy GPU initialization (dispatcher/python); a sketch of the pattern follows this description.
   - **Issue**: ProcessPoolExecutor fork() + GPU context caused memory access faults
   - **Solution**: Mirror the FMHA pattern: defer GPU initialization until the first run()
   - **Changes**:
     - setup_multiple_grouped_conv_dispatchers() returns List[Path], not loaded libs
     - GpuGroupedConvRunner.__init__() no longer calls ctypes.CDLL
     - Added an _ensure_initialized() method for lazy GPU loading
     - GPU context is created only on the first run() call
   - **Benefit**: Parallel compilation now works without GPU conflicts
4. Addressed a few miscellaneous issues:
   - Fixed a BF16->FP16 naming bug in the dispatcher wrapper
   - Added new tile sizes and the comp_v5 pipeline to the arch spec to expand kernel selection
   - Added automatic padding support for unsupported shapes in the dispatcher runner
   - Created a single source of truth between tile_engine and the dispatcher for architecture and tile_size details
   - Built validation scripts to compare oracle_best vs. ml_heuristic selections

## Test Plan

1. Validated fwd, bwd-data, and bwd-weight kernels on both known and unseen data sets with up to 300 problems.
2. Ensured that test cases are added in both the dispatcher and tile_engine to validate the heuristic.

## Test Results

Results on unseen shapes, validated on gfx950. Efficiency is the performance of the heuristic-selected kernel relative to the oracle-best kernel for the same problem.

#### Forward Pass Model

- **Training Data**: 48,845 measurements across 1,372 unique problem shapes
- **Validation Set**: 300 unseen problems from the model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **93.05%**
  - Median Efficiency: **96.8%**
  - P10 Efficiency: **79.9%**

#### Backward Data Gradient (bwd_data) Model

- **Training Data**: 18,773 measurements across 891 unique problem shapes
- **Validation Set**: 300 unseen problems from the model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **93.8%**
  - Median Efficiency: **96.5%**
  - P10 Efficiency: **82.9%**

#### Backward Weight Gradient (bwd_weight) Model

- **Training Data**: 34,900 measurements across 1,508 unique problem shapes
- **Validation Set**: 300 unseen problems from the model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **96.1%**
  - Median Efficiency: **99.2%**
  - P10 Efficiency: **89.4%**

## Submission Checklist

- [x] Look over the contributing guidelines at https://github.com/ROCm/ROCm/blob/develop/CONTRIBUTING.md#pull-requests.
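The lazy-initialization change in item 3 boils down to the following pattern. This is a minimal sketch, not the actual dispatcher code: the runner's internals and the `run_kernel` entry point are assumed for illustration.

```python
import ctypes
from pathlib import Path


class GpuGroupedConvRunner:
    """Sketch of the lazy-init pattern; internals are assumed, not the real class."""

    def __init__(self, lib_path: Path):
        # Store only the path: no ctypes.CDLL and no GPU context here, so the
        # object is safe to construct before ProcessPoolExecutor fork()s.
        self._lib_path = lib_path
        self._lib = None

    def _ensure_initialized(self):
        # Defer library loading (and thus GPU context creation) to first use,
        # mirroring the FMHA pattern referenced above.
        if self._lib is None:
            self._lib = ctypes.CDLL(str(self._lib_path))

    def run(self, *args):
        self._ensure_initialized()  # GPU touched only on the first run() call
        return self._lib.run_kernel(*args)  # "run_kernel" is an assumed entry point
```

Because setup_multiple_grouped_conv_dispatchers() now hands back plain paths, workers can compile in parallel, and only the process that eventually calls run() opens a GPU context.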
274 lines · 10 KiB · Python
#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT

"""
Predictor for CK Tile kernel performance.

Loads trained LightGBM models and provides:
- predict_tflops(): predicted TFLOPS for a single (problem, kernel) pair
- predict_latency(): predicted latency in ms
- predict_bandwidth(): predicted bandwidth in GB/s
- predict_all(): all three predictions at once
- rank_kernels(): rank all candidate kernels by predicted TFLOPS
- select_best(): return the best kernel ID

Usage:
    predictor = Predictor("models/gemm_universal_fp8_gfx950")
    best_kernel = predictor.select_best(
        problem={"m": 128, "n": 1536, "k": 7168, "dtype": "fp8", "layout": "rcr"},
        kernel_configs=[...],
    )
"""

import gzip
import json
from pathlib import Path
from typing import Optional

import lightgbm as lgb
import numpy as np
import pandas as pd

from feature_engine import GemmUniversalFeatureEngine

class Predictor:
    """Loads trained models and feature spec for kernel performance prediction.

    Parameters
    ----------
    model_dir : str or Path
        Directory containing model artifacts:
        - model_tflops.lgbm (required)
        - model_latency.lgbm (optional)
        - model_bandwidth.lgbm (optional)
        - feature_spec.json (required)

    feature_engine : FeatureEngine, optional
        Override the feature engine. If None, constructs one from feature_spec.json.
    """

    def __init__(self, model_dir: str | Path, feature_engine=None):
        self._model_dir = Path(model_dir)
        self._models: dict[str, lgb.Booster] = {}

        spec_path = self._model_dir / "feature_spec.json"
        if spec_path.exists():
            with open(spec_path) as f:
                self._spec = json.load(f)
        else:
            self._spec = {}

        self._log_targets = set(self._spec.get("log_targets", []))

        if feature_engine is not None:
            self._feature_engine = feature_engine
        else:
            self._feature_engine = GemmUniversalFeatureEngine()

        # Build a column index map so models trained with an older (smaller)
        # feature set still work with a feature engine that has since been
        # extended. The model's feature_spec.json["feature_names"] is the
        # ground truth of what columns the booster expects, in order.
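        # Example (hypothetical feature names, for illustration): if spec_names
        # is ["m", "n", "k"] and the engine has since grown to emit
        # ["m", "n", "k", "k_log2"], the index map picks columns [0, 1, 2] so
        # the booster sees exactly its training-time features, in order.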
        self._feature_indices: Optional[np.ndarray] = None
        spec_names = self._spec.get("feature_names")
        if spec_names:
            engine_names = self._feature_engine.get_feature_names()
            if list(spec_names) != list(engine_names):
                idx_map = {n: i for i, n in enumerate(engine_names)}
                missing = [n for n in spec_names if n not in idx_map]
                if missing:
                    raise ValueError(
                        f"{self._feature_engine.__class__.__name__} cannot "
                        f"supply features required by model {self._model_dir.name}: "
                        f"{missing[:5]}{'...' if len(missing) > 5 else ''}"
                    )
                self._feature_indices = np.array(
                    [idx_map[n] for n in spec_names], dtype=np.intp
                )

    def _select_features(self, X: np.ndarray) -> np.ndarray:
        """Subset/reorder engine output to match the loaded model's spec."""
        if self._feature_indices is None:
            return X
        return X[:, self._feature_indices]

    def _load_model(self, target: str) -> Optional[lgb.Booster]:
        """Lazy-load a model for the given target.

        Automatically decompresses .lgbm.gz files if the .lgbm file doesn't exist.
        The decompressed file is cached to disk for subsequent loads.
        """
        if target in self._models:
            return self._models[target]

        path = self._model_dir / f"model_{target}.lgbm"
        gz_path = self._model_dir / f"model_{target}.lgbm.gz"

        # Auto-decompress if needed
        if not path.exists() and gz_path.exists():
            with gzip.open(gz_path, "rb") as f_in:
                with open(path, "wb") as f_out:
                    f_out.write(f_in.read())

        if not path.exists():
            return None

        model = lgb.Booster(model_file=str(path))
        self._models[target] = model
        return model

    def _predict_single(self, target: str, problem: dict, kernel_config: dict) -> float:
        """Predict a single target value, applying inverse log transform if needed."""
        model = self._load_model(target)
        if model is None:
            raise FileNotFoundError(f"No model_{target}.lgbm in {self._model_dir}")
        features = self._feature_engine.extract(problem, kernel_config).reshape(1, -1)
        features = self._select_features(features)
        raw = float(model.predict(features)[0])
        if target in self._log_targets:
            return float(np.expm1(raw))
        # Clamp to non-negative even for non-log models
        return float(max(0.0, raw))

    def predict_tflops(self, problem: dict, kernel_config: dict) -> float:
        """Predict TFLOPS for a single (problem, kernel) pair.

        Returns a real TFLOPS estimate (interpretable, usable as a DE surrogate).
        If the model was trained in log-space, the inverse transform is applied
        automatically.
        """
        return self._predict_single("tflops", problem, kernel_config)

    def predict_latency(self, problem: dict, kernel_config: dict) -> float:
        """Predict latency in milliseconds for a single (problem, kernel) pair."""
        return self._predict_single("latency", problem, kernel_config)

    def predict_bandwidth(self, problem: dict, kernel_config: dict) -> float:
        """Predict bandwidth in GB/s for a single (problem, kernel) pair."""
        return self._predict_single("bandwidth", problem, kernel_config)

    def predict_all(self, problem: dict, kernel_config: dict) -> dict[str, float]:
        """Predict all available targets for a single (problem, kernel) pair.

        Returns a dict with keys 'tflops', 'latency_ms', 'bandwidth_gb_s'
        (for whichever models exist).

        Note: Applies the inverse log transform for targets in log_targets and
        clamps negatives to 0.0, consistent with _predict_single().
        """
        features = self._feature_engine.extract(problem, kernel_config).reshape(1, -1)
        features = self._select_features(features)
        result = {}
        for target, key in [
            ("tflops", "tflops"),
            ("latency", "latency_ms"),
            ("bandwidth", "bandwidth_gb_s"),
        ]:
            model = self._load_model(target)
            if model is not None:
                raw = float(model.predict(features)[0])
                # Apply inverse log transform if model was trained in log-space
                if target in self._log_targets:
                    result[key] = float(np.expm1(raw))
                else:
                    # Clamp to non-negative even for non-log models
                    result[key] = float(max(0.0, raw))
        return result

    def rank_kernels(
        self, problem: dict, kernel_configs: list[dict]
    ) -> list[tuple[str, float]]:
        """Rank candidate kernels by predicted TFLOPS (descending).

        Parameters
        ----------
        problem : dict
            Problem specification with keys: m, n, k, dtype, layout, split_k.
        kernel_configs : list of dict
            Each dict must have a 'kernel_name' key plus kernel parameters.

        Returns
        -------
        list of (kernel_name, predicted_tflops) tuples, sorted descending.
        """
        if not kernel_configs:
            return []

        model = self._load_model("tflops")
        if model is None:
            raise FileNotFoundError(f"No model_tflops.lgbm in {self._model_dir}")

        rows = []
        for kc in kernel_configs:
            merged = {**problem, **kc}
            rows.append(merged)

        df = pd.DataFrame(rows)
        X = self._feature_engine.extract_batch(df)
        X = self._select_features(X)
        preds = model.predict(X)
        if "tflops" in self._log_targets:
            preds = np.expm1(preds)

        results = []
        for i, kc in enumerate(kernel_configs):
            name = kc.get("kernel_name", f"kernel_{i}")
            results.append((name, float(preds[i])))

        results.sort(key=lambda x: -x[1])
        return results

    def select_best(self, problem: dict, kernel_configs: list[dict]) -> str:
        """Return the kernel_name of the best predicted kernel."""
        ranked = self.rank_kernels(problem, kernel_configs)
        if not ranked:
            raise ValueError("No kernel configs provided")
        return ranked[0][0]


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Predict kernel performance")
    parser.add_argument(
        "--model_dir", required=True, help="Directory with trained models"
    )
    parser.add_argument("--m", type=int, required=True)
    parser.add_argument("--n", type=int, required=True)
    parser.add_argument("--k", type=int, required=True)
    parser.add_argument("--layout", default="rcr")
    parser.add_argument("--dtype", default="fp8")
    args = parser.parse_args()

    predictor = Predictor(args.model_dir)
    problem = {
        "m": args.m,
        "n": args.n,
        "k": args.k,
        "dtype": args.dtype,
        "layout": args.layout,
        "split_k": 1,
    }

    print(f"Loading models from {args.model_dir}...")
    print(
        f"Problem: M={args.m} N={args.n} K={args.k} dtype={args.dtype} layout={args.layout}"
    )

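    # Demo: harvest candidate kernel configs from cached benchmark parquet
    # files (one representative row per kernel name) and rank them.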
    data_dir = Path(args.model_dir).parent.parent / "data"
    if data_dir.exists():
        for pq in data_dir.glob("*.parquet"):
            df = pd.read_parquet(pq)
            kernel_names = df["kernel_name"].unique()
            configs = []
            for kn in kernel_names[:10]:
                row = df[df["kernel_name"] == kn].iloc[0]
                configs.append(row.to_dict())
            if configs:
                ranked = predictor.rank_kernels(problem, configs)
                print(f"\nTop 5 kernels (from {len(configs)} candidates):")
                for name, tflops in ranked[:5]:
                    print(f" {tflops:8.2f} TFLOPS {name}")
                break
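For quick reference, a minimal programmatic usage sketch of this class. The model directory and the tile_* fields in the kernel configs below are illustrative assumptions; real configs come from the tile_engine benchmark data.

```python
# Illustrative sketch only: the model directory and tile_* fields are assumed
# placeholders, not real artifacts from this repository.
from predictor import Predictor

predictor = Predictor("models/gemm_universal_fp8_gfx950")
problem = {"m": 128, "n": 1536, "k": 7168,
           "dtype": "fp8", "layout": "rcr", "split_k": 1}
kernel_configs = [
    {"kernel_name": "kernel_a", "tile_m": 256, "tile_n": 128, "tile_k": 64},
    {"kernel_name": "kernel_b", "tile_m": 128, "tile_n": 128, "tile_k": 32},
]

# All available predictions for one candidate, then a full ranking.
print(predictor.predict_all(problem, kernel_configs[0]))
for name, tflops in predictor.rank_kernels(problem, kernel_configs):
    print(f"{tflops:8.2f} predicted TFLOPS  {name}")
print("best:", predictor.select_best(problem, kernel_configs))
```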