server_metrics metrics;
- json webui_settings = json::object();
+ // cached responses for HTTP API (read-only from HTTP threads)
+ json json_server_props = json::object();
+ json json_server_model_meta = json::object();
// Necessary similarity of prompt for slot selection
float slot_prompt_similarity = 0.0f;
common_chat_templates_ptr chat_templates;
oaicompat_parser_options oai_parser_opt;
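+ // true while the model and context have been released due to idle sleep (see handle_sleeping_state)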
+ bool sleeping = false;
+
~server_context_impl() {
+ if (!sleeping) {
+ // when entering the sleeping state, destroy() has already been called,
+ // so we skip it here to avoid a double free
+ destroy();
+ }
+ }
+
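+ // free the model, context and per-slot resources; called from the destructor and when entering the sleeping state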
+ void destroy() {
+ llama_init.reset();
+ ctx = nullptr;
+ model = nullptr;
+
mtmd_free(mctx);
+ mctx = nullptr;
// Clear any sampling context
for (server_slot & slot : slots) {
llama_batch_free(batch);
}
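+ // called from the task queue thread whenever the idle-sleep state changes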
+ void handle_sleeping_state(bool new_state) {
+ GGML_ASSERT(sleeping != new_state);
+ if (new_state) {
+ SRV_INF("%s", "server is entering sleeping state\n");
+ destroy();
+ } else {
+ SRV_INF("%s", "server is exiting sleeping state\n");
+ if (!load_model(params_base)) {
+ GGML_ABORT("failed to reload model after sleeping");
+ }
+ }
+ sleeping = new_state;
+ }
+
// load the model and initialize llama_context
+ // this is also called to reload the model when resuming from the sleeping state
bool load_model(const common_params & params) {
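+ // when resuming from the sleeping state, skip the one-time initialization below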
+ bool is_resume = sleeping;
+
SRV_INF("loading model '%s'\n", params.model.path.c_str());
params_base = params;
- webui_settings = json::object();
- if (!params_base.webui_config_json.empty()) {
- try {
- webui_settings = json::parse(params_base.webui_config_json);
- } catch (const std::exception & e) {
- SRV_ERR("%s: failed to parse webui config: %s\n", __func__, e.what());
- return false;
- }
- }
-
llama_init = common_init_from_params(params_base);
model = llama_init->model();
std::string & mmproj_path = params_base.mmproj.path;
if (!mmproj_path.empty()) {
- mtmd_helper_log_set(common_log_default_callback, nullptr);
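+ // install the mtmd log callback only once, not again when resuming from sleep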
+ if (!is_resume) {
+ mtmd_helper_log_set(common_log_default_callback, nullptr);
+ }
mtmd_context_params mparams = mtmd_context_params_default();
mparams.use_gpu = params_base.mmproj_use_gpu;
}
}
- return true;
- }
-
- // initialize slots and server-related data
- void init() {
- // wiring up server queues
- queue_tasks.on_new_task([this](server_task && task) {
- process_single_task(std::move(task));
- });
- queue_tasks.on_update_slots([this]() {
- update_slots();
- });
-
// Necessary similarity of prompt for slot selection
slot_prompt_similarity = params_base.slot_prompt_similarity;
n_ctx_slot = n_ctx_train;
}
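+ // drop any slots left over from a previous load before recreating them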
+ slots.clear();
for (int i = 0; i < params_base.n_parallel; i++) {
server_slot slot;
slot.ctx_dft = llama_init_from_model(model_dft, cparams_dft);
if (slot.ctx_dft == nullptr) {
SRV_ERR("%s", "failed to create draft context\n");
- return;
+ return false;
}
slot.spec = common_speculative_init(slot.ctx, slot.ctx_dft);
if (slot.spec == nullptr) {
SRV_ERR("%s", "failed to create speculator\n");
- return;
+ return false;
}
for (auto & pair : params_base.speculative.replacements) {
common_speculative_add_replacement_tgt_dft(slot.spec, pair.first.c_str(), pair.second.c_str());
batch = llama_batch_init(std::max(n_batch, params_base.n_parallel), 0, 1);
}
- metrics.init();
-
if (params_base.cache_ram_mib != 0) {
if (params_base.cache_ram_mib < 0) {
SRV_WRN("prompt cache is enabled, size limit: %s\n", "no limit");
LOG_INF("%s: chat template, chat_template: %s, example_format: '%s'\n", __func__,
common_chat_templates_source(chat_templates.get()),
common_chat_format_example(chat_templates.get(), params_base.use_jinja, params_base.default_template_kwargs).c_str());
+
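+ // one-time initialization (queue callbacks, metrics, cached JSON) is only needed on the first load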
+ if (!is_resume) {
+ return init();
+ }
+
+ return true;
+ }
+
+ // unlike load_model(), this is only called once during initialization
+ bool init() {
+ GGML_ASSERT(ctx != nullptr);
+ GGML_ASSERT(model != nullptr);
+ GGML_ASSERT(!sleeping);
+
+ // wiring up server queues
+ queue_tasks.on_new_task([this](server_task && task) {
+ process_single_task(std::move(task));
+ });
+ queue_tasks.on_update_slots([this]() {
+ update_slots();
+ });
+ queue_tasks.on_sleeping_state([this](bool sleeping) {
+ handle_sleeping_state(sleeping);
+ });
+
+ metrics.init();
+
+ if (!populate_json_responses()) {
+ SRV_ERR("%s", "failed to populate JSON responses\n");
+ return false;
+ }
+
+ return true;
+ }
+
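+ // build the cached JSON responses once; HTTP threads only read them, so e.g. /props can be served without waking a sleeping server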
+ bool populate_json_responses() {
+ // populate webui settings
+ json json_webui_settings = json::object();
+ {
+ if (!params_base.webui_config_json.empty()) {
+ try {
+ json_webui_settings = json::parse(params_base.webui_config_json);
+ } catch (const std::exception & e) {
+ SRV_ERR("%s: failed to parse webui config: %s\n", __func__, e.what());
+ return false;
+ }
+ }
+ }
+
+ // populate server properties
+ {
+ task_params params;
+ params.sampling = params_base.sampling;
+ json default_generation_settings_for_props = json {
+ {"params", params.to_json(true)},
+ {"n_ctx", get_slot_n_ctx()},
+ };
+
+ json_server_props = {
+ { "default_generation_settings", default_generation_settings_for_props },
+ { "total_slots", params_base.n_parallel },
+ { "model_alias", model_name },
+ { "model_path", params_base.model.path },
+ { "modalities", json {
+ {"vision", oai_parser_opt.allow_image},
+ {"audio", oai_parser_opt.allow_audio},
+ } },
+ { "endpoint_slots", params_base.endpoint_slots },
+ { "endpoint_props", params_base.endpoint_props },
+ { "endpoint_metrics", params_base.endpoint_metrics },
+ { "webui", params_base.webui },
+ { "webui_settings", json_webui_settings },
+ { "chat_template", common_chat_templates_source(chat_templates.get()) },
+ { "bos_token", common_token_to_piece(ctx, llama_vocab_bos(vocab), /* special= */ true)},
+ { "eos_token", common_token_to_piece(ctx, llama_vocab_eos(vocab), /* special= */ true)},
+ { "build_info", build_info },
+ };
+ if (params_base.use_jinja) {
+ if (auto tool_use_src = common_chat_templates_source(chat_templates.get(), "tool_use")) {
+ json_server_props["chat_template_tool_use"] = tool_use_src;
+ }
+ }
+ }
+
+ // populate model metadata
+ {
+ json_server_model_meta = {
+ {"vocab_type", llama_vocab_type (vocab)},
+ {"n_vocab", llama_vocab_n_tokens (vocab)},
+ {"n_ctx_train", llama_model_n_ctx_train(model)},
+ {"n_embd", llama_model_n_embd (model)},
+ {"n_params", llama_model_n_params (model)},
+ {"size", llama_model_size (model)},
+ };
+ }
+
+ return true;
}
server_slot * get_slot_by_id(int id) {
SRV_DBG("%s", "run slots completed\n");
}
- json model_meta() const {
- return json {
- {"vocab_type", llama_vocab_type (vocab)},
- {"n_vocab", llama_vocab_n_tokens (vocab)},
- {"n_ctx_train", llama_model_n_ctx_train(model)},
- {"n_embd", llama_model_n_embd (model)},
- {"n_params", llama_model_n_params (model)},
- {"size", llama_model_size (model)},
- };
- }
-
int get_slot_n_ctx() {
return slots.back().n_ctx;
}
server_context::server_context() : impl(new server_context_impl()) {}
server_context::~server_context() = default;
-void server_context::init() {
- impl->init();
-}
-
bool server_context::load_model(const common_params & params) {
return impl->load_model(params);
}
void server_context::start_loop() {
- impl->queue_tasks.start_loop();
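+ // a negative sleep_idle_seconds disables idle sleeping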
+ auto & params = impl->params_base;
+ impl->queue_tasks.start_loop(params.sleep_idle_seconds * 1000);
}
void server_context::terminate() {
// generator-like API for HTTP response generation
+// pass bypass_sleep = true for handlers that do not use ctx_server and therefore do not need to wake a sleeping server
struct server_res_generator : server_http_res {
server_response_reader rd;
- server_res_generator(server_context_impl & ctx_server)
- : rd(ctx_server.queue_tasks, ctx_server.queue_results, HTTP_POLLING_SECONDS) {}
+ server_res_generator(server_context_impl & ctx_server, bool bypass_sleep = false)
+ : rd(ctx_server.queue_tasks, ctx_server.queue_results, HTTP_POLLING_SECONDS) {
+ // fast path in case sleeping is disabled
+ bypass_sleep |= ctx_server.params_base.sleep_idle_seconds < 0;
+ if (!bypass_sleep) {
+ ctx_server.queue_tasks.wait_until_no_sleep();
+ }
+ }
void ok(const json & response_data) {
status = 200;
data = safe_json_to_str(response_data);
//
static std::unique_ptr<server_res_generator> handle_completions_impl(
+ std::unique_ptr<server_res_generator> && res_ptr,
server_context_impl & ctx_server,
server_task_type type,
const json & data,
task_response_type res_type) {
GGML_ASSERT(type == SERVER_TASK_TYPE_COMPLETION || type == SERVER_TASK_TYPE_INFILL);
- auto res = std::make_unique<server_res_generator>(ctx_server);
+ auto res = std::move(res_ptr);
auto completion_id = gen_chatcmplid();
auto & rd = res->rd;
}
void server_routes::init_routes() {
+ // IMPORTANT: every handler lambda must construct its server_res_generator first
+ // (via std::make_unique<server_res_generator>), so that the sleeping state is handled before any other work
+
this->get_health = [this](const server_http_req &) {
// error and loading states are handled by middleware
- auto res = std::make_unique<server_res_generator>(ctx_server);
+ auto res = std::make_unique<server_res_generator>(ctx_server, true);
res->ok({{"status", "ok"}});
return res;
};
};
this->get_props = [this](const server_http_req &) {
- auto res = std::make_unique<server_res_generator>(ctx_server);
- json default_generation_settings_for_props;
-
- {
- task_params params;
-
- params.sampling = ctx_server.params_base.sampling;
-
- default_generation_settings_for_props = json {
- {"params", params.to_json(true)},
- {"n_ctx", ctx_server.get_slot_n_ctx()},
- };
- }
-
- json data = {
- { "default_generation_settings", default_generation_settings_for_props },
- { "total_slots", ctx_server.params_base.n_parallel },
- { "model_alias", ctx_server.model_name },
- { "model_path", ctx_server.params_base.model.path },
- { "modalities", json {
- {"vision", ctx_server.oai_parser_opt.allow_image},
- {"audio", ctx_server.oai_parser_opt.allow_audio},
- } },
- { "endpoint_slots", params.endpoint_slots },
- { "endpoint_props", params.endpoint_props },
- { "endpoint_metrics", params.endpoint_metrics },
- { "webui", params.webui },
- { "webui_settings", ctx_server.webui_settings },
- { "chat_template", common_chat_templates_source(ctx_server.chat_templates.get()) },
- { "bos_token", common_token_to_piece(ctx_server.ctx, llama_vocab_bos(ctx_server.vocab), /* special= */ true)},
- { "eos_token", common_token_to_piece(ctx_server.ctx, llama_vocab_eos(ctx_server.vocab), /* special= */ true)},
- { "build_info", build_info },
- };
- if (ctx_server.params_base.use_jinja) {
- if (auto tool_use_src = common_chat_templates_source(ctx_server.chat_templates.get(), "tool_use")) {
- data["chat_template_tool_use"] = tool_use_src;
- }
- }
-
- res->ok(data);
+ auto res = std::make_unique<server_res_generator>(ctx_server, true);
+ auto props = ctx_server.json_server_props;
+ props["is_sleeping"] = ctx_server.queue_tasks.is_sleeping();
+ res->ok(props);
return res;
};
std::vector<raw_buffer> files; // dummy
return handle_completions_impl(
+ std::move(res),
ctx_server,
SERVER_TASK_TYPE_INFILL,
data,
};
this->post_completions = [this](const server_http_req & req) {
+ auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files; // dummy
const json body = json::parse(req.body);
return handle_completions_impl(
+ std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body,
};
this->post_completions_oai = [this](const server_http_req & req) {
+ auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files; // dummy
const json body = json::parse(req.body);
return handle_completions_impl(
+ std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body,
};
this->post_chat_completions = [this](const server_http_req & req) {
+ auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files;
json body = json::parse(req.body);
json body_parsed = oaicompat_chat_params_parse(
ctx_server.oai_parser_opt,
files);
return handle_completions_impl(
+ std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body_parsed,
};
this->post_anthropic_messages = [this](const server_http_req & req) {
+ auto res = std::make_unique<server_res_generator>(ctx_server);
std::vector<raw_buffer> files;
json body = convert_anthropic_to_oai(json::parse(req.body));
json body_parsed = oaicompat_chat_params_parse(
ctx_server.oai_parser_opt,
files);
return handle_completions_impl(
+ std::move(res),
ctx_server,
SERVER_TASK_TYPE_COMPLETION,
body_parsed,
return res;
};
+ // TODO: this endpoint is unsafe to access while the model is being reloaded (i.e. waking up from the sleeping state)
+ // how can we make it work even during load_model()?
this->get_models = [this](const server_http_req &) {
auto res = std::make_unique<server_res_generator>(ctx_server);
json model_meta = nullptr;
if (is_ready()) {
- model_meta = ctx_server.model_meta();
+ model_meta = ctx_server.json_server_model_meta;
}
bool has_mtmd = ctx_server.mctx != nullptr;
json models = {
} else {
queue_tasks.push_back(std::move(task));
}
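+ // record activity for the idle-sleep timer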
+ time_last_task = ggml_time_ms();
condition_tasks.notify_one();
return task_id;
}
queue_tasks.push_back(std::move(task));
}
}
+ time_last_task = ggml_time_ms();
condition_tasks.notify_one();
return 0;
}
std::unique_lock<std::mutex> lock(mutex_tasks);
QUE_DBG("defer task, id = %d\n", task.id);
queue_tasks_deferred.push_back(std::move(task));
+ time_last_task = ggml_time_ms();
condition_tasks.notify_one();
}
return new_id;
}
-void server_queue::on_new_task(std::function<void(server_task &&)> callback) {
- callback_new_task = std::move(callback);
-}
-
-void server_queue::on_update_slots(std::function<void(void)> callback) {
- callback_update_slots = std::move(callback);
-}
-
void server_queue::pop_deferred_task() {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!queue_tasks_deferred.empty()) {
queue_tasks.emplace_front(std::move(queue_tasks_deferred.front()));
queue_tasks_deferred.pop_front();
}
+ time_last_task = ggml_time_ms();
condition_tasks.notify_one();
}
+void server_queue::wait_until_no_sleep() {
+ std::unique_lock<std::mutex> lock(mutex_tasks);
+ if (!sleeping) {
+ return;
+ }
+ if (!req_stop_sleeping) {
+ QUE_DBG("%s", "requesting to stop sleeping\n");
+ req_stop_sleeping = true;
+ condition_tasks.notify_all(); // wake the main loop; any other waiters simply re-check their predicate
+ }
+ QUE_DBG("%s", "waiting until no sleep\n");
+ condition_tasks.wait(lock, [&]{
+ return !sleeping || !running; // don't block shutdown if the server is terminating
+ });
+}
+
void server_queue::terminate() {
std::unique_lock<std::mutex> lock(mutex_tasks);
running = false;
condition_tasks.notify_all();
}
-void server_queue::start_loop() {
+void server_queue::start_loop(int64_t idle_sleep_ms) {
running = true;
+ time_last_task = ggml_time_ms();
+
+ constexpr auto max_wait_time = std::chrono::seconds(1);
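+ // whether the queue has been idle long enough to enter the sleeping state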
+ auto should_sleep = [&]() -> bool {
+ // caller must hold mutex_tasks
+ if (idle_sleep_ms < 0) {
+ return false;
+ }
+ int64_t now = ggml_time_ms();
+ return (now - time_last_task) >= idle_sleep_ms;
+ };
while (true) {
QUE_DBG("%s", "processing new tasks\n");
QUE_DBG("processing task, id = %d\n", task.id);
callback_new_task(std::move(task));
}
-
// all tasks in the current loop is processed, slots data is now ready
QUE_DBG("%s", "update slots\n");
+ // this will run the main inference process for all slots
callback_update_slots();
+ {
+ // update_slots() may take a while to finish, so make sure that time is not counted as idle
+ std::unique_lock<std::mutex> lock(mutex_tasks);
+ time_last_task = ggml_time_ms();
+ }
QUE_DBG("%s", "waiting for new tasks\n");
- {
+ while (true) {
std::unique_lock<std::mutex> lock(mutex_tasks);
- if (!running) {
- QUE_DBG("%s", "terminate\n");
- return;
+ if (!running || !queue_tasks.empty()) {
+ break; // go back to process new tasks or terminate
}
- if (queue_tasks.empty()) {
+
+ // no pending tasks, check whether we should enter the sleeping state
+ if (should_sleep()) {
+ QUE_INF("%s", "entering sleeping state\n");
+ sleeping = true;
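+ // note: mutex_tasks is held here, so the model is freed while new task submissions are blocked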
+ callback_sleeping_state(true);
+ req_stop_sleeping = false;
+ // wait until we are requested to exit sleeping state
condition_tasks.wait(lock, [&]{
+ return (!running || req_stop_sleeping);
+ });
+ if (!running) { // may have changed during sleep
+ break; // terminate
+ }
+ QUE_INF("%s", "exiting sleeping state\n");
+ req_stop_sleeping = false;
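+ // the model is reloaded while mutex_tasks is still held, so incoming tasks wait until the reload completes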
+ callback_sleeping_state(false);
+ sleeping = false;
+ time_last_task = ggml_time_ms();
+ condition_tasks.notify_all(); // notify wait_until_no_sleep()
+ break; // process new tasks
+ } else {
+ // wait for new tasks, or time out to re-check the sleeping condition
+ bool res = condition_tasks.wait_for(lock, max_wait_time, [&]{
return (!queue_tasks.empty() || !running);
});
+ if (res) {
+ break; // new task arrived or terminate
+ }
+ // otherwise, loop again to check sleeping condition
}
}
}