Fix Anthropic Messages API (#1136)

* server: stop processing the prompt when client disconnects

implement generator-based API for task results

Update httplib.h to 0.27.0

Fix embedding error

Stop prompt processing when disconnected

* Port upstream https://github.com/ggml-org/llama.cpp/pull/18551

* add back anthropic

* Fix merge issue caused by github webui

---------

Co-authored-by: firecoperana <firecoperana>
This commit is contained in:
hksdpc255
2026-01-13 17:37:29 +11:00
committed by GitHub
parent 013831bba5
commit e1c4c4a495
6 changed files with 165 additions and 28 deletions

View File

@@ -506,15 +506,29 @@ bool server_sent_event(httplib::DataSink& sink, const json& data) {
}
bool server_sent_anthropic_event(httplib::DataSink& sink, const json& data) {
const std::string str =
(data.contains("event") && data.contains("data")) ?
("event: " + data.at("event").get<std::string>() + "\n" +
"data: " + data.at("data").dump(-1, ' ', false, json::error_handler_t::replace) + "\n\n") :
("data: " + data.at("data").dump(-1, ' ', false, json::error_handler_t::replace) + "\n\n");
static auto send_single = [](httplib::DataSink& sink, const json& data) -> bool {
const std::string str =
(data.contains("event") && data.contains("data")) ?
("event: " + data.at("event").get<std::string>() + "\n" +
"data: " + data.at("data").dump(-1, ' ', false, json::error_handler_t::replace) + "\n\n") :
("data: " + data.at("data").dump(-1, ' ', false, json::error_handler_t::replace) + "\n\n");
LOG_VERBOSE("data stream, to_send: %s", str.c_str());
LOG_DBG("data stream, to_send: %s", str.c_str());
return sink.write(str.c_str(), str.size());
};
return sink.write(str.c_str(), str.size());
if (data.is_array()) {
for (const auto& item : data) {
if (!send_single(sink, item)) {
return false;
}
}
}
else {
return send_single(sink, data);
}
return true;
}
//

View File

@@ -1482,7 +1482,7 @@ void server_context::send_partial_response(server_slot& slot, completion_token_o
res->content = tkn.text_to_send;
res->post_sampling_probs = slot.params.post_sampling_probs;
res->oaicompat = slot.params.oaicompat;
res->oaicompat_model = slot.params.oaicompat_model;
res->oaicompat_model = slot.task->params.oaicompat_model;
res->oaicompat_cmpl_id = slot.params.oaicompat_cmpl_id;
res->n_decoded = slot.n_decoded;
res->n_prompt_tokens = slot.n_prompt_tokens;
@@ -1494,6 +1494,20 @@ void server_context::send_partial_response(server_slot& slot, completion_token_o
};
slot.update_chat_msg(res->oaicompat_msg_diffs);
res->anthropic_has_reasoning = !slot.chat_msg.reasoning_content.empty();
res->anthropic_thinking_block_started = slot.anthropic_thinking_block_started;
res->anthropic_text_block_started = slot.anthropic_text_block_started;
for (const auto& diff : res->oaicompat_msg_diffs) {
if (!diff.reasoning_content_delta.empty() && !slot.anthropic_thinking_block_started) {
slot.anthropic_thinking_block_started = true;
}
if (!diff.content_delta.empty() && !slot.anthropic_text_block_started) {
slot.anthropic_text_block_started = true;
}
}
// populate res->probs_output
if (slot.sparams.n_probs > 0) {
res->probs_output = { tkn }; // copy the token probs

View File

@@ -100,6 +100,9 @@ struct server_slot {
common_chat_format chat_format = COMMON_CHAT_FORMAT_CONTENT_ONLY;
std::vector<std::string> generated_tool_call_ids;
bool anthropic_thinking_block_started = false;
bool anthropic_text_block_started = false;
int32_t ga_i = 0; // group-attention state
int32_t ga_n = 1; // group-attention factor
int32_t ga_w = 512; // group-attention width

View File

@@ -353,6 +353,13 @@ json server_task_result_cmpl_final::to_json_anthropic_final() {
msg.content = content;
}
if (!msg.reasoning_content.empty()) {
content_blocks.push_back({
{"type", "thinking"},
{"thinking", msg.reasoning_content},
{"signature", ""}
});
}
if (!msg.content.empty()) {
content_blocks.push_back({
@@ -403,13 +410,46 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
stop_reason = oaicompat_msg.tool_calls.empty() ? "end_turn" : "tool_use";
}
bool has_thinking = !oaicompat_msg.reasoning_content.empty();
bool has_text = !oaicompat_msg.content.empty();
size_t num_tool_calls = oaicompat_msg.tool_calls.size();
size_t thinking_block_index = 0;
size_t text_block_index = has_thinking ? 1 : 0;
bool thinking_block_started = false;
bool text_block_started = false;
std::set<size_t> tool_calls_started;
for (const auto& diff : oaicompat_msg_diffs) {
if (!diff.reasoning_content_delta.empty()) {
if (!thinking_block_started) {
events.push_back({
{"event", "content_block_start"},
{"data", {
{"type", "content_block_start"},
{"index", thinking_block_index},
{"content_block", {
{"type", "thinking"},
{"thinking", ""}
}}
}}
});
thinking_block_started = true;
}
events.push_back({
{"event", "content_block_delta"},
{"data", {
{"type", "content_block_delta"},
{"index", thinking_block_index},
{"delta", {
{"type", "thinking_delta"},
{"thinking", diff.reasoning_content_delta}
}}
}}
});
}
if (!diff.content_delta.empty()) {
if (!text_block_started) {
@@ -417,7 +457,7 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
{"event", "content_block_start"},
{"data", {
{"type", "content_block_start"},
{"index", 0},
{"index", text_block_index},
{"content_block", {
{"type", "text"},
{"text", ""}
@@ -431,7 +471,7 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
{"event", "content_block_delta"},
{"data", {
{"type", "content_block_delta"},
{"index", 0},
{"index", text_block_index},
{"delta", {
{"type", "text_delta"},
{"text", diff.content_delta}
@@ -441,7 +481,7 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
}
if (diff.tool_call_index != std::string::npos) {
size_t content_block_index = (has_text ? 1 : 0) + diff.tool_call_index;
size_t content_block_index = (has_thinking ? 1 : 0) + (has_text ? 1 : 0) + diff.tool_call_index;
if (tool_calls_started.find(diff.tool_call_index) == tool_calls_started.end()) {
const auto& full_tool_call = oaicompat_msg.tool_calls[diff.tool_call_index];
@@ -477,18 +517,39 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
}
}
if (has_thinking) {
events.push_back({
{"event", "content_block_delta"},
{"data", {
{"type", "content_block_delta"},
{"index", thinking_block_index},
{"delta", {
{"type", "signature_delta"},
{"signature", ""}
}}
}}
});
events.push_back({
{"event", "content_block_stop"},
{"data", {
{"type", "content_block_stop"},
{"index", thinking_block_index}
}}
});
}
if (has_text) {
events.push_back({
{"event", "content_block_stop"},
{"data", {
{"type", "content_block_stop"},
{"index", 0}
{"index", text_block_index}
}}
});
}
for (size_t i = 0; i < num_tool_calls; i++) {
size_t content_block_index = (has_text ? 1 : 0) + i;
size_t content_block_index = (has_thinking ? 1 : 0) + (has_text ? 1 : 0) + i;
events.push_back({
{"event", "content_block_stop"},
{"data", {
@@ -534,11 +595,14 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
json server_task_result_cmpl_partial::to_json_anthropic_partial() {
json events = json::array();
bool first = n_decoded == 1;
static bool text_block_started = false;
size_t thinking_block_index = 0;
size_t text_block_index = anthropic_has_reasoning ? 1 : 0;
bool thinking_started = anthropic_thinking_block_started;
bool text_started = anthropic_text_block_started;
if (first) {
text_block_started = false;
events.push_back({
{"event", "message_start"},
{"data", {
@@ -561,27 +625,56 @@ json server_task_result_cmpl_partial::to_json_anthropic_partial() {
}
for (const auto& diff : oaicompat_msg_diffs) {
if (!diff.content_delta.empty()) {
if (!text_block_started) {
if (!diff.reasoning_content_delta.empty()) {
if (!thinking_started) {
events.push_back({
{"event", "content_block_start"},
{"data", {
{"type", "content_block_start"},
{"index", 0},
{"index", thinking_block_index},
{"content_block", {
{"type", "text"},
{"text", ""}
{"type", "thinking"},
{"thinking", ""}
}}
}}
});
text_block_started = true;
thinking_started = true;
}
events.push_back({
{"event", "content_block_delta"},
{"data", {
{"type", "content_block_delta"},
{"index", 0},
{"index", thinking_block_index},
{"delta", {
{"type", "thinking_delta"},
{"thinking", diff.reasoning_content_delta}
}}
}}
});
}
if (!diff.content_delta.empty()) {
if (!text_started) {
events.push_back({
{"event", "content_block_start"},
{"data", {
{"type", "content_block_start"},
{"index", text_block_index},
{"content_block", {
{"type", "text"},
{"text", ""}
}}
}}
});
text_started = true;
}
events.push_back({
{"event", "content_block_delta"},
{"data", {
{"type", "content_block_delta"},
{"index", text_block_index},
{"delta", {
{"type", "text_delta"},
{"text", diff.content_delta}
@@ -591,7 +684,7 @@ json server_task_result_cmpl_partial::to_json_anthropic_partial() {
}
if (diff.tool_call_index != std::string::npos) {
size_t content_block_index = (text_block_started ? 1 : 0) + diff.tool_call_index;
size_t content_block_index = (anthropic_has_reasoning ? 1 : 0) + (text_started ? 1 : 0) + diff.tool_call_index;
if (!diff.tool_call_delta.name.empty()) {
events.push_back({

View File

@@ -200,6 +200,10 @@ struct server_task_result {
};
struct server_task_result_cmpl_partial : server_task_result {
bool anthropic_has_reasoning = false;
bool anthropic_thinking_block_started = false;
bool anthropic_text_block_started = false;
virtual bool is_stop() override {
return false; // in stream mode, partial responses are not considered stop
}

View File

@@ -1088,6 +1088,7 @@ int main(int argc, char ** argv) {
// OAI-compat
task.params.oaicompat = oaicompat;
task.params.oaicompat_cmpl_id = completion_id;
task.params.oaicompat_model = get_model_name(ctx_server.params.model);
tasks.push_back(std::move(task));
}
@@ -1141,9 +1142,17 @@ int main(int argc, char ** argv) {
// next responses are streamed
json first_result_json = first_result->to_json();
const auto chunked_content_provider = [first_result_json, rd, oaicompat](size_t, httplib::DataSink& sink) mutable -> bool {
const auto sse = [oaicompat, &sink](const json& res) {
if (oaicompat == OAICOMPAT_TYPE_ANTHROPIC) {
return server_sent_anthropic_event(sink, res);
}
else {
return server_sent_event(sink, res);
}
};
// flush the first result as it's not an error
if (!first_result_json.empty()) {
if (!server_sent_event(sink, first_result_json)) {
if (!sse(first_result_json)) {
sink.done();
return false; // sending failed, go to on_complete()
}
@@ -1161,7 +1170,7 @@ int main(int argc, char ** argv) {
json res_json = result->to_json();
bool ok = false;
if (result->is_error()) {
ok = server_sent_event(sink, json{ { "error", result->to_json() } });
ok = sse(json{ { "error", result->to_json() } });
sink.done();
return false; // go to on_complete()
}
@@ -1170,7 +1179,7 @@ int main(int argc, char ** argv) {
dynamic_cast<server_task_result_cmpl_partial*>(result.get()) != nullptr
|| dynamic_cast<server_task_result_cmpl_final*>(result.get()) != nullptr
);
ok = server_sent_event(sink, res_json);
ok = sse(res_json);
}
if (!ok) {
@@ -1180,7 +1189,7 @@ int main(int argc, char ** argv) {
// check if there is more data
if (!rd->has_next()) {
if (oaicompat != OAICOMPAT_TYPE_NONE) {
if (oaicompat != OAICOMPAT_TYPE_ANTHROPIC && oaicompat != OAICOMPAT_TYPE_NONE) {
static const std::string ev_done = "data: [DONE]\n\n";
sink.write(ev_done.data(), ev_done.size());
}