Add persistent async input scheduler for GEMM kernels (#3520)

Add signal-based synchronization for persistent GEMM kernels where
input data becomes available incrementally. Uses modulo wraparound
(like PyTorch's AsyncMM) for chunk index calculation:
  chunk_idx = ((tile_idx + tile_idx_pivot) / tiles_per_chunk) % num_chunks

Key components:
- PersistentAsyncInputScheduler struct with tiles_per_chunk_m,
  chunk_signals, tile_idx_pivot_m, and num_chunks fields
- wait_eq_wave method using __builtin_amdgcn_s_sleep for power efficiency
- IsSupportedArgument validation for scheduler parameters
- Example demonstrating async input scheduling with simulated producer
- GTest unit tests covering all layout combinations

[ROCm/composable_kernel commit: 91b4102a59]
This commit is contained in:
Max Podkorytov
2026-01-20 10:37:09 -08:00
committed by GitHub
parent e227e837be
commit 8b842250da
11 changed files with 844 additions and 61 deletions

View File

@@ -456,7 +456,8 @@ inline auto create_args()
.insert("json", "0", "0: No Json, 1: Dump Results in Json format")
.insert("jsonfile", "gemm.json", "json file name to dump results")
.insert("flush_cache", "true", "flush cache before running the kernel, defaults to true")
.insert("rotating_count", "1000", "rotating count, defaults to 1000");
.insert("rotating_count", "1000", "rotating count, defaults to 1000")
.insert("test_async", "0", "0: normal gemm, 1: test async input scheduler");
return arg_parser;
}

View File

@@ -12,6 +12,169 @@
#include "run_gemm_example_common.hpp"
#include "universal_gemm_invoker.hpp"
// Universal GEMM-specific wrapper that handles test_async flag
template <typename GemmConfig,
typename ADataType,
typename BDataType = ADataType,
typename CDataType = ADataType,
typename ALayout,
typename BLayout,
typename CLayout>
int run_gemm_example_with_layouts_universal(ck_tile::ArgParser& arg_parser,
const ALayout a_layout = ALayout{},
const BLayout b_layout = BLayout{},
const CLayout c_layout = CLayout{})
{
using Invoker = UniversalInvoker;
using AccDataType = typename GemmTypeConfig<ADataType, BDataType, CDataType>::AccDataType;
// Check for async input scheduler test mode
bool test_async = arg_parser.get_int("test_async");
if(test_async)
{
// Extract parameters for async test (same as shared implementation)
const ck_tile::index_t M = arg_parser.get_int("m");
const ck_tile::index_t N = arg_parser.get_int("n");
const ck_tile::index_t K = arg_parser.get_int("k");
const ck_tile::index_t kbatch = arg_parser.get_int("split_k");
using Row = ck_tile::tensor_layout::gemm::RowMajor;
constexpr bool is_a_row_major = std::is_same_v<ALayout, Row>;
constexpr bool is_b_row_major = std::is_same_v<BLayout, Row>;
constexpr bool is_c_row_major = std::is_same_v<CLayout, Row>;
const ck_tile::index_t stride_A = is_a_row_major ? K : M;
const ck_tile::index_t stride_B = is_b_row_major ? N : K;
const ck_tile::index_t stride_C = is_c_row_major ? N : M;
// Allocate and initialize tensors
ck_tile::HostTensor<ADataType> a_m_k(ck_tile::host_tensor_descriptor(
M, K, stride_A, ck_tile::bool_constant<is_a_row_major>{}));
ck_tile::HostTensor<BDataType> b_k_n(ck_tile::host_tensor_descriptor(
K, N, stride_B, ck_tile::bool_constant<is_b_row_major>{}));
ck_tile::HostTensor<CDataType> c_m_n_dev_result(ck_tile::host_tensor_descriptor(
M, N, stride_C, ck_tile::bool_constant<is_c_row_major>{}));
ck_tile::FillUniformDistributionIntegerValue<ADataType>{-5, 5}(a_m_k);
ck_tile::FillUniformDistributionIntegerValue<BDataType>{-5, 5}(b_k_n);
ck_tile::DeviceMem a_m_k_dev_buf(a_m_k.get_element_space_size_in_bytes());
ck_tile::DeviceMem b_k_n_dev_buf(b_k_n.get_element_space_size_in_bytes());
ck_tile::DeviceMem c_m_n_dev_buf(c_m_n_dev_result.get_element_space_size_in_bytes());
a_m_k_dev_buf.ToDevice(a_m_k.data());
b_k_n_dev_buf.ToDevice(b_k_n.data());
c_m_n_dev_buf.SetZero();
c_m_n_dev_result.SetZero();
ck_tile::GemmHostArgs args = {a_m_k_dev_buf.GetDeviceBuffer(),
b_k_n_dev_buf.GetDeviceBuffer(),
c_m_n_dev_buf.GetDeviceBuffer(),
kbatch,
M,
N,
K,
stride_A,
stride_B,
stride_C};
Invoker::template test_async_input_scheduler<GemmConfig,
ADataType,
BDataType,
ck_tile::tuple<>,
AccDataType,
CDataType,
ALayout,
BLayout,
ck_tile::tuple<>,
CLayout,
ck_tile::element_wise::PassThrough>(
args, ck_tile::stream_config{nullptr, false, 1});
// Copy result from device for verification
c_m_n_dev_buf.FromDevice(c_m_n_dev_result.data());
// Compute CPU reference
ck_tile::HostTensor<CDataType> c_m_n_ref(ck_tile::host_tensor_descriptor(
M, N, stride_C, ck_tile::bool_constant<is_c_row_major>{}));
c_m_n_ref.SetZero();
ck_tile::reference_gemm<ADataType, BDataType, AccDataType, CDataType>(
a_m_k, b_k_n, c_m_n_ref);
// Verify results
const float max_accumulated_value =
*std::max_element(c_m_n_ref.mData.begin(), c_m_n_ref.mData.end());
const auto rtol_atol = calculate_rtol_atol<ADataType, BDataType, AccDataType, CDataType>(
K, kbatch, max_accumulated_value);
bool pass = do_verify(c_m_n_dev_result, c_m_n_ref, rtol_atol, "CPU");
std::cout << "Async input scheduler test: " << (pass ? "PASS" : "FAIL") << std::endl;
return pass;
}
// Normal path - delegate to shared implementation
return run_gemm_example_with_layouts<GemmConfig, Invoker, ADataType, BDataType, CDataType>(
arg_parser, a_layout, b_layout, c_layout);
}
// Universal GEMM-specific prec_type dispatcher that uses the wrapper
template <typename GemmConfig,
typename APrecType,
typename BPrecType = APrecType,
typename CPrecType = APrecType>
int run_gemm_example_prec_type_universal(std::string a_layout,
std::string b_layout,
ck_tile::ArgParser& arg_parser)
{
using Row = ck_tile::tensor_layout::gemm::RowMajor;
using Col = ck_tile::tensor_layout::gemm::ColumnMajor;
bool preshuffle = GemmConfig::Preshuffle;
if(preshuffle && std::is_same_v<BPrecType, ck_tile::pk_int4_t>)
{
throw std::runtime_error("Preshuffle is not supported for this int4 datatype!");
}
if(preshuffle && a_layout != "R" && b_layout != "C")
{
throw std::runtime_error(
"Preshuffle is supported only for A(Row major), B(column major) input matrices!");
}
using LayoutVariant = std::variant<Row, Col>;
auto string_to_layout = [](const std::string& layout) -> LayoutVariant {
if(layout == "R")
return Row{};
if(layout == "C")
return Col{};
throw std::runtime_error("Unsupported layout: " + layout);
};
auto a_layout_variant = string_to_layout(a_layout);
auto b_layout_variant = string_to_layout(b_layout);
return std::visit(
[&](auto a_layout_type, auto b_layout_type) -> int {
if constexpr(std::is_same_v<BPrecType, ck_tile::pk_int4_t> &&
std::is_same_v<decltype(b_layout_type), Row>)
{
throw std::runtime_error("Unsupported memory layout for the input matrices when "
"BPrecType is ck_tile::pk_int4_t!");
}
else
{
return run_gemm_example_with_layouts_universal<GemmConfig,
APrecType,
BPrecType,
CPrecType>(
arg_parser, a_layout_type, b_layout_type, Row{});
}
},
a_layout_variant,
b_layout_variant);
}
template <template <typename PrecType> typename GemmConfig>
int run_gemm_example(ck_tile::ArgParser& arg_parser)
{
@@ -19,52 +182,50 @@ int run_gemm_example(ck_tile::ArgParser& arg_parser)
std::string a_layout = arg_parser.get_str("a_layout");
std::string b_layout = arg_parser.get_str("b_layout");
using Invoker = UniversalInvoker;
if(data_type == "fp16")
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::half_t>, Invoker, ck_tile::half_t>(
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::half_t>, ck_tile::half_t>(
a_layout, b_layout, arg_parser);
}
else if(data_type == "bf16")
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::bf16_t>, Invoker, ck_tile::bf16_t>(
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::bf16_t>, ck_tile::bf16_t>(
a_layout, b_layout, arg_parser);
}
else if(data_type == "fp8")
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::fp8_t>,
Invoker,
ck_tile::fp8_t,
ck_tile::fp8_t,
ck_tile::half_t>(a_layout, b_layout, arg_parser);
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::fp8_t>,
ck_tile::fp8_t,
ck_tile::fp8_t,
ck_tile::half_t>(
a_layout, b_layout, arg_parser);
}
else if(data_type == "bf8")
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::bf8_t>,
Invoker,
ck_tile::bf8_t,
ck_tile::bf8_t,
ck_tile::half_t>(a_layout, b_layout, arg_parser);
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::bf8_t>,
ck_tile::bf8_t,
ck_tile::bf8_t,
ck_tile::half_t>(
a_layout, b_layout, arg_parser);
}
else if(data_type == "int8")
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::int8_t>,
Invoker,
ck_tile::int8_t,
ck_tile::int8_t,
ck_tile::int32_t>(a_layout, b_layout, arg_parser);
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::int8_t>,
ck_tile::int8_t,
ck_tile::int8_t,
ck_tile::int32_t>(
a_layout, b_layout, arg_parser);
}
else if(data_type == "fp16i4")
{
// TODO: Add support for bhalf_t ADataType
if constexpr(GemmConfig<ck_tile::half_t>::Pipeline == ck_tile::GemmPipeline::COMPUTE_V3)
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::half_t>,
Invoker,
ck_tile::half_t,
ck_tile::pk_int4_t,
ck_tile::half_t>(a_layout, b_layout, arg_parser);
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::half_t>,
ck_tile::half_t,
ck_tile::pk_int4_t,
ck_tile::half_t>(
a_layout, b_layout, arg_parser);
}
else
{
@@ -75,11 +236,11 @@ int run_gemm_example(ck_tile::ArgParser& arg_parser)
{
if constexpr(GemmConfig<ck_tile::fp8_t>::Pipeline == ck_tile::GemmPipeline::COMPUTE_V3)
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::fp8_t>,
Invoker,
ck_tile::fp8_t,
ck_tile::pk_int4_t,
ck_tile::half_t>(a_layout, b_layout, arg_parser);
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::fp8_t>,
ck_tile::fp8_t,
ck_tile::pk_int4_t,
ck_tile::half_t>(
a_layout, b_layout, arg_parser);
}
else
{
@@ -90,11 +251,11 @@ int run_gemm_example(ck_tile::ArgParser& arg_parser)
{
if constexpr(GemmConfig<ck_tile::bf8_t>::Pipeline == ck_tile::GemmPipeline::COMPUTE_V3)
{
return run_gemm_example_prec_type<GemmConfig<ck_tile::bf8_t>,
Invoker,
ck_tile::bf8_t,
ck_tile::pk_int4_t,
ck_tile::half_t>(a_layout, b_layout, arg_parser);
return run_gemm_example_prec_type_universal<GemmConfig<ck_tile::bf8_t>,
ck_tile::bf8_t,
ck_tile::pk_int4_t,
ck_tile::half_t>(
a_layout, b_layout, arg_parser);
}
else
{

View File

@@ -2,7 +2,11 @@
// SPDX-License-Identifier: MIT
#pragma once
#include <functional>
#include <chrono>
#include <thread>
#include "gemm_utils.hpp"
#include "ck_tile/host/hip_check_error.hpp"
#include "ck_tile/host/device_memory.hpp"
struct UniversalInvoker
{
@@ -150,4 +154,170 @@ struct UniversalInvoker
preprocess,
ck_tile::make_kernel<GemmConfig::kBlockPerCu>(Kernel{}, grids, blocks, 0, kargs));
}
template <typename GemmConfig,
typename ADataType,
typename BDataType,
typename DsDataType,
typename AccDataType,
typename CDataType,
typename ALayout,
typename BLayout,
typename DsLayout,
typename ELayout,
typename CDEElementWise>
static void test_async_input_scheduler(const ck_tile::GemmHostArgs& args,
const ck_tile::stream_config& s)
{
using GemmShape = ck_tile::TileGemmShape<
ck_tile::sequence<GemmConfig::M_Tile, GemmConfig::N_Tile, GemmConfig::K_Tile>,
ck_tile::sequence<GemmConfig::M_Warp, GemmConfig::N_Warp, GemmConfig::K_Warp>,
ck_tile::
sequence<GemmConfig::M_Warp_Tile, GemmConfig::N_Warp_Tile, GemmConfig::K_Warp_Tile>,
GemmConfig::PermuteA,
GemmConfig::PermuteB>;
using TilePartitioner =
ck_tile::GemmSpatiallyLocalTilePartitioner<GemmShape,
GemmConfig::TileParitionerGroupNum,
GemmConfig::TileParitionerM01>;
using GemmUniversalTraits =
ck_tile::TileGemmUniversalTraits<GemmConfig::kPadM,
GemmConfig::kPadN,
GemmConfig::kPadK,
GemmConfig::DoubleSmemBuffer,
ALayout,
BLayout,
ELayout,
GemmConfig::TransposeC,
GemmConfig::UseStructuredSparsity,
true, // Persistent = true for async test
GemmConfig::NumWaveGroups,
GemmConfig::Preshuffle>;
constexpr auto scheduler = GemmConfig::Scheduler;
using UniversalGemmProblem = ck_tile::UniversalGemmPipelineProblem<ADataType,
BDataType,
AccDataType,
GemmShape,
GemmUniversalTraits,
scheduler>;
using GemmPipeline = typename PipelineTypeTraits<
GemmConfig::Pipeline>::template GemmPipeline<UniversalGemmProblem>;
using GemmEpilogue = ck_tile::CShuffleEpilogue<
ck_tile::CShuffleEpilogueProblem<ADataType,
BDataType,
DsDataType,
AccDataType,
CDataType,
DsLayout,
ELayout,
CDEElementWise,
TilePartitioner::MPerBlock,
TilePartitioner::NPerBlock,
GemmConfig::M_Warp,
GemmConfig::N_Warp,
GemmConfig::M_Warp_Tile,
GemmConfig::N_Warp_Tile,
GemmConfig::K_Warp_Tile,
UniversalGemmProblem::TransposeC,
GemmConfig::NumWaveGroups,
false, /*FixedVectorSize_*/
1, /*VectorSizeC_*/
false, /*TiledMMAPermuteN_*/
1, /*BlockedXDLN_PerWarp_*/
GemmConfig::DoubleSmemBuffer>>;
using Kernel = ck_tile::GemmKernel<TilePartitioner, GemmPipeline, GemmEpilogue>;
const ck_tile::index_t tiles_m =
ck_tile::integer_divide_ceil(args.M, TilePartitioner::MPerBlock);
// Balance signal granularity (smaller chunks = finer control) vs overhead (more signals)
const ck_tile::index_t tiles_per_chunk = 2;
// Shift chunk assignments to test wraparound behavior
const ck_tile::index_t tile_idx_pivot = tiles_per_chunk;
// Account for pivot when allocating signal buffer
const ck_tile::index_t num_chunks =
ck_tile::integer_divide_ceil(tiles_m + tile_idx_pivot, tiles_per_chunk);
std::cout << "Async Input Scheduler Test:" << std::endl;
std::cout << " M tiles: " << tiles_m << std::endl;
std::cout << " Tiles per chunk: " << tiles_per_chunk << std::endl;
std::cout << " Tile index pivot: " << tile_idx_pivot << std::endl;
std::cout << " Number of signal chunks: " << num_chunks << std::endl;
// Signals must start as zero so kernel blocks until producer sets them
ck_tile::DeviceMem signal_buf(num_chunks * sizeof(uint32_t));
signal_buf.SetZero();
uint32_t* d_chunk_signals = static_cast<uint32_t*>(signal_buf.GetDeviceBuffer());
// Setup async input scheduler
ck_tile::PersistentAsyncInputScheduler async_scheduler;
async_scheduler.tiles_per_chunk_m = tiles_per_chunk;
async_scheduler.chunk_signals = d_chunk_signals;
async_scheduler.tile_idx_pivot_m = tile_idx_pivot;
async_scheduler.num_chunks = num_chunks;
// Create modified host args with async scheduler
ck_tile::UniversalGemmHostArgs<1, 1, 0> host_args({args.a_ptr},
{args.b_ptr},
{},
args.e_ptr,
args.k_batch,
args.M,
args.N,
args.K,
{args.stride_A},
{args.stride_B},
{},
args.stride_E,
async_scheduler);
auto kargs = Kernel::UniversalGemmKernel::MakeKernelArgs(host_args);
const dim3 grids = Kernel::MaxOccupancyGridSize(s);
const dim3 blocks = Kernel::BlockSize();
std::cout << " Grid: {" << grids.x << ", " << grids.y << ", " << grids.z << "}"
<< std::endl;
std::cout << " Blocks: {" << blocks.x << ", " << blocks.y << ", " << blocks.z << "}"
<< std::endl;
// Separate stream prevents deadlock: kernel and signal producer must run concurrently
hipStream_t signal_stream;
HIP_CHECK_ERROR(hipStreamCreateWithFlags(&signal_stream, hipStreamNonBlocking));
const auto start = std::chrono::high_resolution_clock::now();
ck_tile::launch_kernel(
s, ck_tile::make_kernel<GemmConfig::kBlockPerCu>(Kernel{}, grids, blocks, 0, kargs));
// Simulate incremental input arrival by delaying signal activation
const int sleep_us = 100;
for(ck_tile::index_t i = 0; i < num_chunks; ++i)
{
std::this_thread::sleep_for(std::chrono::microseconds(sleep_us));
const uint32_t signal_val = 1;
HIP_CHECK_ERROR(hipMemcpyAsync(d_chunk_signals + i,
&signal_val,
sizeof(uint32_t),
hipMemcpyHostToDevice,
signal_stream));
}
HIP_CHECK_ERROR(hipStreamSynchronize(signal_stream));
HIP_CHECK_ERROR(hipStreamDestroy(signal_stream));
// Wait for kernel completion
HIP_CHECK_ERROR(hipDeviceSynchronize());
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start);
std::cout << " Total time: " << duration.count() << " us" << std::endl;
std::cout << " Sleep time: " << (num_chunks * sleep_us) << " us" << std::endl;
}
};