Files
composable_kernel/test_data/miopen_to_csv.py
JH-Leon-KIM-AMD b963478759 CSV-driven convolution test pipeline (#2581)
* Add CSV-driven convolution test pipeline

- Add test_grouped_convnd_fwd_dataset_xdl.cpp with CSV reader functionality
- Add complete dataset generation toolchain in test_data/
- Add Jenkins integration with RUN_CONV_COMPREHENSIVE_DATASET parameter
- Ready for comprehensive convolution testing with scalable datasets

* Update convolution test dataset generation pipeline

* add 2d, 3d dataset csv files

* Remove CSV test dataset files from repository

* Update generate_test_dataset.sh

* Fix channel division for MIOpen to CK conversion

* Remove unnecessary test files

* Fix clang-format-18 formatting issues

---------

Co-authored-by: Bartłomiej Kocot <barkocot@amd.com>
2025-08-13 16:24:34 +02:00

364 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Convert MIOpen Driver Commands to CSV Test Cases
Parses MIOpen driver commands from log files and converts them to CSV format
for CK convolution testing.
Usage:
python3 miopen_to_csv.py --input miopen_commands.txt --output conv_cases.csv
python3 miopen_to_csv.py --input miopen_log.txt --output-2d conv_2d.csv --output-3d conv_3d.csv
"""
import argparse
import csv
import re
import os
# Command-line spellings accepted for each MIOpen convolution parameter.
# Keys are the names used in the parsed dictionary; values are regex
# alternations of the flag spellings (short form and, for the depth
# parameters, the long form as well).
#
# NOTE(review): the single-letter depth spellings (-D, -z, -w, -s, -r) should
# be checked against the MIOpenDriver flag table.  If the driver gives any of
# them another meaning, a 2D command carrying such a flag would be treated as
# 3D by miopen_to_conv_param, which keys its 3D detection on these entries.
_MIOPEN_FLAG_SPELLINGS = {
    'n': r'-n',                     # batch size
    'c': r'-c',                     # input channels
    'k': r'-k',                     # output channels
    'H': r'-H',                     # input height
    'W': r'-W',                     # input width
    'D': r'-D|--in_d',              # input depth (3D only)
    'y': r'-y',                     # kernel height
    'x': r'-x',                     # kernel width
    'z': r'-z|--fil_d',             # kernel depth (3D only)
    'u': r'-u',                     # stride height
    'v': r'-v',                     # stride width
    'w': r'-w|--conv_stride_d',     # stride depth (3D only)
    'p': r'-p',                     # pad height
    'q': r'-q',                     # pad width
    's': r'-s|--pad_d',             # pad depth (3D only)
    'l': r'-l',                     # dilation height
    'j': r'-j',                     # dilation width
    'r': r'-r|--dilation_d',        # dilation depth (3D only)
    'g': r'-g',                     # groups
    'F': r'-F',                     # direction (1=fwd, 2=bwd_weight, 4=bwd_data)
}

# Compiled once at import time.  The ``(?<!\S)`` look-behind pins every flag to
# the start of a whitespace-delimited token, so ``-n`` is not picked up from
# the tail of an unrelated argument such as ``in-n 99``.
_MIOPEN_FLAG_PATTERNS = {
    key: re.compile(r'(?<!\S)(?:' + spelling + r')\s+(\d+)')
    for key, spelling in _MIOPEN_FLAG_SPELLINGS.items()
}


def parse_miopen_command(command_line):
    """
    Parse a MIOpen driver command line into a parameter dictionary.

    Example input:
        ./bin/MIOpenDriver conv -n 4 -c 3 -H 224 -W 224 -k 64 -y 3 -x 3 -p 1 -q 1 -u 1 -v 1 -l 1 -j 1 -m conv -g 1 -F 1 -t 1

    Only flags followed by a non-negative decimal integer are recognised; all
    other flags (for example ``-m conv``) are ignored.  When a flag occurs
    more than once, the first occurrence is used.

    Returns a dict mapping parameter keys ('n', 'c', 'H', ...) to ints, or
    None when the line (ignoring surrounding whitespace) does not start with
    './bin/MIOpenDriver conv' or when no known parameter is present.
    """
    if not command_line.strip().startswith('./bin/MIOpenDriver conv'):
        return None

    params = {}
    for key, pattern in _MIOPEN_FLAG_PATTERNS.items():
        match = pattern.search(command_line)
        if match:
            params[key] = int(match.group(1))

    # An empty dictionary means nothing usable was found on the line.
    return params or None
# Per spatial axis: (input-size flag, input-size default, kernel flag,
# stride flag, dilation flag, pad flag).  The axis letter is also the suffix
# of the CSV column names (KernelD, InputH, StrideW, ...).
_AXIS_FLAGS = {
    'D': ('D', 16, 'z', 'w', 'r', 's'),
    'H': ('H', 32, 'y', 'u', 'l', 'p'),
    'W': ('W', 32, 'x', 'v', 'j', 'q'),
}

# Presence of any of these parsed keys marks the command as a 3D convolution.
_DEPTH_FLAGS = ('D', 'z', 'w', 'r', 's')

# Value of the parsed 'F' parameter -> direction label used in the test name.
_DIRECTION_NAMES = {1: 'fwd', 2: 'bwd_weight', 4: 'bwd_data'}

# Column-name prefixes, in the order they appear in the returned dictionary.
_GEOMETRY_FIELDS = ('Kernel', 'Input', 'Output', 'Stride', 'Dilation',
                    'LeftPad', 'RightPad')


def miopen_to_conv_param(miopen_params):
    """
    Convert parsed MIOpen parameters to a CK ConvParam row.

    miopen_params is a dict as produced by parse_miopen_command.  Missing
    entries fall back to defaults: groups 1, batch 1, 64 output / 3 input
    channels, kernel 3, input 32 (depth 16), stride 1, dilation 1, pad 0.

    The command is treated as 3D when any depth parameter ('D', 'z', 'w',
    'r', 's') is present, otherwise as 2D.  Channel counts are divided by the
    group count because the row stores channels per group.  Left and right
    padding are both set to the single pad value of each axis.

    Returns a dict keyed by CSV column name (NDim, Groups, ..., TestName), or
    None when the input is empty or describes an invalid convolution:
    a non-positive group count, batch size, channel count, kernel size, input
    size, stride or dilation; channel counts that are not divisible by the
    group count; or a non-positive computed output size.
    """
    if not miopen_params:
        return None

    is_3d = any(flag in miopen_params for flag in _DEPTH_FLAGS)
    ndim = 3 if is_3d else 2
    axes = ('D', 'H', 'W') if is_3d else ('H', 'W')

    groups = miopen_params.get('g', 1)
    batch_size = miopen_params.get('n', 1)
    # MIOpen uses total channels (C*G), CK uses channels per group.
    out_channels_total = miopen_params.get('k', 64)
    in_channels_total = miopen_params.get('c', 3)

    # Reject values that would divide by zero or describe an empty tensor.
    if min(groups, batch_size, out_channels_total, in_channels_total) <= 0:
        return None
    # Floor division would silently drop channels for a non-divisible count,
    # producing a row that no longer matches the original command.
    if out_channels_total % groups or in_channels_total % groups:
        return None

    geometry = {field: {} for field in _GEOMETRY_FIELDS}
    for axis in axes:
        size_flag, size_default, kernel_flag, stride_flag, dilation_flag, pad_flag = _AXIS_FLAGS[axis]
        kernel = miopen_params.get(kernel_flag, 3)
        size = miopen_params.get(size_flag, size_default)
        stride = miopen_params.get(stride_flag, 1)
        dilation = miopen_params.get(dilation_flag, 1)
        pad = miopen_params.get(pad_flag, 0)

        # A zero stride would divide by zero below; the others are meaningless.
        if min(kernel, size, stride, dilation) <= 0 or pad < 0:
            return None

        output = (size + 2 * pad - dilation * (kernel - 1) - 1) // stride + 1
        # Skip configurations whose kernel does not fit into the padded input.
        if output <= 0:
            return None

        geometry['Kernel'][axis] = kernel
        geometry['Input'][axis] = size
        geometry['Output'][axis] = output
        geometry['Stride'][axis] = stride
        geometry['Dilation'][axis] = dilation
        geometry['LeftPad'][axis] = pad
        geometry['RightPad'][axis] = pad

    direction_name = _DIRECTION_NAMES.get(miopen_params.get('F', 1), 'fwd')

    row = {
        'NDim': ndim,
        'Groups': groups,
        'BatchSize': batch_size,
        'OutChannels': out_channels_total // groups,
        'InChannels': in_channels_total // groups,
    }
    # Emit e.g. KernelD, KernelH, KernelW, InputD, ... in a fixed column order.
    for field in _GEOMETRY_FIELDS:
        for axis in axes:
            row[field + axis] = geometry[field][axis]
    row['TestName'] = f'MIOpen_{ndim}D_{direction_name}'
    return row
def write_csv_cases(test_cases, output_file, ndim):
    """
    Dump converted convolution cases into a CSV file.

    Nothing is written when test_cases is empty; a notice is printed instead.
    Otherwise the file starts with two '#' comment lines, followed by the
    column header and one row per case.  ndim == 2 selects the 2D column set;
    any other value selects the 3D column set.  Keys of a case that are not
    part of the selected column set are left out of the file.
    """
    if test_cases:
        print(f"Writing {len(test_cases)} {ndim}D test cases to {output_file}")

        columns = (
            ['NDim', 'Groups', 'BatchSize', 'OutChannels', 'InChannels',
             'KernelH', 'KernelW', 'InputH', 'InputW', 'OutputH', 'OutputW',
             'StrideH', 'StrideW', 'DilationH', 'DilationW',
             'LeftPadH', 'LeftPadW', 'RightPadH', 'RightPadW', 'TestName']
            if ndim == 2 else
            ['NDim', 'Groups', 'BatchSize', 'OutChannels', 'InChannels',
             'KernelD', 'KernelH', 'KernelW', 'InputD', 'InputH', 'InputW',
             'OutputD', 'OutputH', 'OutputW', 'StrideD', 'StrideH', 'StrideW',
             'DilationD', 'DilationH', 'DilationW',
             'LeftPadD', 'LeftPadH', 'LeftPadW', 'RightPadD', 'RightPadH', 'RightPadW', 'TestName']
        )

        with open(output_file, 'w', newline='') as handle:
            # Leading comment lines describing the file contents.
            handle.writelines([
                f"# {ndim}D Convolution Test Cases from MIOpen Commands\n",
                f"# Generated {len(test_cases)} test cases\n",
            ])
            # extrasaction='ignore' drops keys that are not listed in columns.
            table = csv.DictWriter(handle, fieldnames=columns, extrasaction='ignore')
            table.writeheader()
            table.writerows(test_cases)
    else:
        print(f"No {ndim}D test cases to write")
# Text that identifies a convolution driver invocation inside a log line, and
# the path prefix that parse_miopen_command requires in front of it.
_DRIVER_MARKER = 'MIOpenDriver conv'
_DRIVER_PREFIX = './bin/'

# Column sets for the single-file (--output) mode.  The 3D set is a superset
# of the 2D set.
_MIXED_HEADERS_2D = ['NDim', 'Groups', 'BatchSize', 'OutChannels', 'InChannels',
                     'KernelH', 'KernelW', 'InputH', 'InputW', 'OutputH', 'OutputW',
                     'StrideH', 'StrideW', 'DilationH', 'DilationW',
                     'LeftPadH', 'LeftPadW', 'RightPadH', 'RightPadW', 'TestName']
_MIXED_HEADERS_3D = ['NDim', 'Groups', 'BatchSize', 'OutChannels', 'InChannels',
                     'KernelD', 'KernelH', 'KernelW', 'InputD', 'InputH', 'InputW',
                     'OutputD', 'OutputH', 'OutputW', 'StrideD', 'StrideH', 'StrideW',
                     'DilationD', 'DilationH', 'DilationW',
                     'LeftPadD', 'LeftPadH', 'LeftPadW',
                     'RightPadD', 'RightPadH', 'RightPadW', 'TestName']


def _build_arg_parser():
    """Create the command-line argument parser for the converter."""
    parser = argparse.ArgumentParser(description='Convert MIOpen commands to CSV test cases')
    parser.add_argument('--input', type=str, required=True,
                        help='Input file with MIOpen driver commands')
    parser.add_argument('--output', type=str,
                        help='Output CSV file (for mixed 2D/3D cases)')
    parser.add_argument('--output-2d', type=str, default='miopen_conv_2d.csv',
                        help='Output CSV file for 2D cases')
    parser.add_argument('--output-3d', type=str, default='miopen_conv_3d.csv',
                        help='Output CSV file for 3D cases')
    parser.add_argument('--filter-duplicates', action='store_true',
                        help='Remove duplicate test cases')
    parser.add_argument('--model-name', type=str, default='MIOpen',
                        help='Model name to use in test case names (default: MIOpen)')
    return parser


def _extract_driver_command(line):
    """Return the driver command embedded in a (possibly prefixed) log line, or None."""
    canonical_start = line.find(_DRIVER_PREFIX + _DRIVER_MARKER)
    if canonical_start != -1:
        return line[canonical_start:]
    # The driver was invoked through another path: rewrite it to the
    # './bin/' form that parse_miopen_command accepts.
    marker_start = line.find(_DRIVER_MARKER)
    if marker_start != -1:
        return _DRIVER_PREFIX + line[marker_start:]
    return None


def _label_test_case(conv_param, model_name):
    """Return the test name with the leading 'MIOpen' replaced by model_name."""
    # miopen_to_conv_param names cases 'MIOpen_<N>D_<direction>'; keep the
    # '<N>D_<direction>' part so backward cases are not relabelled as 'fwd'.
    _, _, suffix = conv_param.get('TestName', '').partition('_')
    if not suffix:
        suffix = f"{conv_param['NDim']}D_fwd"
    return f"{model_name}_{suffix}"


def _dedupe_cases(cases):
    """Return cases without repeats, keeping the first occurrence of each."""
    seen = set()
    unique = []
    for case in cases:
        # Compare every column except the label, so configurations that
        # differ only in padding or dilation are kept apart.
        key = tuple(sorted((field, value) for field, value in case.items()
                           if field != 'TestName'))
        if key not in seen:
            seen.add(key)
            unique.append(case)
    return unique


def _write_mixed_csv(cases, output_file, include_depth):
    """Write 2D and 3D cases into one CSV file."""
    # With 3D cases present the depth columns must be part of the header,
    # otherwise they would be dropped; 2D rows leave those columns empty.
    headers = _MIXED_HEADERS_3D if include_depth else _MIXED_HEADERS_2D
    with open(output_file, 'w', newline='') as csvfile:
        csvfile.write("# Mixed 2D/3D Convolution Test Cases from MIOpen Commands\n")
        writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(cases)


def main():
    """
    Command-line entry point: convert a MIOpen command log into CSV test cases.

    Reads the file given by --input line by line, extracts every
    'MIOpenDriver conv' command, converts it to a test case and writes the
    result either to the single file given by --output or to the separate
    --output-2d / --output-3d files.  With --filter-duplicates, repeated
    configurations are written only once.

    Returns 0 on success and 1 when the input file cannot be opened.
    """
    args = _build_arg_parser().parse_args()

    try:
        # errors='replace' keeps a stray undecodable byte in a log from
        # aborting the whole conversion.
        input_file = open(args.input, 'r', encoding='utf-8', errors='replace')
    except FileNotFoundError:
        print(f"ERROR: Input file not found: {args.input}")
        return 1
    except OSError as err:
        print(f"ERROR: Cannot read input file {args.input}: {err}")
        return 1

    print(f"Parsing MIOpen commands from {args.input}...")
    test_cases_2d = []
    test_cases_3d = []
    total_lines = 0
    parsed_lines = 0

    with input_file:
        for line_num, raw_line in enumerate(input_file, 1):
            total_lines = line_num
            command = _extract_driver_command(raw_line.strip())
            if command is None:
                # Empty line or not a MIOpen convolution command.
                continue
            try:
                miopen_params = parse_miopen_command(command)
                if not miopen_params:
                    continue
                conv_param = miopen_to_conv_param(miopen_params)
                if not conv_param:
                    continue
                conv_param['TestName'] = _label_test_case(conv_param, args.model_name)
                if conv_param['NDim'] == 2:
                    test_cases_2d.append(conv_param)
                else:
                    test_cases_3d.append(conv_param)
                parsed_lines += 1
            except Exception as e:
                # Deliberately broad: one malformed line must not stop the
                # conversion of the remaining log.
                print(f"WARNING: Failed to parse line {line_num}: {e}")
                continue

    print(f"Processed {total_lines} lines, parsed {parsed_lines} commands")
    print(f"Found {len(test_cases_2d)} 2D cases, {len(test_cases_3d)} 3D cases")

    if args.filter_duplicates:
        test_cases_2d = _dedupe_cases(test_cases_2d)
        test_cases_3d = _dedupe_cases(test_cases_3d)
        print(f"After deduplication: {len(test_cases_2d)} 2D cases, {len(test_cases_3d)} 3D cases")

    if args.output:
        # Single file holding both 2D and 3D cases.
        all_cases = test_cases_2d + test_cases_3d
        if all_cases:
            print(f"Writing {len(all_cases)} total cases to {args.output}")
            _write_mixed_csv(all_cases, args.output, include_depth=bool(test_cases_3d))
    else:
        # Separate files for 2D and 3D cases.
        if test_cases_2d:
            write_csv_cases(test_cases_2d, args.output_2d, 2)
        if test_cases_3d:
            write_csv_cases(test_cases_3d, args.output_3d, 3)

    print("Conversion completed!")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.  The bare
    # exit() helper is installed by the site module and is unavailable when
    # Python is started with -S, so raise SystemExit directly instead.
    raise SystemExit(main())