composable_kernel/dispatcher/heuristics/convert_csv_to_parquet.py
Yaswanth Raparti 6989cf800c [rocm-libraries] ROCm/rocm-libraries#6327 (commit 1e7a12e)
[CK][CK TILE] Dispatcher kernel selection heuristic for grouped conv (#6327)

## Motivation
The ML heuristic in the dispatcher does not yet support the grouped-conv
operator. This PR adds support for fwd, bwd-data, and bwd-weight
grouped-conv kernels. A tile_engine utility has also been added to compile
and run any selected kernel configuration through the dispatcher
infrastructure.

## Technical Details

1. A tile_engine utility is added to benchmark each shape with all possible
kernel + tile_size combinations:
https://github.com/ROCm/rocm-libraries/blob/users/yraparti/ck/dispatcher-grouped-conv-heuristics/projects/composablekernel/tile_engine/ops/grouped_conv/grouped_conv_full_benchmark.py
2. New LGBM regressor models for grouped conv are added to the models
directory, with three separate models for fwd, bwd-data, and bwd-weight:
https://github.com/ROCm/rocm-libraries/tree/users/yraparti/ck/dispatcher-grouped-conv-heuristics/projects/composablekernel/dispatcher/heuristics/models
3. Implemented lazy GPU initialization (dispatcher/python); see the sketch
after this list.
   - **Issue**: ProcessPoolExecutor fork() + GPU context caused memory
     access faults
   - **Solution**: Mirror the FMHA pattern; defer GPU initialization until
     the first run()
   - **Changes**:
     - setup_multiple_grouped_conv_dispatchers() returns List[Path], not
       loaded libs
     - GpuGroupedConvRunner.__init__() no longer calls ctypes.CDLL
     - Added an _ensure_initialized() method for lazy GPU loading
     - GPU context is created only on the first run() call
   - **Benefit**: Parallel compilation now works without GPU conflicts
4. Addressed a few miscellaneous issues:
   - Fixed a BF16->FP16 naming bug in the dispatcher wrapper
   - Added new tile sizes and the comp_v5 pipeline to the arch spec to
     expand kernel selection
   - Added automatic padding support for unsupported shapes in the
     dispatcher runner
   - Created a single source of truth between tile_engine and dispatcher
     for architecture and tile_size details
   - Built a validation script to compare oracle_best vs. ml_heuristic
     selections
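
A minimal sketch of the lazy-initialization pattern from item 3. The method
names come from this PR's change list; the constructor argument and the run()
body are illustrative assumptions:

```python
import ctypes
from pathlib import Path


class GpuGroupedConvRunner:
    """Defers all GPU work until the first run() call (sketch)."""

    def __init__(self, lib_path: Path):
        # Only remember the path; nothing GPU-related happens here, so the
        # object is safe to create before ProcessPoolExecutor fork()s.
        self._lib_path = lib_path
        self._lib = None

    def _ensure_initialized(self):
        # Load the compiled library (and thereby create the GPU context)
        # lazily, inside the process that actually runs the kernel.
        if self._lib is None:
            self._lib = ctypes.CDLL(str(self._lib_path))

    def run(self, *args):
        self._ensure_initialized()
        # ... dispatch into self._lib here ...
```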

## Test Plan

1. Validated fwd, bwd-data, and bwd-weight kernels with both known and
unseen data sets of up to 300 problems.
2. Ensured that test cases are added in both dispatcher and tile_engine to
validate the heuristic.

## Test Result
Results on unseen shapes, validated on gfx950. Efficiency is measured per
problem against the oracle-best kernel; a computation sketch follows the
per-model results.
#### Forward Pass Model
- **Training Data**: 48,845 measurements across 1,372 unique problem
shapes
- **Validation Set**: 300 unseen problems from model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **93.05%**
  - Median Efficiency: **96.8%**
  - P10 Efficiency: **79.9%**

#### Backward Data Gradient (bwd_data) Model
- **Training Data**: 18,773 measurements across 891 unique problem
shapes
- **Validation Set**: 300 unseen problems from model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **93.8%**
  - Median Efficiency: **96.5%**
  - P10 Efficiency: **82.9%**

#### Backward Weight Gradient (bwd_weight) Model
- **Training Data**: 34,900 measurements across 1,508 unique problem
shapes
- **Validation Set**: 300 unseen problems from model crawler
- **Validation Performance** (vs. oracle):
  - Mean Efficiency: **96.1%**
  - Median Efficiency: **99.2%**
  - P10 Efficiency: **89.4%**
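
For reference, a minimal sketch of how the per-problem efficiency statistics
above can be computed from benchmark data. The boolean `ml_choice` column and
the problem-shape columns are hypothetical names, not part of this PR:

```python
import pandas as pd


def efficiency_stats(df: pd.DataFrame, problem_cols: list[str]) -> pd.Series:
    """Efficiency of the ML-selected kernel vs. the oracle-best kernel.

    Assumes one row per (problem, kernel) with a "tflops" column and a
    boolean "ml_choice" column marking the heuristic's pick per problem.
    """
    # Oracle: best achievable throughput for each problem shape.
    oracle = df.groupby(problem_cols)["tflops"].max()
    # ML pick: throughput of the kernel the heuristic selected.
    ml = df[df["ml_choice"]].set_index(problem_cols)["tflops"]
    eff = (ml / oracle).dropna()
    return pd.Series(
        {"mean": eff.mean(), "median": eff.median(), "p10": eff.quantile(0.10)}
    )
```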

## Submission Checklist

- [x] Look over the contributing guidelines at
  https://github.com/ROCm/ROCm/blob/develop/CONTRIBUTING.md#pull-requests.

#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
"""
Generic CSV to Parquet converter for ML training data.

Works with any operation type (grouped_conv, gemm, fmha, etc.) by auto-detecting
CSV structure and optionally using custom kernel name patterns.

Supported operations:
- Grouped convolution (forward, bwd_data, bwd_weight)
- GEMM Universal
- FMHA
- Any future operations with CSV benchmark output

Usage:
    # Auto-detect everything (recommended)
    python convert_csv_to_parquet.py \
        --input benchmark_data.csv \
        --output training_data.parquet \
        --arch gfx950

    # With custom kernel pattern
    python convert_csv_to_parquet.py \
        --input benchmark_data.csv \
        --output training_data.parquet \
        --arch gfx950 \
        --kernel-pattern "myop_(?P<variant>\\w+)_(?P<dtype>\\w+)_(?P<config>.*)"

    # Override operation type
    python convert_csv_to_parquet.py \
        --input benchmark_data.csv \
        --output training_data.parquet \
        --arch gfx950 \
        --op-type grouped_conv

Features:
- Auto-detects problem columns from CSV headers
- Generic kernel name parsing with optional custom patterns
- Supports all GPU architectures and data types
- No hardcoded operation-specific logic
- Validates data quality and reports statistics
"""
import argparse
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional, Set

import pandas as pd
# Known metric/metadata columns (will be excluded from problem features)
METRIC_COLUMNS: Set[str] = {
    "kernel",
    "kernel_name",
    "latency_ms",
    "tflops",
    "bandwidth_gb_s",
    "non_zero",
    "problem_idx",
    "run_id",
    "is_valid",
    "error_msg",
}
# Hardware profiles for different architectures
HW_PROFILES = {
    "gfx950": {  # MI350 series
        "hw_num_cus": 256,
        "hw_simds_per_cu": 4,
        "hw_shader_engines": 32,
        "hw_max_clock_mhz": 2400,
        "hw_max_waves_per_cu": 32,
        "hw_wavefront_size": 64,
        "hw_lds_capacity": 65536,
        "hw_l1_cache_kb": 32,
        "hw_l2_cache_kb": 4096,
        "hw_l3_cache_kb": 262144,
        "hw_num_xcd": 8,
    },
    "gfx942": {  # MI300A
        "hw_num_cus": 228,
        "hw_simds_per_cu": 4,
        "hw_shader_engines": 28,
        "hw_max_clock_mhz": 2100,
        "hw_max_waves_per_cu": 32,
        "hw_wavefront_size": 64,
        "hw_lds_capacity": 65536,
        "hw_l1_cache_kb": 32,
        "hw_l2_cache_kb": 4096,
        "hw_l3_cache_kb": 262144,
        "hw_num_xcd": 8,
    },
    "gfx90a": {  # MI250X
        "hw_num_cus": 110,
        "hw_simds_per_cu": 4,
        "hw_shader_engines": 8,
        "hw_max_clock_mhz": 1700,
        "hw_max_waves_per_cu": 32,
        "hw_wavefront_size": 64,
        "hw_lds_capacity": 65536,
        "hw_l1_cache_kb": 16,
        "hw_l2_cache_kb": 8192,
        "hw_l3_cache_kb": 131072,
        "hw_num_xcd": 1,
    },
}
def parse_kernel_name_generic(
    kernel_name: str, pattern: Optional[str] = None
) -> Dict[str, Any]:
    """
    Parse a kernel name to extract configuration features.

    Auto-detects common patterns or uses a custom pattern if provided.

    Common patterns:
    - grouped_conv: grouped_conv_{variant}_{dtype}_{ndim}d_{block}x{m}x{n}_{pipeline}
    - gemm: gemm_{dtype}_{layout}_{tiles}_{pipeline}_{scheduler}

    Args:
        kernel_name: Kernel name string
        pattern: Optional custom regex pattern with named groups

    Returns:
        Dictionary with extracted features
    """
    result = {"kernel_name": kernel_name}

    if pattern:
        # Use custom pattern
        match = re.match(pattern, kernel_name)
        if match:
            result.update(match.groupdict())
        return result

    # Auto-detect common patterns
    # Pattern 1: grouped_conv_{variant}_{dtype}_{ndim}d_{block}x{m}x{n}_{pipeline}
    #            [_{wave_mode}] [_dsb] [_si]
    # The pipeline alternation is explicit so the suffix tokens do not get
    # swallowed by the [a-z0-9]+ pipeline group.
    grouped_conv_pattern = (
        r"grouped_conv_([a-z_]+)_([a-z0-9]+)_(\d+)d_(\d+)x(\d+)x(\d+)_"
        r"(basic_v\d+|basic_async_v\d+|comp_async|compv\d+|mem|preshufflev\d+)"
        r"(?:_(intrawave|interwave))?(_dsb)?(_si)?$"
    )
    match = re.match(grouped_conv_pattern, kernel_name)
    if match:
        (
            variant,
            dtype,
            ndim,
            block_size,
            gemm_m,
            gemm_n,
            pipeline,
            wave_mode,
            dsb_tok,
            si_tok,
        ) = match.groups()
        result.update(
            {
                "op_type": "grouped_conv",
                "variant": variant,
                "dtype": dtype,
                "ndim_spatial": int(ndim),
                "block_size": int(block_size),
                "gemm_m_per_block": int(gemm_m),
                "gemm_n_per_block": int(gemm_n),
                "pipeline": pipeline,
                "wave_mode": wave_mode if wave_mode else "intrawave",
                "has_dsb": 1 if dsb_tok else 0,
                "has_si": 1 if si_tok else 0,
            }
        )
        return result

    # Pattern 2: gemm_universal_{dtype}_{layout}_{tiles}_{pipeline}_{scheduler}
    gemm_pattern = (
        r"gemm_universal_([a-z0-9]+)_([a-z]+)_(\d+x\d+x\d+)_([a-z0-9]+)_([a-z]+)"
    )
    match = re.match(gemm_pattern, kernel_name)
    if match:
        dtype, layout, tiles, pipeline, scheduler = match.groups()
        tile_parts = tiles.split("x")
        result.update(
            {
                "op_type": "gemm_universal",
                "dtype": dtype,
                "layout": layout,
                "tile_m": int(tile_parts[0]) if len(tile_parts) > 0 else 0,
                "tile_n": int(tile_parts[1]) if len(tile_parts) > 1 else 0,
                "tile_k": int(tile_parts[2]) if len(tile_parts) > 2 else 0,
                "pipeline": pipeline,
                "scheduler": scheduler,
            }
        )
        return result

    # Pattern 3: Generic fallback - extract dtype and pipeline from common
    # suffixes such as _bf16_, _fp16_, _compv3, _mem
    dtype_match = re.search(r"_(bf16|fp16|fp8|fp32|int8)", kernel_name)
    if dtype_match:
        result["dtype"] = dtype_match.group(1)

    pipeline_match = re.search(r"_(compv\d+|mem|async)", kernel_name)
    if pipeline_match:
        result["pipeline"] = pipeline_match.group(1)

    # Extract operation type from prefix
    op_match = re.match(r"^([a-z_]+?)_", kernel_name)
    if op_match:
        result["op_type"] = op_match.group(1)

    return result
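

# Illustrative example (the kernel name below is hypothetical, but matches
# Pattern 1 above):
#   parse_kernel_name_generic(
#       "grouped_conv_fwd_bf16_2d_256x128x128_compv3_interwave")
# returns {"kernel_name": "grouped_conv_fwd_bf16_2d_256x128x128_compv3_interwave",
#   "op_type": "grouped_conv", "variant": "fwd", "dtype": "bf16",
#   "ndim_spatial": 2, "block_size": 256, "gemm_m_per_block": 128,
#   "gemm_n_per_block": 128, "pipeline": "compv3", "wave_mode": "interwave",
#   "has_dsb": 0, "has_si": 0}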
def auto_detect_problem_columns(df: pd.DataFrame) -> list[str]:
    """
    Auto-detect problem feature columns by excluding known metric columns.

    Args:
        df: Input dataframe

    Returns:
        List of column names that are problem features
    """
    return [col for col in df.columns if col not in METRIC_COLUMNS]
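

# For example, a benchmark CSV with columns [N, C, K, kernel, latency_ms,
# tflops] (the shape columns here are hypothetical) yields ["N", "C", "K"],
# since the remaining columns are listed in METRIC_COLUMNS.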
def convert_csv_to_parquet(
    csv_file: Path,
    output_file: Path,
    arch: str = "gfx950",
    dtype: Optional[str] = None,
    variant: Optional[str] = None,
    op_type: Optional[str] = None,
    kernel_pattern: Optional[str] = None,
) -> pd.DataFrame:
    """
    Convert benchmark CSV to parquet training data format.

    Args:
        csv_file: Input CSV file path
        output_file: Output parquet file path
        arch: GPU architecture (default: gfx950)
        dtype: Data type override (default: auto-detect from kernel name)
        variant: Variant override (default: auto-detect from kernel name)
        op_type: Operation type override (default: auto-detect)
        kernel_pattern: Custom regex pattern for parsing kernel names

    Returns:
        DataFrame with converted data
    """
    print(f"Loading {csv_file}...")
    df = pd.read_csv(csv_file)
    print(f"  Rows: {len(df):,}")
    print(f"  Columns: {list(df.columns)}")
    print()

    # Auto-detect problem columns
    problem_cols = auto_detect_problem_columns(df)
    print(f"Auto-detected {len(problem_cols)} problem feature columns:")
    print(f"  {', '.join(problem_cols)}")
    print()

    # Parse kernel names
    print("Parsing kernel configurations...")
    kernel_configs = {}
    parse_errors = 0
    for kernel_name in df["kernel"].unique():
        try:
            config = parse_kernel_name_generic(kernel_name, kernel_pattern)
            kernel_configs[kernel_name] = config
        except Exception as e:
            parse_errors += 1
            if parse_errors <= 3:  # Show first 3 errors
                print(f"  Warning: Could not fully parse '{kernel_name}': {e}")
            kernel_configs[kernel_name] = {"kernel_name": kernel_name}
    if parse_errors > 3:
        print(f"  ... and {parse_errors - 3} more parsing warnings")
    print(f"  Parsed {len(kernel_configs)} unique kernels")
    print()

    # Get hardware profile
    hw_profile = HW_PROFILES.get(arch, {})
    if not hw_profile:
        print(f"Warning: No hardware profile for {arch}, using defaults")
        hw_profile = HW_PROFILES["gfx950"]

    # Build parquet rows
    rows = []
    for _, row in df.iterrows():
        kernel_name = row["kernel"]
        kernel_cfg = kernel_configs.get(kernel_name, {})

        # Build parquet row
        pq_row = {
            # Kernel info
            "kernel_name": kernel_name,
            # Performance metrics
            "latency_ms": float(row["latency_ms"]),
            "tflops": float(row["tflops"]),
        }

        # Add optional columns if they exist
        if "non_zero" in row:
            pq_row["non_zero"] = int(row["non_zero"])
        if "problem_idx" in row:
            pq_row["problem_idx"] = int(row["problem_idx"])

        # Add all problem features (auto-detected)
        for col in problem_cols:
            pq_row[col] = row[col]

        # Add kernel configuration (parsed from name)
        pq_row.update(kernel_cfg)

        # Add metadata overrides
        if op_type:
            pq_row["op_type"] = op_type
        if dtype:
            pq_row["dtype"] = dtype
        if variant:
            pq_row["variant"] = variant

        # Add architecture
        pq_row["arch"] = arch

        # Add hardware profile
        pq_row.update(hw_profile)

        # Add validity flag and run id
        pq_row["is_valid"] = True
        pq_row["run_id"] = 0

        rows.append(pq_row)

    result_df = pd.DataFrame(rows)
    print(f"Converted {len(result_df):,} benchmark results")
    print(f"  Valid: {result_df['is_valid'].sum():,}")
    print(f"  Unique kernels: {result_df['kernel_name'].nunique()}")

    # Count unique problems (use problem columns only)
    if problem_cols:
        unique_problems = result_df[problem_cols].drop_duplicates().shape[0]
        print(f"  Unique problems: {unique_problems}")
    print()

    # Save to parquet
    output_file.parent.mkdir(parents=True, exist_ok=True)
    result_df.to_parquet(output_file, index=False)
    print(f"✓ Saved to {output_file}")
    print()

    # Show statistics
    print("=" * 80)
    print("STATISTICS")
    print("=" * 80)
    print()

    # Performance metrics
    print("Performance metrics:")
    print(
        f"  Latency (ms): {result_df['latency_ms'].min():.4f} - {result_df['latency_ms'].max():.4f}"
    )
    print(
        f"  TFLOPS: {result_df['tflops'].min():.2f} - {result_df['tflops'].max():.2f}"
    )
    print(f"  Mean TFLOPS: {result_df['tflops'].mean():.2f}")
    print(f"  Median TFLOPS: {result_df['tflops'].median():.2f}")
    print()

    # Pipeline distribution (if available)
    if "pipeline" in result_df.columns:
        print("Pipeline distribution:")
        print(result_df["pipeline"].value_counts())
        print()

    # Operation type distribution (if available)
    if "op_type" in result_df.columns:
        print("Operation type distribution:")
        print(result_df["op_type"].value_counts())
        print()

    # Show sample best results, grouped by problem columns if available
    print("Sample best kernels per problem:")
    if problem_cols:
        best_per_problem = result_df.loc[
            result_df.groupby(problem_cols)["tflops"].idxmax()
        ]
        for _, row in best_per_problem.head(5).iterrows():
            prob_desc = ", ".join(
                [f"{col}={row[col]}" for col in problem_cols[:4]]
            )  # Show first 4 params
            print(
                f"  {prob_desc}... → {row['tflops']:.1f} TFLOPS ({row['kernel_name']})"
            )
    print()

    return result_df
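

# Illustrative library use (the paths here are hypothetical):
#   df = convert_csv_to_parquet(
#       Path("benchmark_data.csv"), Path("training_data.parquet"), arch="gfx942"
#   )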
def main():
    parser = argparse.ArgumentParser(
        description="Generic CSV to Parquet converter for ML training data",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--input", type=str, required=True, help="Input CSV file from benchmark"
    )
    parser.add_argument("--output", type=str, required=True, help="Output parquet file")
    parser.add_argument(
        "--arch", type=str, default="gfx950", help="GPU architecture (default: gfx950)"
    )
    parser.add_argument(
        "--dtype",
        type=str,
        help="Data type override (default: auto-detect from kernel name)",
    )
    parser.add_argument(
        "--variant",
        type=str,
        help="Operation variant override (default: auto-detect from kernel name)",
    )
    parser.add_argument(
        "--op-type",
        type=str,
        help="Operation type override (default: auto-detect from kernel name)",
    )
    parser.add_argument(
        "--kernel-pattern",
        type=str,
        help="Custom regex pattern for parsing kernel names (use named groups)",
    )
    args = parser.parse_args()

    input_file = Path(args.input)
    output_file = Path(args.output)

    if not input_file.exists():
        print(f"Error: Input file not found: {input_file}")
        return 1

    # Convert CSV to parquet
    df = convert_csv_to_parquet(
        input_file,
        output_file,
        args.arch,
        args.dtype,
        args.variant,
        args.op_type,
        args.kernel_pattern,
    )

    print("=" * 80)
    print("CONVERSION COMPLETE")
    print("=" * 80)
    print()
    print(f"✓ Output: {output_file}")
    print(f"✓ Rows: {len(df):,}")
    print(f"✓ Columns: {len(df.columns)}")
    print(f"✓ Size: {output_file.stat().st_size / 1024:.1f} KB")
    print()

    return 0
if __name__ == "__main__":
    sys.exit(main())