mirror of
https://github.com/kvcache-ai/ktransformers.git
synced 2026-05-11 00:10:07 +00:00
Fix TaskQueue worker thread 100% CPU spin when idle (#1899)
* initial fix for issue 1858 * [fix]: add done flag check to sync() wait predicate to prevent deadlock during destruction Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Ben Appleby <Ben.Appleby@microsoft.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -24,7 +24,11 @@ TaskQueue::TaskQueue() : done(false), pending(0) {
|
||||
}
|
||||
|
||||
TaskQueue::~TaskQueue() {
|
||||
done.store(true, std::memory_order_release);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mtx);
|
||||
done.store(true, std::memory_order_release);
|
||||
}
|
||||
cv.notify_all();
|
||||
if (workerThread.joinable()) workerThread.join();
|
||||
|
||||
Node* node = head.load(std::memory_order_relaxed);
|
||||
@@ -40,11 +44,18 @@ void TaskQueue::enqueue(std::function<void()> task) {
|
||||
Node* node = new Node(task);
|
||||
Node* prev = tail.exchange(node, std::memory_order_acq_rel);
|
||||
prev->next.store(node, std::memory_order_release);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mtx);
|
||||
}
|
||||
cv.notify_one();
|
||||
}
|
||||
|
||||
void TaskQueue::sync(size_t allow_n_pending) {
|
||||
// Spin until the pending task count drops to the allowed threshold.
|
||||
while (pending.load(std::memory_order_acquire) > allow_n_pending);
|
||||
std::unique_lock<std::mutex> lock(mtx);
|
||||
cv.wait(lock, [&] {
|
||||
return pending.load(std::memory_order_acquire) <= allow_n_pending
|
||||
|| done.load(std::memory_order_acquire);
|
||||
});
|
||||
}
|
||||
|
||||
void TaskQueue::worker() {
|
||||
@@ -58,7 +69,17 @@ void TaskQueue::worker() {
|
||||
delete curr;
|
||||
curr = next;
|
||||
head.store(curr, std::memory_order_release);
|
||||
pending.fetch_sub(1, std::memory_order_acq_rel);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mtx);
|
||||
pending.fetch_sub(1, std::memory_order_acq_rel);
|
||||
}
|
||||
cv.notify_all();
|
||||
} else {
|
||||
std::unique_lock<std::mutex> lock(mtx);
|
||||
cv.wait(lock, [&] {
|
||||
return curr->next.load(std::memory_order_acquire) != nullptr
|
||||
|| done.load(std::memory_order_acquire);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -40,6 +40,8 @@ class TaskQueue {
|
||||
std::atomic<bool> done;
|
||||
std::atomic<size_t> pending;
|
||||
std::thread workerThread;
|
||||
std::mutex mtx;
|
||||
std::condition_variable cv;
|
||||
|
||||
void worker();
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user