#pragma once #include "server-task.h" #include #include #include #include struct server_task_multi { int id = -1; std::set subtasks_remaining; std::vector results; }; struct server_queue { int id = 0; bool running; // queues std::deque queue_tasks; std::deque queue_tasks_deferred; std::vector queue_multitasks; std::mutex mutex_tasks; std::condition_variable condition_tasks; // callback functions std::function callback_new_task; std::function callback_finish_multitask; std::function callback_update_slots; // Add a new task to the end of the queue int post(server_task task); int post(std::vector&& tasks, bool front = false); void cleanup_pending_task(int id_target); // Add a new task, but defer until one slot is available void defer(server_task&& task); // Get the next id for creating anew task int get_new_id(); // Register function to process a new task void on_new_task(std::function callback); // Register function to process a multitask when it is finished void on_finish_multitask(std::function callback) { callback_finish_multitask = std::move(callback); } // Register the function to be called when all slots data is ready to be processed void on_update_slots(std::function callback) { callback_update_slots = std::move(callback); } // Call when the state of one slot is changed void notify_slot_changed(); // end the start_loop routine void terminate() { std::unique_lock lock(mutex_tasks); running = false; condition_tasks.notify_all(); } /** * Main loop consists of these steps: * - Wait until a new task arrives * - Process the task (i.e. maybe copy data into slot) * - Check if multitask is finished * - Update all slots */ void start_loop(); // // functions to manage multitasks // // add a multitask by specifying the id of all subtask (subtask is a server_task) void add_multitask(int id_multi, std::vector& sub_ids); // updatethe remaining subtasks, while appending results to multitask void update_multitask(int id_multi, int id_sub, server_task_result& result); }; struct server_response { typedef std::function callback_multitask_t; callback_multitask_t callback_update_multitask; bool running = true; // for keeping track of all tasks waiting for the result std::set waiting_task_ids; // the main result queue (using ptr for polymorphism) std::vector queue_results; std::vector queue_results_legacy; std::mutex mutex_results; std::condition_variable condition_results; // add the id_task to the list of tasks waiting for response void add_waiting_task_id(int id_task); void add_waiting_tasks(const std::vector& tasks); // when the request is finished, we can remove task associated with it void remove_waiting_task_id(int id_task); void remove_waiting_task_ids(const std::unordered_set& id_tasks); // This function blocks the thread until there is a response for this id_task server_task_result recv(int id_task); // same as recv(), but have timeout in seconds // if timeout is reached, nullptr is returned server_task_result_ptr recv_with_timeout(const std::unordered_set& id_tasks, int timeout); // Register the function to update multitask void on_multitask_update(callback_multitask_t callback) { callback_update_multitask = std::move(callback); } // Send a new result to a waiting id_task void send(server_task_result result); void send(server_task_result_ptr&& result); // terminate the waiting loop void terminate() { running = false; condition_results.notify_all(); }; };