Revert "sync from b7516"

This reverts commit 6ee41dd9e3.
This commit is contained in:
2026-01-16 11:34:48 +08:00
parent 6ee41dd9e3
commit 620f04050b
380 changed files with 38842 additions and 18471 deletions

View File

@@ -4,7 +4,6 @@
#include "server-task.h"
#include "server-queue.h"
#include "arg.h"
#include "common.h"
#include "llama.h"
#include "log.h"
@@ -16,7 +15,6 @@
#include <cstddef>
#include <cinttypes>
#include <memory>
#include <unordered_set>
#include <filesystem>
// fix problem with std::min and std::max
@@ -47,26 +45,6 @@ enum server_state {
SERVER_STATE_READY, // Server is ready and model is loaded
};
// Returns true for task types whose result is an embedding vector,
// i.e. the batch must be evaluated with embedding output enabled.
static bool server_task_type_need_embd(server_task_type task_type) {
    return task_type == SERVER_TASK_TYPE_EMBEDDING ||
           task_type == SERVER_TASK_TYPE_RERANK;
}
// Returns true for task types that sample next tokens and therefore
// require logits to be computed for the last token of the batch.
static bool server_task_type_need_logits(server_task_type task_type) {
    return task_type == SERVER_TASK_TYPE_COMPLETION ||
           task_type == SERVER_TASK_TYPE_INFILL;
}
struct server_slot {
int id;
@@ -81,6 +59,8 @@ struct server_slot {
common_speculative * spec = nullptr;
// TODO: move members that belong to the task (such as `generated_text`, `has_new_line`) to task_results_state
// see https://github.com/ggml-org/llama.cpp/pull/18283#issuecomment-3710175837
std::unique_ptr<const server_task> task;
std::unique_ptr<const server_task> task_prev; // used for debugging
@@ -147,6 +127,17 @@ struct server_slot {
return res;
}
// Discard this slot's cached prompt: remove the slot's whole sequence from
// the llama memory module and drop the cached tokens.
// @param allow_processing when false, asserts the slot is idle; pass true to
//        allow clearing while a task is still being processed (e.g. after a
//        failed partial memory removal, when the cache is already invalid).
void prompt_clear(bool allow_processing) {
if (!allow_processing) {
GGML_ASSERT(!is_processing());
}
SLT_INF(*this, "clearing prompt with %zu tokens\n", prompt.tokens.size());
// p0 = -1, p1 = -1 -> remove all positions for sequence `id`
llama_memory_seq_rm(llama_get_memory(ctx), id, -1, -1);
prompt.tokens.clear();
}
std::vector<common_adapter_lora_info> lora;
int32_t alora_invocation_start = -1;
@@ -155,7 +146,7 @@ struct server_slot {
common_sampler_ptr smpl;
llama_token sampled; // in speculative mode, this is the last accepted token
llama_token sampled; // in speculative mode, this is the last accepted token
llama_tokens drafted;
// stats
@@ -167,7 +158,7 @@ struct server_slot {
double t_prompt_processing; // ms
double t_token_generation; // ms
std::function<void(int)> callback_on_release;
std::function<void(int /* slot_id */)> callback_on_release;
// Speculative decoding stats
int32_t n_draft_total = 0; // Total draft tokens generated
@@ -196,30 +187,46 @@ struct server_slot {
n_draft_total = 0;
n_draft_accepted = 0;
task_prev = std::move(task);
task.reset();
task_prev.reset();
llama_set_sampler(ctx, id, nullptr);
// clear alora start
alora_invocation_start = -1;
}
bool need_embd() const {
GGML_ASSERT(task);
void init_sampler() const {
common_sampler_reset(smpl.get());
return server_task_type_need_embd(task->type);
}
if (!task->need_sampling()) {
return;
}
bool need_logits() const {
GGML_ASSERT(task);
const int64_t t_start = ggml_time_us();
return server_task_type_need_logits(task->type);
int n_text = 0;
for (int i = 0; i < (int) prompt.tokens.size(); i++) {
const llama_token id = prompt.tokens[i];
if (id != LLAMA_TOKEN_NULL) {
common_sampler_accept(smpl.get(), id, false);
n_text++;
}
}
SLT_INF(*this, "init sampler, took %0.2f ms, tokens: text = %d, total = %d\n",
(ggml_time_us() - t_start) / 1000.0, n_text, (int) prompt.tokens.size());
}
// if the context does not have a memory module then all embeddings have to be computed within a single ubatch
// also we cannot split if the pooling would require any past tokens
bool can_split() const {
GGML_ASSERT(task);
return
!need_embd() ||
!task->need_embd() ||
(llama_get_memory(ctx) && llama_pooling_type(ctx) == LLAMA_POOLING_TYPE_LAST);
}
@@ -260,10 +267,13 @@ struct server_slot {
SLT_WRN(*this, "%s", "slot is not processing\n");
return;
}
generated_token_probs.push_back(token);
}
int get_n_draft_max() const {
GGML_ASSERT(task);
if (!can_speculate()) {
return 0;
}
@@ -288,27 +298,23 @@ struct server_slot {
return n_draft_max;
}
// note: a slot can also be either a parent or a child
// A "parent" slot runs a task with child tasks attached (n_cmpl > 1);
// only meaningful while a task is active, hence the is_processing() guard.
bool is_parent() const {
return is_processing() && task->n_children > 0;
}
// A "child" slot runs a task spawned by a parent task (id_parent >= 0);
// only meaningful while a task is active, hence the is_processing() guard.
bool is_child() const {
return is_processing() && task->id_parent >= 0;
}
void release() {
if (is_processing()) {
GGML_ASSERT(task);
SLT_INF(*this, "stop processing: n_tokens = %d, truncated = %d\n", prompt.n_tokens(), truncated);
t_last_used = ggml_time_us();
t_last_used = ggml_time_us();
t_token_generation = (ggml_time_us() - t_start_generation) / 1e3;
state = SLOT_STATE_IDLE;
task_prev = std::move(task);
task.reset();
// do not keep context of the child slots - the parent's context is enough
if (task->is_child()) {
prompt_clear(false);
}
reset();
callback_on_release(id);
}
@@ -427,14 +433,22 @@ struct server_slot {
}
void copy_state_to(server_slot & other) const {
llama_memory_seq_rm(llama_get_memory(ctx), other.id, 0, -1);
llama_memory_seq_cp(llama_get_memory(ctx), id, other.id, 0, -1);
GGML_ASSERT(state == SLOT_STATE_DONE_PROMPT);
llama_memory_seq_rm(llama_get_memory(ctx), other.id, -1, -1);
llama_memory_seq_cp(llama_get_memory(ctx), id, other.id, -1, -1);
other.n_decoded = n_decoded;
other.n_remaining = n_remaining;
other.i_batch = i_batch;
other.t_start_process_prompt = t_start_process_prompt;
other.t_prompt_processing = t_prompt_processing;
other.n_prompt_tokens_cache = n_prompt_tokens_cache;
other.n_prompt_tokens_processed = n_prompt_tokens_processed;
other.prompt = prompt.clone();
other.init_sampler();
}
};
@@ -747,6 +761,8 @@ private:
}
slots.clear();
// initialize slots
for (int i = 0; i < params_base.n_parallel; i++) {
server_slot slot;
@@ -778,8 +794,8 @@ private:
SLT_INF(slot, "new slot, n_ctx = %d\n", slot.n_ctx);
slot.callback_on_release = [this](int) {
queue_tasks.pop_deferred_task();
slot.callback_on_release = [this](int slot_id) {
queue_tasks.pop_deferred_task(slot_id);
};
slot.reset();
@@ -893,9 +909,9 @@ private:
return true;
}
server_slot * get_slot_by_id(int id) {
server_slot * get_slot_by_id(int id_slot) {
for (server_slot & slot : slots) {
if (slot.id == id) {
if (slot.id == id_slot) {
return &slot;
}
}
@@ -995,7 +1011,7 @@ private:
ret->prompt_save(*prompt_cache);
if (!ret->prompt_load(*prompt_cache, task.tokens)) {
clear_slot(*ret);
ret->prompt_clear(false);
}
prompt_cache->update();
@@ -1007,15 +1023,6 @@ private:
return ret;
}
// Wipe an idle slot's cached context: remove its whole sequence from the
// llama memory module and drop its cached prompt tokens.
// The slot must not be processing a task (asserted).
void clear_slot(server_slot & slot) const {
GGML_ASSERT(!slot.is_processing());
SLT_WRN(slot, "clearing slot with %zu tokens\n", slot.prompt.tokens.size());
// p0 = -1, p1 = -1 -> remove all positions for sequence slot.id
llama_memory_seq_rm(llama_get_memory(ctx), slot.id, -1, -1);
slot.prompt.tokens.clear();
}
// return true if at least one slot has been cleared
// TODO: improve logic
// - smarter decision which slot to clear (LRU or longest prompt?)
@@ -1036,7 +1043,7 @@ private:
if (slot.prompt.n_tokens() > 0) {
SRV_WRN("purging slot %d with %zu tokens\n", slot.id, slot.prompt.tokens.size());
clear_slot(slot);
slot.prompt_clear(false);
res = true;
@@ -1062,8 +1069,6 @@ private:
}
bool launch_slot_with_task(server_slot & slot, server_task && task) {
slot.reset();
// process per-request lora adapters
if (!task.params.lora.empty()) {
auto task_loras = construct_lora_list(task.params.lora);
@@ -1137,7 +1142,7 @@ private:
SLT_DBG(slot, "launching slot : %s\n", safe_json_to_str(slot.to_json()).c_str());
// initialize samplers
{
if (task.need_sampling()) {
slot.smpl.reset(common_sampler_init(model, task.params.sampling));
if (slot.smpl == nullptr) {
@@ -1146,7 +1151,28 @@ private:
return false;
}
const bool need_logits = task.params.sampling.n_probs > 0;
bool backend_sampling = true;
backend_sampling &= task.params.sampling.backend_sampling;
// TODO: speculative decoding requires multiple samples per batch - not supported yet
backend_sampling &= !(slot.ctx_dft && task.params.speculative.n_max > 0);
// TODO: getting post/pre sampling logits is not yet supported with backend sampling
backend_sampling &= !need_logits;
// TODO: tmp until backend sampling is fully implemented
if (backend_sampling) {
llama_set_sampler(ctx, slot.id, common_sampler_get(slot.smpl.get()));
} else {
llama_set_sampler(ctx, slot.id, nullptr);
}
SLT_INF(slot, "sampler chain: %s\n", common_sampler_print(slot.smpl.get()).c_str());
} else {
slot.smpl.reset();
}
// initialize draft batch
@@ -1159,12 +1185,11 @@ private:
slot.task = std::make_unique<const server_task>(std::move(task));
slot.state = slot.is_child()
slot.state = slot.task->is_child()
? SLOT_STATE_WAIT_OTHER // wait for the parent to process prompt
: SLOT_STATE_STARTED;
SLT_INF(slot, "%s", "processing task\n");
SLT_INF(slot, "processing task, is_child = %d\n", slot.task->is_child());
return true;
}
@@ -1484,9 +1509,9 @@ private:
res->n_tokens = slot.task->n_tokens();
res->res_type = slot.task->params.res_type;
const int n_embd = llama_model_n_embd(model);
const int n_embd_out = llama_model_n_embd_out(model);
std::vector<float> embd_res(n_embd, 0.0f);
std::vector<float> embd_res(n_embd_out, 0.0f);
for (int i = 0; i < batch.n_tokens; ++i) {
if (!batch.logits[i] || batch.seq_id[i][0] != slot.id) {
@@ -1503,18 +1528,18 @@ private:
if (embd == nullptr) {
SLT_ERR(slot, "failed to get embeddings, token = %d, seq_id = %d\n", batch.token[i], batch.seq_id[i][0]);
res->embedding.push_back(std::vector<float>(n_embd, 0.0f));
res->embedding.push_back(std::vector<float>(n_embd_out, 0.0f));
continue;
}
// normalize only when there is pooling
if (llama_pooling_type(slot.ctx) != LLAMA_POOLING_TYPE_NONE) {
common_embd_normalize(embd, embd_res.data(), n_embd, slot.task->params.embd_normalize);
common_embd_normalize(embd, embd_res.data(), n_embd_out, slot.task->params.embd_normalize);
res->embedding.push_back(embd_res);
break;
}
res->embedding.emplace_back(embd, embd + n_embd);
res->embedding.emplace_back(embd, embd + n_embd_out);
}
SLT_DBG(slot, "%s", "sending embeddings\n");
@@ -1559,9 +1584,7 @@ private:
// tokenize the input if it's set by CLI, return false on error
bool tokenize_cli_input(server_task & task) {
if (task.cli_input == nullptr) {
return true; // nothing to do
}
GGML_ASSERT(task.cli_input != nullptr);
try {
auto & opt = oai_parser_opt;
common_chat_templates_inputs inputs;
@@ -1595,6 +1618,64 @@ private:
return true;
}
std::vector<server_slot *> get_free_slots(size_t n_slots_needed, int exclude_id_slot) {
std::vector<server_slot *> free_slots;
for (auto & slot : slots) {
if (!slot.is_processing() && slot.id != exclude_id_slot) {
free_slots.push_back(&slot);
}
if (free_slots.size() >= n_slots_needed) {
break;
}
}
return free_slots;
}
// launch multiple slots for parent + child tasks
// Launches every child task on its pre-selected slot, then the parent task on
// parent_slot. On any failure, releases all slots already launched for this
// parent and returns false (the task is dropped by the caller).
// @param parent_slot  an idle slot reserved for the parent task (asserted)
// @param child_slots  one free slot per child task (size asserted)
// @param parent_task  the parent task; its child_tasks are moved out
bool launch_slots_with_parent_task(server_slot & parent_slot, std::vector<server_slot *> & child_slots, server_task && parent_task) {
    GGML_ASSERT(!parent_slot.is_processing());
    GGML_ASSERT(parent_task.is_parent());
    GGML_ASSERT(child_slots.size() == parent_task.child_tasks.size());

    const int id_parent = parent_task.id;

    SRV_INF("launching slots for parent task id_task = %d with %zu child tasks\n", id_parent, parent_task.child_tasks.size());

    // to be called in case of failure to release all launched slots
    auto release_slots = [this, id_parent]() {
        for (auto & slot : slots) {
            if (slot.is_processing() && (
                slot.task->id == id_parent ||
                slot.task->id_parent == id_parent
            )) {
                slot.release();
            }
        }
    };

    // launch all child tasks first
    // (size equality was already asserted above - no need to re-check here)
    for (size_t idx = 0; idx < child_slots.size(); ++idx) {
        const int id_child = parent_task.child_tasks[idx].id;
        if (!launch_slot_with_task(*child_slots[idx], std::move(parent_task.child_tasks[idx]))) {
            SRV_ERR("failed to launch slot with child task, id_task = %d\n", id_child);
            release_slots();
            return false;
        }
    }

    // finally, launch the parent task
    if (!launch_slot_with_task(parent_slot, std::move(parent_task))) {
        SRV_ERR("failed to launch slot with task, id_task = %d\n", id_parent);
        release_slots();
        return false;
    }

    return true;
}
void process_single_task(server_task && task) {
switch (task.type) {
case SERVER_TASK_TYPE_COMPLETION:
@@ -1602,31 +1683,55 @@ private:
case SERVER_TASK_TYPE_EMBEDDING:
case SERVER_TASK_TYPE_RERANK:
{
if (!tokenize_cli_input(task)) {
break;
// special case: if input is provided via CLI, tokenize it first
// otherwise, no need to tokenize as it's already done inside the HTTP thread
if (task.cli_input != nullptr) {
if (!tokenize_cli_input(task)) {
break;
}
}
const int id_slot = task.id_slot;
const int id_task = task.id;
server_slot * slot = id_slot != -1 ? get_slot_by_id(id_slot) : get_available_slot(task);
server_slot * slot = id_slot != -1
? get_slot_by_id(id_slot)
: get_available_slot(task);
//
// slot scheduling logic
//
if (slot == nullptr) {
// if no slot is available, we defer this task for processing later
SRV_DBG("no slot is available, defer task, id_task = %d\n", task.id);
SRV_DBG("no slot is available, defer task, id_task = %d\n", id_task);
queue_tasks.defer(std::move(task));
break;
}
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id);
SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", id_task);
queue_tasks.defer(std::move(task));
break;
}
if (!launch_slot_with_task(*slot, std::move(task))) {
SRV_ERR("failed to launch slot with task, id_task = %d\n", task.id);
break;
if (task.is_parent()) {
// try getting free slots for all child tasks
size_t n_child_tasks = task.child_tasks.size();
std::vector<server_slot *> child_slots = get_free_slots(n_child_tasks, slot->id);
if (child_slots.size() < n_child_tasks) {
SRV_DBG("not enough free slots for child tasks, n_free = %zu, n_children = %zu, defer task, id_task = %d\n", child_slots.size(), n_child_tasks, id_task);
queue_tasks.defer(std::move(task));
break;
}
if (!launch_slots_with_parent_task(*slot, child_slots, std::move(task))) {
SRV_ERR("failed to launch slot with parent task, id_task = %d\n", id_task);
break; // drop the task
}
} else if (!launch_slot_with_task(*slot, std::move(task))) {
SRV_ERR("failed to launch slot with task, id_task = %d\n", id_task);
break; // drop the task
}
} break;
case SERVER_TASK_TYPE_CANCEL:
@@ -1800,7 +1905,7 @@ private:
// Erase token cache
const size_t n_erased = slot->prompt.tokens.size();
clear_slot(*slot);
slot->prompt_clear(false);
auto res = std::make_unique<server_task_result_slot_erase>();
res->id = task.id;
@@ -1895,7 +2000,7 @@ private:
GGML_ABORT("not supported by multimodal");
}
if (slot.is_parent() || slot.is_child()) {
if (slot.task->is_parent() || slot.task->is_child()) {
send_error(slot, "context shift cannot be used for shared prompt", ERROR_TYPE_SERVER);
slot.release();
continue;
@@ -2034,6 +2139,12 @@ private:
continue;
}
// check if this is a child slot
if (slot.state == SLOT_STATE_WAIT_OTHER) {
SLT_DBG(slot, "%s", "waiting for parent slot to complete\n");
continue;
}
// this slot still has a prompt to be processed
if (slot.state == SLOT_STATE_PROCESSING_PROMPT || slot.state == SLOT_STATE_STARTED) {
const auto & input_tokens = slot.task->tokens;
@@ -2076,7 +2187,7 @@ private:
}
// TODO: support memory-less logits computation
if (slot.need_logits() && !llama_get_memory(ctx)) {
if (slot.task->need_logits() && !llama_get_memory(ctx)) {
send_error(slot, "the current context does not logits computation. skipping", ERROR_TYPE_SERVER);
slot.release();
continue;
@@ -2313,6 +2424,12 @@ private:
slot.n_prompt_tokens_processed = 0;
slot.prompt.tokens.keep_first(n_past);
// send initial 0% progress update if needed
// this is to signal the client that the request has started processing
if (slot.task->params.stream && slot.task->params.return_progress) {
send_partial_response(slot, {}, true);
}
}
if (!slot.can_split()) {
@@ -2330,7 +2447,7 @@ private:
if (!llama_memory_seq_rm(llama_get_memory(ctx), slot.id, p0, -1)) {
SLT_WRN(slot, "failed to truncate tokens with position >= %d - clearing the memory\n", p0);
clear_slot(slot);
slot.prompt_clear(true);
// there is no common part left
slot.n_prompt_tokens_cache = 0;
@@ -2409,7 +2526,7 @@ private:
cur_tok,
slot.prompt.tokens.pos_next(),
{ slot.id },
slot.need_embd());
slot.task->need_embd());
slot.prompt.tokens.push_back(cur_tok);
slot.n_prompt_tokens_processed++;
@@ -2430,16 +2547,6 @@ private:
GGML_ASSERT(batch.n_tokens > 0);
common_sampler_reset(slot.smpl.get());
// Process all prompt tokens through sampler system
for (int i = 0; i < slot.task->n_tokens(); ++i) {
llama_token id = input_tokens[i];
if (id != LLAMA_TOKEN_NULL) {
common_sampler_accept(slot.smpl.get(), id, false);
}
}
// extract the logits only for the last token
batch.logits[batch.n_tokens - 1] = true;
@@ -2448,6 +2555,8 @@ private:
SLT_INF(slot, "prompt done, n_tokens = %d, batch.n_tokens = %d\n", slot.prompt.n_tokens(), batch.n_tokens);
slot.init_sampler();
const auto pos_min = llama_memory_seq_pos_min(llama_get_memory(ctx), slot.id);
const auto pos_max = llama_memory_seq_pos_max(llama_get_memory(ctx), slot.id);
@@ -2494,11 +2603,6 @@ private:
}
}
if (batch.n_tokens == 0) {
SRV_WRN("%s", "no tokens to decode\n");
return;
}
SRV_DBG("decoding batch, n_tokens = %d\n", batch.n_tokens);
if (slot_batched) {
@@ -2512,7 +2616,11 @@ private:
slot_batched->lora[alora_disabled_id].scale = alora_scale;
}
llama_set_embeddings(ctx, slot_batched->need_embd());
llama_set_embeddings(ctx, slot_batched->task->need_embd());
}
if (batch.n_tokens == 0) {
SRV_WRN("%s", "no tokens to decode\n");
}
int32_t i_next = 0;
@@ -2566,7 +2674,7 @@ private:
// note: it's complicated to keep track of how much of the current batch has been
// processed before the error occurred, so we simply clear the entire context
clear_slot(slot);
slot.prompt_clear(false);
}
}
@@ -2590,31 +2698,30 @@ private:
// on successful decode, restore the original batch size
n_batch = llama_n_batch(ctx);
// technically, measuring the time here excludes the sampling time for the last batch
// but on the other hand, we don't want to do too many system calls to measure the time, so it's ok
const int64_t t_current = ggml_time_us();
// handle `n_cmpl > 1` tasks - when the main prompt is processed, activate all child tasks too
for (auto & slot : slots) {
// may need to copy state to other slots
if (slot.state == SLOT_STATE_DONE_PROMPT && slot.is_parent()) {
std::vector<server_slot *> child_slots;
if (slot.state == SLOT_STATE_DONE_PROMPT && slot.task->is_parent()) {
std::vector<server_slot *> children;
for (auto & other : slots) {
if (other.state == SLOT_STATE_WAIT_OTHER && slot.task->id == other.task->id_parent) {
child_slots.push_back(&other);
children.push_back(&other);
}
}
// we can only proceed if all child slots are having the correct tasks
if (child_slots.size() == slot.task->n_children) {
// copy state to the child slots
for (auto & child : child_slots) {
SLT_INF(slot, "copying state to child %d\n", child->id);
slot.copy_state_to(*child);
child->state = SLOT_STATE_DONE_PROMPT;
}
// all children slots should already launched by launch_slots_with_parent_task()
// copy state to the child slots
for (auto & child : children) {
SLT_INF(slot, " - copying state to child %d\n", child->id);
GGML_ASSERT(child->state == SLOT_STATE_WAIT_OTHER);
slot.copy_state_to(*child);
child->state = SLOT_STATE_DONE_PROMPT;
}
}
}
for (auto & slot : slots) {
// optionally send prompt processing progress
if (slot.state == SLOT_STATE_PROCESSING_PROMPT || slot.state == SLOT_STATE_DONE_PROMPT) {
if (slot.task->params.stream && slot.task->params.return_progress) {
@@ -2642,6 +2749,8 @@ private:
continue; // continue loop of slots
}
GGML_ASSERT(slot.task->need_sampling());
// prompt evaluated for next-token prediction
slot.state = SLOT_STATE_GENERATING;
} else if (slot.state != SLOT_STATE_GENERATING) {
@@ -2660,6 +2769,9 @@ private:
common_sampler_accept(slot.smpl.get(), id, true);
// here we have synchronized the llama_context (due to the sampling above), so we can do time measurement
const int64_t t_current = ggml_time_us();
slot.n_decoded += 1;
if (slot.n_decoded == 1) {
@@ -2696,13 +2808,15 @@ private:
continue;
}
size_t n_draft = slot.drafted.size();
const size_t n_draft = slot.drafted.size();
// the accepted tokens from the speculation
const auto ids = common_sampler_sample_and_accept_n(slot.smpl.get(), ctx, slot.i_batch_dft, slot.drafted);
slot.i_batch_dft.clear();
slot.drafted.clear();
const int64_t t_current = ggml_time_us();
slot.n_decoded += ids.size();
slot.t_token_generation = std::max<int64_t>(1, t_current - slot.t_start_generation) / 1e3;
@@ -2784,6 +2898,12 @@ server_response_reader server_context::get_response_reader() {
server_context_meta server_context::get_meta() const {
auto tool_use_src = common_chat_templates_source(impl->chat_templates.get(), "tool_use");
auto bos_id = llama_vocab_bos(impl->vocab);
auto eos_id = llama_vocab_eos(impl->vocab);
auto bos_token_str = bos_id != LLAMA_TOKEN_NULL ? common_token_to_piece(impl->ctx, bos_id, true) : "";
auto eos_token_str = eos_id != LLAMA_TOKEN_NULL ? common_token_to_piece(impl->ctx, eos_id, true) : "";
return server_context_meta {
/* build_info */ build_info,
/* model_name */ impl->model_name,
@@ -2798,8 +2918,8 @@ server_context_meta server_context::get_meta() const {
/* chat_template */ common_chat_templates_source(impl->chat_templates.get()),
/* chat_template_tool_use */ tool_use_src ? tool_use_src : "",
/* bos_token_str */ common_token_to_piece(impl->ctx, llama_vocab_bos(impl->vocab), true),
/* eos_token_str */ common_token_to_piece(impl->ctx, llama_vocab_eos(impl->vocab), true),
/* bos_token_str */ bos_token_str,
/* eos_token_str */ eos_token_str,
/* fim_pre_token */ llama_vocab_fim_pre(impl->vocab),
/* fim_sub_token */ llama_vocab_fim_suf(impl->vocab),
/* fim_mid_token */ llama_vocab_fim_mid(impl->vocab),
@@ -2872,7 +2992,9 @@ std::unique_ptr<server_res_generator> server_routes::handle_completions_impl(
// Everything else, including multimodal completions.
inputs = tokenize_input_prompts(ctx_server.vocab, ctx_server.mctx, prompt, true, true);
}
tasks.reserve(inputs.size());
// tasks.reserve(inputs.size()); // TODO: this is inaccurate due to child tasks
for (size_t i = 0; i < inputs.size(); i++) {
server_task task = server_task(type);
@@ -2891,13 +3013,11 @@ std::unique_ptr<server_res_generator> server_routes::handle_completions_impl(
task.params.oaicompat_cmpl_id = completion_id;
task.params.oaicompat_model = meta->model_name;
// prepare child tasks
if (task.params.n_cmpl > 1) {
task.n_children = task.params.n_cmpl - 1;
for (size_t j = 0; j < task.n_children; j++) {
server_task child = task.create_child(
task.id,
rd.get_new_id());
tasks.push_back(std::move(child));
int n_children = task.params.n_cmpl - 1;
for (int j = 0; j < n_children; j++) {
task.add_child(task.id, rd.get_new_id());
}
}
@@ -2946,19 +3066,22 @@ std::unique_ptr<server_res_generator> server_routes::handle_completions_impl(
// in streaming mode, the first error must be treated as non-stream response
// this is to match the OAI API behavior
// ref: https://github.com/ggml-org/llama.cpp/pull/16486#discussion_r2419657309
server_task_result_ptr first_result = rd.next(req.should_stop);
auto first_result = rd.next(req.should_stop);
if (first_result == nullptr) {
GGML_ASSERT(req.should_stop());
return res; // connection is closed
} else if (first_result->is_error()) {
}
if (first_result->is_error()) {
res->error(first_result->to_json());
return res;
} else {
GGML_ASSERT(
dynamic_cast<server_task_result_cmpl_partial*>(first_result.get()) != nullptr
|| dynamic_cast<server_task_result_cmpl_final*>(first_result.get()) != nullptr
);
}
GGML_ASSERT(
dynamic_cast<server_task_result_cmpl_partial*>(first_result.get()) != nullptr ||
dynamic_cast<server_task_result_cmpl_final*> (first_result.get()) != nullptr
);
// next responses are streamed
// to be sent immediately
json first_result_json = first_result->to_json();
@@ -3014,6 +3137,7 @@ std::unique_ptr<server_res_generator> server_routes::handle_completions_impl(
auto result = rd.next(req.should_stop);
if (result == nullptr) {
SRV_DBG("%s", "stopping streaming due to should_stop condition\n");
GGML_ASSERT(req.should_stop());
return false; // should_stop condition met
}
@@ -3097,6 +3221,11 @@ void server_routes::init_routes() {
// get the result
auto result = res->rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());
@@ -3197,6 +3326,11 @@ void server_routes::init_routes() {
// get the result
auto result = res->rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());
@@ -3703,7 +3837,12 @@ void server_routes::init_routes() {
}
// get the result
server_task_result_ptr result = rd.next(req.should_stop);
auto result = rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());
@@ -3732,7 +3871,12 @@ void server_routes::init_routes() {
}
// get the result
server_task_result_ptr result = rd.next(req.should_stop);
auto result = rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());
@@ -3765,7 +3909,12 @@ std::unique_ptr<server_res_generator> server_routes::handle_slots_save(const ser
rd.post_task(std::move(task));
}
server_task_result_ptr result = rd.next(req.should_stop);
auto result = rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());
@@ -3796,7 +3945,12 @@ std::unique_ptr<server_res_generator> server_routes::handle_slots_restore(const
rd.post_task(std::move(task));
}
server_task_result_ptr result = rd.next(req.should_stop);
auto result = rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());
@@ -3818,7 +3972,12 @@ std::unique_ptr<server_res_generator> server_routes::handle_slots_erase(const se
rd.post_task(std::move(task));
}
server_task_result_ptr result = rd.next(req.should_stop);
auto result = rd.next(req.should_stop);
if (!result) {
// connection was closed
GGML_ASSERT(req.should_stop());
return res;
}
if (result->is_error()) {
res->error(result->to_json());