[CK_TILE] Generate random tensor values with multiple threads (#3324)

This commit is contained in:
Yi DING
2025-12-09 11:02:33 +08:00
committed by GitHub
parent c363a98d41
commit c1c2e41a03
6 changed files with 286 additions and 66 deletions

View File

@@ -33,59 +33,73 @@ namespace ck_tile {
* @example
*
* // Direct usage without creating a separate variable:
 * ck_tile::FillUniformDistribution<>{-1.f, 1.f}(a_host_tensor);
*/
template <typename T>
template <typename T = void>
struct FillUniformDistribution
{
float a_{-5.f};
float b_{5.f};
std::optional<uint32_t> seed_{11939};
// ATTENTION: Whether to use multi-threading (note: not guaranteed to be perfectly distributed
// across threads).
bool threaded = false;
template <typename ForwardIter>
void operator()(ForwardIter first, ForwardIter last) const
{
if(threaded)
{
uint32_t num_thread = std::thread::hardware_concurrency();
auto total = static_cast<std::size_t>(std::distance(first, last));
auto work_per_thread = static_cast<std::size_t>((total + num_thread - 1) / num_thread);
if(first == last)
return;
using T_iter = std::decay_t<decltype(*first)>;
static_assert(std::is_same_v<T, T_iter> || std::is_void_v<T>,
"Iterator value type must match template type T");
constexpr auto PackedSize = numeric_traits<T_iter>::PackedSize;
const auto total = static_cast<size_t>(std::distance(first, last));
const auto total_bytes = total * sizeof(T_iter);
std::vector<joinable_thread> threads(num_thread);
for(std::size_t it = 0; it < num_thread; ++it)
{
std::size_t iw_begin = it * work_per_thread;
std::size_t iw_end = std::min((it + 1) * work_per_thread, total);
auto thread_f = [this, total, iw_begin, iw_end, &first] {
if(iw_begin > total || iw_end > total)
return;
// need to make each thread unique, add an offset to current seed
std::mt19937 gen(seed_.has_value() ? (*seed_ + iw_begin)
: std::random_device{}());
std::uniform_real_distribution<float> dis(a_, b_);
std::generate(first + iw_begin, first + iw_end, [&dis, &gen]() {
if constexpr(numeric_traits<T>::PackedSize == 2)
return ck_tile::type_convert<T>(fp32x2_t{dis(gen), dis(gen)});
else
return ck_tile::type_convert<T>(dis(gen));
});
};
threads[it] = joinable_thread(thread_f);
}
}
else
// max 80 threads; at least 2MB per thread
const size_t available_cpu_cores = get_available_cpu_cores();
const size_t num_thread =
min(80UL, available_cpu_cores, integer_divide_ceil(total_bytes, 0x200000UL));
constexpr size_t BLOCK_BYTES = 64;
constexpr size_t BLOCK_SIZE = BLOCK_BYTES / sizeof(T_iter);
const size_t num_blocks = integer_divide_ceil(total_bytes, BLOCK_BYTES);
const size_t blocks_per_thread = integer_divide_ceil(num_blocks, num_thread);
// use minstd_rand for better performance on discard()
std::minstd_rand gen(seed_.has_value() ? *seed_ : std::random_device{}());
std::uniform_real_distribution<float> dis(a_, b_);
std::vector<joinable_thread> threads;
threads.reserve(num_thread - 1); // last job run in the main thread
for(int it = num_thread - 1; it >= 0; --it)
{
std::mt19937 gen(seed_.has_value() ? *seed_ : std::random_device{}());
std::uniform_real_distribution<float> dis(a_, b_);
std::generate(first, last, [&dis, &gen]() {
if constexpr(numeric_traits<T>::PackedSize == 2)
return ck_tile::type_convert<T>(fp32x2_t{dis(gen), dis(gen)});
else
return ck_tile::type_convert<T>(dis(gen));
});
const size_t ib_begin = it * blocks_per_thread;
const size_t ib_end = min(ib_begin + blocks_per_thread, num_blocks);
auto job = [=]() {
auto g_ = gen; // copy
auto d_ = dis; // copy
g_.discard(ib_begin * BLOCK_SIZE * PackedSize);
auto t_fn = [&]() {
if constexpr(PackedSize == 2)
return type_convert<T_iter>(fp32x2_t{d_(g_), d_(g_)});
else
return type_convert<T_iter>(d_(g_));
};
size_t ib = ib_begin;
for(; ib < ib_end - 1; ++ib) // full blocks
static_for<0, BLOCK_SIZE, 1>{}([&](auto iw_) {
constexpr size_t iw = iw_.value;
*(first + ib * BLOCK_SIZE + iw) = t_fn();
});
for(size_t iw = 0; iw < BLOCK_SIZE; ++iw) // last block
if(ib * BLOCK_SIZE + iw < total)
*(first + ib * BLOCK_SIZE + iw) = t_fn();
};
if(it > 0)
threads.emplace_back(std::move(job));
else
job(); // last job run in the main thread
}
}

View File

@@ -3,6 +3,9 @@
#pragma once
#ifdef __linux__
#include <sched.h>
#endif
#include <thread>
#include <utility>
@@ -24,4 +27,50 @@ struct joinable_thread : std::thread
this->join();
}
};
/// Returns the number of CPU cores the calling process may run on.
/// On Linux this honors the process affinity mask (cgroups/taskset);
/// elsewhere it falls back to std::thread::hardware_concurrency().
/// Always returns at least 1 so callers can safely divide by the result
/// or compute `result - 1` without underflow.
inline unsigned int get_available_cpu_cores()
{
#if defined(__linux__)
    cpu_set_t cpu_set;
    if(sched_getaffinity(0, sizeof(cpu_set_t), &cpu_set) == 0)
    {
        unsigned int cpu_count = CPU_COUNT(&cpu_set);
        if(cpu_count > 0)
            return cpu_count;
    }
#endif
    // Fallback if sched_getaffinity unavailable or fails.
    // hardware_concurrency() may legitimately return 0 when the count is
    // not computable; clamp to 1 to keep downstream arithmetic safe.
    return std::max(1u, std::thread::hardware_concurrency());
}
/// RAII guard that temporarily restricts the calling process to the first
/// `num_cores` logical CPUs and restores the previous affinity mask on
/// destruction. On platforms without sched_setaffinity it is a no-op,
/// but the constructor is still available so callers stay portable.
class cpu_core_guard
{
#if defined(__linux__)
    cpu_set_t original_cpu_set_;
    bool saved_ = false; // only restore if the original mask was captured

    public:
    cpu_core_guard(unsigned int num_cores) : original_cpu_set_()
    {
        // save original cpu set; skip the restore in ~cpu_core_guard if this fails
        saved_ = sched_getaffinity(0, sizeof(cpu_set_t), &original_cpu_set_) == 0;
        // build a mask containing cores [0, num_cores)
        cpu_set_t new_cpu_set;
        CPU_ZERO(&new_cpu_set);
        for(unsigned int i = 0; i < num_cores; ++i)
        {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wold-style-cast"
            CPU_SET(i, &new_cpu_set); // NOLINT(old-style-cast)
#pragma clang diagnostic pop
        }
        // best effort: on failure the current affinity is left untouched
        sched_setaffinity(0, sizeof(cpu_set_t), &new_cpu_set);
    }
    ~cpu_core_guard()
    {
        // restore original cpu set (never restore a zero/empty mask)
        if(saved_)
            sched_setaffinity(0, sizeof(cpu_set_t), &original_cpu_set_);
    }
#else
    public:
    // No-op off Linux: previously this constructor was compiled out entirely,
    // breaking any caller that constructs the guard on other platforms.
    cpu_core_guard(unsigned int /*num_cores*/) {}
#endif
};
} // namespace ck_tile