/** * @Description : * @Author : chenht2022 * @Date : 2024-07-17 12:25:51 * @Version : 1.0.0 * @LastEditors : chenht2022 * @LastEditTime : 2024-10-09 11:08:10 * @Copyright (c) 2024 by KVCache.AI, All Rights Reserved. **/ #include "task_queue.h" #include #include #include #include #include TaskQueue::TaskQueue() : done(false), pending(0) { Node* dummy = new Node(); head.store(dummy, std::memory_order_relaxed); tail.store(dummy, std::memory_order_relaxed); workerThread = std::thread(&TaskQueue::worker, this); } TaskQueue::~TaskQueue() { { std::lock_guard lock(mtx); done.store(true, std::memory_order_release); } cv.notify_all(); if (workerThread.joinable()) workerThread.join(); Node* node = head.load(std::memory_order_relaxed); while (node) { Node* next = node->next.load(std::memory_order_relaxed); delete node; node = next; } } void TaskQueue::enqueue(std::function task) { pending.fetch_add(1, std::memory_order_acq_rel); Node* node = new Node(task); Node* prev = tail.exchange(node, std::memory_order_acq_rel); prev->next.store(node, std::memory_order_release); { std::lock_guard lock(mtx); } cv.notify_one(); } void TaskQueue::sync(size_t allow_n_pending) { std::unique_lock lock(mtx); cv.wait(lock, [&] { return pending.load(std::memory_order_acquire) <= allow_n_pending || done.load(std::memory_order_acquire); }); } void TaskQueue::worker() { Node* curr = head.load(std::memory_order_relaxed); while (!done.load(std::memory_order_acquire)) { Node* next = curr->next.load(std::memory_order_acquire); if (next) { if (next->task) { next->task(); } delete curr; curr = next; head.store(curr, std::memory_order_release); { std::lock_guard lock(mtx); pending.fetch_sub(1, std::memory_order_acq_rel); } cv.notify_all(); } else { std::unique_lock lock(mtx); cv.wait(lock, [&] { return curr->next.load(std::memory_order_acquire) != nullptr || done.load(std::memory_order_acquire); }); } } }