Files
ik_llama.cpp/examples/server/server-queue.h
firecoperana 1a461525d5 server: stop processing the prompt when client disconnects (#1134)
implement generator-based API for task results

Update httplib.h to 0.27.0

Fix embedding error

Stop prompt processing when disconnected

Co-authored-by: firecoperana <firecoperana>
2026-01-13 07:56:59 +02:00

141 lines
4.1 KiB
C++

#pragma once
#include "server-task.h"

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <set>
#include <unordered_set>
#include <vector>
// Bookkeeping record for a task that has been split into several subtasks.
struct server_task_multi {
    // Identifier of the multitask; -1 means "not assigned yet".
    int id{-1};

    // Ids of the subtasks that are still outstanding.
    std::set<int> subtasks_remaining;

    // Results gathered so far for this multitask.
    std::vector<server_task_result> results;
};
/**
 * Queue of pending server tasks plus the callbacks used to process them.
 *
 * Only the inline members are defined here; everything else is declared and
 * implemented in the corresponding source file.
 */
struct server_queue {
    // Id counter. NOTE(review): presumably advanced by get_new_id() — confirm in the .cpp.
    int id = 0;

    // Loop flag, cleared by terminate(). Initialized so that it never holds an
    // indeterminate value before the loop (or terminate()) first writes it.
    bool running = false;

    // queues
    std::deque<server_task>        queue_tasks;
    std::deque<server_task>        queue_tasks_deferred;
    std::vector<server_task_multi> queue_multitasks;

    // Guards the task queues and `running`; paired with condition_tasks.
    std::mutex              mutex_tasks;
    std::condition_variable condition_tasks;

    // callback functions
    std::function<void(server_task &&)>      callback_new_task;
    std::function<void(server_task_multi &)> callback_finish_multitask;
    std::function<void(void)>                callback_update_slots;

    // Add a new task to the end of the queue
    int post(server_task task);

    // Add several tasks at once; `front` selects the front of the queue
    // instead of the end. NOTE(review): return value semantics are defined in the .cpp.
    int post(std::vector<server_task>&& tasks, bool front = false);

    // NOTE(review): presumably drops queued tasks that target `id_target` — confirm in the .cpp.
    void cleanup_pending_task(int id_target);

    // Add a new task, but defer until one slot is available
    void defer(server_task&& task);

    // Get the next id for creating a new task
    int get_new_id();

    // Register function to process a new task
    void on_new_task(std::function<void(server_task&&)> callback);

    // Register function to process a multitask when it is finished
    void on_finish_multitask(std::function<void(server_task_multi&)> callback) {
        callback_finish_multitask = std::move(callback);
    }

    // Register the function to be called when all slots data is ready to be processed
    void on_update_slots(std::function<void(void)> callback) {
        callback_update_slots = std::move(callback);
    }

    // Call when the state of one slot is changed
    void notify_slot_changed();

    // End the start_loop routine: clear `running` under the task mutex and
    // wake every thread waiting on condition_tasks.
    void terminate() {
        std::unique_lock<std::mutex> lock(mutex_tasks);
        running = false;
        condition_tasks.notify_all();
    }

    /**
     * Main loop consists of these steps:
     * - Wait until a new task arrives
     * - Process the task (i.e. maybe copy data into slot)
     * - Check if multitask is finished
     * - Update all slots
     */
    void start_loop();

    //
    // functions to manage multitasks
    //

    // add a multitask by specifying the id of all subtask (subtask is a server_task)
    void add_multitask(int id_multi, std::vector<int>& sub_ids);

    // update the remaining subtasks, while appending results to multitask
    void update_multitask(int id_multi, int id_sub, server_task_result& result);
};
struct server_response {
typedef std::function<void(int, int, server_task_result&)> callback_multitask_t;
callback_multitask_t callback_update_multitask;
bool running = true;
// for keeping track of all tasks waiting for the result
std::set<int> waiting_task_ids;
// the main result queue (using ptr for polymorphism)
std::vector<server_task_result_ptr> queue_results;
std::vector<server_task_result> queue_results_legacy;
std::mutex mutex_results;
std::condition_variable condition_results;
// add the id_task to the list of tasks waiting for response
void add_waiting_task_id(int id_task);
void add_waiting_tasks(const std::vector<server_task>& tasks);
// when the request is finished, we can remove task associated with it
void remove_waiting_task_id(int id_task);
void remove_waiting_task_ids(const std::unordered_set<int>& id_tasks);
// This function blocks the thread until there is a response for this id_task
server_task_result recv(int id_task);
// same as recv(), but have timeout in seconds
// if timeout is reached, nullptr is returned
server_task_result_ptr recv_with_timeout(const std::unordered_set<int>& id_tasks, int timeout);
// Register the function to update multitask
void on_multitask_update(callback_multitask_t callback) {
callback_update_multitask = std::move(callback);
}
// Send a new result to a waiting id_task
void send(server_task_result result);
void send(server_task_result_ptr&& result);
// terminate the waiting loop
void terminate() {
running = false;
condition_results.notify_all();
};
};