Files
composable_kernel/script/process_perf_data.py
Aviral Goel ab68c9d384 chore(copyright): update copyright header for script directory (#3184)
* chore(copyright): update copyright header for tile_engine directory

* chore(copyright): update copyright header for script directory

---------

Co-authored-by: Vidyasagar Ananthan <vanantha@amd.com>
2025-11-11 11:26:01 -08:00

489 lines
16 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Advanced Micro Devices, Inc., or its affiliates.
# SPDX-License-Identifier: MIT
import os
import io
import argparse
import datetime
# import numpy as np
import sqlalchemy
from sqlalchemy import text
import pandas as pd
from sshtunnel import SSHTunnelForwarder
def print_to_string(*args, **kwargs):
    """Render a print() call to a string instead of stdout.

    Accepts the same positional and keyword arguments as the built-in
    print() and returns the text it would have written, including the
    trailing newline unless overridden via ``end``.
    """
    with io.StringIO() as buffer:
        print(*args, file=buffer, **kwargs)
        return buffer.getvalue()
def parse_args():
    """Parse command-line arguments for the perf-log processor.

    The single positional argument is either one log file or a directory;
    for a directory, every entry whose name contains "log" is collected.
    The resulting list of paths is attached to the namespace as
    ``args.files``.

    Returns:
        argparse.Namespace: parsed arguments with the extra ``files`` list.
    """
    parser = argparse.ArgumentParser(description="Parse results from tf benchmark runs")
    parser.add_argument(
        # fixed typo in the help text: "prase" -> "parse"
        "filename", type=str, help="Log file to parse or directory containing log files"
    )
    args = parser.parse_args()
    if os.path.isdir(args.filename):
        # Keep only directory entries that look like log files.
        args.files = [
            os.path.join(args.filename, name)
            for name in os.listdir(args.filename)
            if "log" in name
        ]
    else:
        args.files = [args.filename]
    return args
def get_log_params(logfile):
    """Extract run metadata (branch, node, arch, versions) from a log file.

    Scans every line for known marker phrases; the value is the third
    whitespace-separated token of the matching line, except the ROCm
    version which is sliced out of the clang "InstalledDir" path. Fields
    that never appear keep their defaults (" " for strings, 0 for
    compute_units).

    Args:
        logfile: path of the benchmark log to scan.

    Returns:
        tuple: (branch_name, node_id, gpu_arch, compute_units, rocm_vers,
        hip_vers, environment) -- raw string tokens from the log.
    """
    print("logfile=", logfile)
    branch_name = " "
    node_id = " "
    gpu_arch = " "
    hip_vers = " "
    compute_units = 0
    environment = " "
    rocm_vers = " "
    # Context manager closes the handle deterministically; the original
    # `for line in open(...)` left closing the descriptor to the GC.
    with open(logfile) as log:
        for line in log:
            tokens = line.split()
            # Independent `if`s on purpose: one line may carry several markers.
            if "Branch name" in line:
                branch_name = tokens[2]
            if "On branch" in line:
                branch_name = tokens[2]
            if "Node name" in line:
                node_id = tokens[2]
            if "GPU_arch" in line:
                gpu_arch = tokens[2]
            if "HIP version" in line:
                hip_vers = tokens[2]
            if "Compute Unit" in line:
                compute_units = tokens[2]
            if "Environment type" in line:
                environment = tokens[2]
            if "InstalledDir" in line:
                # Slice the version out of e.g. ".../opt/rocm-6.1.0/llvm/bin".
                path = tokens[1]
                start = path.find("/opt/rocm-") + len("/opt/rocm-")
                rocm_vers = path[start : path.rfind("/llvm/bin")]
    return (
        branch_name,
        node_id,
        gpu_arch,
        compute_units,
        rocm_vers,
        hip_vers,
        environment,
    )
def parse_logfile(logfile):
    """Extract performance numbers from one benchmark log.

    The parsing strategy is selected from substrings of the log file NAME
    (perf_gemm, conv_fwd/conv_bwd, fmha, tile gemm, ...), mirroring how the
    CI names its logs.

    Args:
        logfile: path of the log file; its name selects the parser.

    Returns:
        list[str]: perf figures (TFlops or GB/s) as raw string tokens; for
        the plain-gemm case they are sorted by the test signature string.
    """
    glue = ""
    res = []
    tests = []
    tflops = []
    # The lists below are only consumed by the currently-disabled
    # post_test_params() helper; kept so it can be re-enabled unchanged.
    kernels = []
    dtype = []
    alayout = []
    blayout = []
    M = []
    N = []
    K = []
    StrideA = []
    StrideB = []
    StrideC = []

    def _record(lst, kernel):
        # One "Best Perf" gemm line: capture signature, perf and all params.
        tests.append(glue.join(lst[5:30]))
        kernels.append(kernel)
        tflops.append(lst[33])
        dtype.append(lst[5])
        alayout.append(lst[8])
        blayout.append(lst[11])
        M.append(lst[14])
        N.append(lst[17])
        K.append(lst[20])
        StrideA.append(lst[23])
        StrideB.append(lst[26])
        StrideC.append(lst[29])

    if "perf_gemm" in logfile and "gemm_bilinear" not in logfile:
        # File handles are now closed via context managers; the original
        # `for line in open(...)` leaked them until garbage collection.
        with open(logfile) as log:
            for line in log:
                if "Best Perf" not in line:
                    continue
                lst = line.split()
                if len(lst) >= 37:  # the line is complete
                    _record(lst, glue.join(lst[37:]))
                elif len(lst) >= 33:  # the tflops are available
                    _record(lst, "N/A")
                    print("warning: incomplete line:", lst)
                else:  # even the tflops are not available
                    print("Error in ckProfiler output!")
                    print("warning: incomplete line=", lst)
        # Sort the tflops by test signature so run order does not matter.
        res = [x for _, x in sorted(zip(tests, tflops))]
    # parse conv_fwd and conv_bwd performance tests:
    elif "conv_fwd" in logfile or "conv_bwd" in logfile:
        with open(logfile) as log:
            for line in log:
                if "tflops:" in line:
                    res.append(line.split()[1])
    # parse all other "Best Perf"-style performance tests:
    elif (
        "resnet50" in logfile
        or "batched_gemm" in logfile
        or "grouped_gemm" in logfile
        or "gemm_bilinear" in logfile
        or "reduction" in logfile
    ):
        with open(logfile) as log:
            for line in log:
                if "Best Perf" in line:
                    res.append(line.split()[4])
    elif "onnx_gemm" in logfile:
        with open(logfile) as log:
            for line in log:
                if "Best Perf" in line:
                    res.append(line.split()[33])
    elif "splitK_gemm" in logfile or "mixed_gemm" in logfile:
        with open(logfile) as log:
            for line in log:
                if "Best Perf" in line:
                    res.append(line.split()[36])
    elif "perf_fmha" in logfile:
        with open(logfile) as log:
            for line in log:
                if "TFlops" in line:
                    lst = line.split()
                    # zip(lst[1:], lst) maps each token to its predecessor,
                    # so this looks up the value printed before "TFlops,".
                    res.append(dict(zip(lst[1:], lst))["TFlops,"])
    elif "perf_tile_gemm_basic" in logfile or "perf_tile_gemm_mem_pipeline" in logfile:
        with open(logfile) as log:
            for line in log:
                if "TFlops" in line:
                    lst = line.split()
                    res.append(dict(zip(lst[1:], lst))["TFlops,"])
    return res
def get_baseline(table, connection):
    """Fetch the latest 'develop' baseline row of *table* as a DataFrame.

    NOTE(review): the table name is spliced into the SQL text directly --
    acceptable only because table names come from a fixed internal list in
    main(), never from user input.
    """
    statement = (
        "SELECT * from {0} WHERE Datetime = "
        "(SELECT MAX(Datetime) FROM {0} where Branch_ID='develop' );"
    ).format(table)
    return pd.read_sql(text(statement), connection)
def store_new_test_result(
    table_name,
    test_results,
    testlist,
    branch_name,
    node_id,
    gpu_arch,
    compute_units,
    rocm_vers,
    hip_vers,
    environment,
    connection,
):
    """Append one row of run metadata plus per-test results to *table_name*.

    The row starts with the run metadata (branch, node, arch, versions and
    a current timestamp) followed by one column per *testlist* entry
    holding the matching value from *test_results*.

    Returns:
        int: 0, kept for compatibility with existing callers.
    """
    metadata = {
        "Branch_ID": str(branch_name),
        "Node_ID": str(node_id),
        "GPU_arch": str(gpu_arch),
        "Compute Units": compute_units,
        "ROCM_version": str(rocm_vers),
        "HIP_version": str(hip_vers),
        "Environment": str(environment),
        "Datetime": str(datetime.datetime.now()),
    }
    row = pd.DataFrame(
        data=[list(metadata.values())], columns=list(metadata.keys())
    )
    perf_columns = pd.DataFrame(data=[test_results], columns=testlist)
    row = pd.concat([row, perf_columns], axis=1)
    row.to_sql(table_name, connection, if_exists="append", index=False)
    return 0
def compare_test_to_baseline(baseline, test, testlist):
    """Compare fresh results against the most recent baseline row.

    A test regresses when its baseline value exceeds the new value by more
    than 1%. When nothing regressed, the average performance ratio
    (new/baseline) is also reported.

    Args:
        baseline: DataFrame with one baseline row (may be empty).
        test: new per-test results, float-convertible.
        testlist: baseline column names, parallel to *test*.

    Returns:
        int: 1 when any test regressed, 0 otherwise (also 0 when no
        baseline row exists).
    """
    if baseline.empty:
        print("could not find a baseline")
        return 0
    base_values = baseline[testlist].to_numpy(dtype="float")[0]
    regression = 0
    ave_perf = 0
    for idx, base_val in enumerate(base_values):
        new_val = float(test[idx])
        # success criterion: new result within 1% of the baseline
        if base_val > 1.01 * new_val:
            print(
                "test # ",
                idx,
                "shows regression by {:.3f}%".format(
                    (new_val - base_val) / base_val * 100
                ),
            )
            regression = 1
        if base_val > 0:
            ave_perf = ave_perf + new_val / base_val
    if regression == 0:
        print("no regressions found")
        ave_perf = ave_perf / len(base_values)
        print("average performance relative to baseline:", ave_perf)
    return regression
"""
def post_test_params(tlist,connection):
sorted_dtypes = [x for _,x in sorted(zip(tests,dtype))]
sorted_alayout = [x for _,x in sorted(zip(tests,alayout))]
sorted_blayout = [x for _,x in sorted(zip(tests,blayout))]
sorted_M = [x for _,x in sorted(zip(tests,M))]
sorted_N = [x for _,x in sorted(zip(tests,N))]
sorted_K = [x for _,x in sorted(zip(tests,K))]
sorted_StrideA = [x for _,x in sorted(zip(tests,StrideA))]
sorted_StrideB = [x for _,x in sorted(zip(tests,StrideB))]
sorted_StrideC = [x for _,x in sorted(zip(tests,StrideC))]
ck_gemm_params=[tlist,sorted_dtypes,sorted_alayout,sorted_blayout,
sorted_M,sorted_N,sorted_K,sorted_StrideA,sorted_StrideB,
sorted_StrideC]
df=pd.DataFrame(np.transpose(ck_gemm_params),columns=['Test_number','Data_type',
'Alayout','BLayout','M','N','K', 'StrideA','StrideB','StrideC'])
print(df)
dtypes = {
'Test_number': Integer(),
'Data_type': NVARCHAR(length=5),
'Alayout': NVARCHAR(length=12),
'Blayout': NVARCHAR(length=12),
'M': Integer(),
'N': Integer(),
'K': Integer(),
'StrideA': Integer(),
'StrideB': Integer(),
'StrideC': Integer()
}
df.to_sql("ck_gemm_test_params",connection,if_exists='replace',index=False, dtype=dtypes)
"""
def main():
    """Parse each perf log, upload results to the DB, check for regressions.

    For every log file: extract run metadata, parse the perf numbers, open
    an SSH tunnel to the MySQL host (credentials come from environment
    variables), store the new results, and fetch the current 'develop'
    baseline. After all files, the last file's results are compared
    against its baseline.

    Returns:
        int: 1 when a regression was detected, 0 otherwise.
    """
    args = parse_args()
    results = []
    tflops_base = []
    testlist = []
    # (match, label, count, table): evaluated in order, exactly like the
    # original if-chain -- every matching rule appends its labels to
    # testlist and the LAST match decides table_name. `match` is a
    # substring of the log file name (or a predicate for the gemm case);
    # a count of None means one label per parsed result.
    table_rules = [
        (lambda f: "perf_gemm" in f and "gemm_bilinear" not in f,
         "Test", None, "ck_gemm_tflops"),
        ("batched_gemm", "Test", None, "ck_batched_gemm_tflops"),
        ("grouped_gemm", "Test", None, "ck_grouped_gemm_tflops"),
        ("perf_conv_fwd", "Test", None, "ck_conv_fwd_tflops"),
        ("perf_conv_bwd_data", "Test", None, "ck_conv_bwd_data_tflops"),
        ("grouped_conv_fwd", "Test", None, "ck_grouped_conv_fwd_tflops"),
        ("grouped_conv_bwd_data", "Test", None, "ck_grouped_conv_bwd_data_tflops"),
        ("grouped_conv_bwd_weight", "Test", None, "ck_grouped_conv_bwd_weight_tflops"),
        ("gemm_bilinear", "Test", None, "ck_gemm_bilinear_tflops"),
        ("reduction", "Test", None, "ck_reduction_GBps"),
        # resnet tables always use 49 fixed layer columns.
        ("resnet50_N4", "Layer", 49, "ck_resnet50_N4_tflops"),
        ("resnet50_N256", "Layer", 49, "ck_resnet50_N256_tflops"),
        ("onnx_gemm", "Test", None, "ck_onnx_gemm_tflops"),
        ("splitK_gemm", "Test", None, "ck_splitK_gemm_tflops"),
        ("mixed_gemm", "Test", None, "ck_mixed_gemm_tflops"),
        ("fmha_fwd", "Test", None, "ck_fmha_fwd_tflops"),
        ("fmha_bwd", "Test", None, "ck_fmha_bwd_tflops"),
        ("gemm_basic_fp16", "Test", None, "ck_tile_gemm_basic_fp16_tflops"),
        ("gemm_mem_pipeline_fp16", "Test", None, "ck_tile_gemm_mem_pipeline_fp16_tflops"),
        ("gemm_basic_bf16", "Test", None, "ck_tile_gemm_basic_bf16_tflops"),
        ("gemm_mem_pipeline_bf16", "Test", None, "ck_tile_gemm_mem_pipeline_bf16_tflops"),
        ("gemm_basic_fp8", "Test", None, "ck_tile_gemm_basic_fp8_tflops"),
        ("gemm_mem_pipeline_fp8", "Test", None, "ck_tile_gemm_mem_pipeline_fp8_tflops"),
        ("gemm_basic_bf8", "Test", None, "ck_tile_gemm_basic_bf8_tflops"),
        ("gemm_mem_pipeline_bf8", "Test", None, "ck_tile_gemm_mem_pipeline_bf8_tflops"),
    ]
    for filename in args.files:
        # parse the test parameters from the logfile
        (
            branch_name,
            node_id,
            gpu_arch,
            compute_units,
            rocm_vers,
            hip_vers,
            environment,
        ) = get_log_params(filename)
        print("Branch name:", branch_name)
        print("Node name:", node_id)
        print("GPU_arch:", gpu_arch)
        print("Compute units:", compute_units)
        print("ROCM_version:", rocm_vers)
        print("HIP_version:", hip_vers)
        print("Environment:", environment)
        # parse results, get the Tflops value for "Best Perf" kernels
        results = parse_logfile(filename)
        print("Number of tests:", len(results))
        # DB and tunnel credentials are supplied by the CI environment.
        sql_hostname = "127.0.0.1"
        sql_username = os.environ["dbuser"]
        sql_password = os.environ["dbpassword"]
        sql_main_database = os.environ["ck_perf_db"]
        sql_port = 3306
        ssh_host = os.environ["dbsship"]
        ssh_user = os.environ["dbsshuser"]
        ssh_port = int(os.environ["dbsshport"])
        ssh_pass = os.environ["dbsshpassword"]
        with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_username=ssh_user,
            ssh_password=ssh_pass,
            remote_bind_address=(sql_hostname, sql_port),
        ) as tunnel:
            sqlEngine = sqlalchemy.create_engine(
                "mysql+pymysql://{0}:{1}@{2}:{3}/{4}".format(
                    sql_username,
                    sql_password,
                    sql_hostname,
                    tunnel.local_bind_port,
                    sql_main_database,
                )
            )
            conn = sqlEngine.connect()
            # Select the destination table and build the column names.
            table_name = None
            for match, label, count, table in table_rules:
                matched = match(filename) if callable(match) else match in filename
                if matched:
                    n_labels = len(results) if count is None else count
                    testlist.extend(
                        "%s%i" % (label, i) for i in range(1, n_labels + 1)
                    )
                    table_name = table
            if table_name is None:
                # Previously this fell through with table_name undefined
                # (NameError), or silently reused the previous file's table
                # on later loop iterations; fail loudly instead.
                raise ValueError("unrecognized perf log name: %s" % filename)
            tflops_base = get_baseline(table_name, conn)
            store_new_test_result(
                table_name,
                results,
                testlist,
                branch_name,
                node_id,
                gpu_arch,
                compute_units,
                rocm_vers,
                hip_vers,
                environment,
                sqlEngine,
            )
            conn.close()
    # compare the results to the baseline if baseline exists
    regression = compare_test_to_baseline(tflops_base, results, testlist)
    return regression
if __name__ == "__main__":
    # NOTE(review): main() returns a regression flag that is discarded
    # here, so the process exit code stays 0 even on regressions; consider
    # sys.exit(main()) if CI should fail on them -- confirm with the
    # pipeline owners first.
    main()