From: Xuan-Son Nguyen Date: Wed, 24 Dec 2025 22:47:49 +0000 (+0100) Subject: server: (router) add stop-timeout option (#18350) X-Git-Tag: upstream/0.0.7599~64 X-Git-Url: https://git.djapps.eu/?a=commitdiff_plain;h=f5acfb2ffa1a0bc9d01a0a40f4d7c17421d7d570;p=pkg%2Fggml%2Fsources%2Fllama.cpp server: (router) add stop-timeout option (#18350) * server: (router) add stop-timeout option * also allow stop while loading * add docs * unload_lru: also wait for unload to complete --- diff --git a/common/arg.cpp b/common/arg.cpp index fded0bd2..774f8731 100644 --- a/common/arg.cpp +++ b/common/arg.cpp @@ -3518,15 +3518,15 @@ void common_params_add_preset_options(std::vector & args) { [](common_params &, const std::string &) { /* unused */ } ).set_env(COMMON_ARG_PRESET_LOAD_ON_STARTUP).set_preset_only()); + args.push_back(common_arg( + {"stop-timeout"}, "SECONDS", + "in server router mode, force-kill model instance after this many seconds of graceful shutdown", + [](common_params &, int) { /* unused */ } + ).set_env(COMMON_ARG_PRESET_STOP_TIMEOUT).set_preset_only()); + // args.push_back(common_arg( // {"pin"}, // "in server router mode, do not unload this model if models_max is exceeded", // [](common_params &) { /* unused */ } // ).set_preset_only()); - - // args.push_back(common_arg( - // {"unload-idle-seconds"}, "SECONDS", - // "in server router mode, unload models idle for more than this many seconds", - // [](common_params &, int) { /* unused */ } - // ).set_preset_only()); } diff --git a/common/arg.h b/common/arg.h index f5111c65..a1b6a14e 100644 --- a/common/arg.h +++ b/common/arg.h @@ -10,6 +10,7 @@ // pseudo-env variable to identify preset-only arguments #define COMMON_ARG_PRESET_LOAD_ON_STARTUP "__PRESET_LOAD_ON_STARTUP" +#define COMMON_ARG_PRESET_STOP_TIMEOUT "__PRESET_STOP_TIMEOUT" // // CLI argument parsing diff --git a/tools/server/README.md b/tools/server/README.md index 1ae5eae4..7d2f6f79 100644 --- a/tools/server/README.md +++ b/tools/server/README.md @@ -1486,6 +1486,7 @@ The precedence rule for preset options is as follows: We also offer additional options that are exclusive to presets (these aren't treated as command-line arguments): - `load-on-startup` (boolean): Controls whether the model loads automatically when the server starts +- `stop-timeout` (int, seconds): After requested unload, wait for this many seconds before forcing termination (default: 10) ### Routing requests @@ -1574,8 +1575,7 @@ Payload: ```json { - "model": "ggml-org/gemma-3-4b-it-GGUF:Q4_K_M", - "extra_args": ["-n", "128", "--top-k", "4"] + "model": "ggml-org/gemma-3-4b-it-GGUF:Q4_K_M" } ``` diff --git a/tools/server/server-models.cpp b/tools/server/server-models.cpp index 08a0da5c..cb7e7045 100644 --- a/tools/server/server-models.cpp +++ b/tools/server/server-models.cpp @@ -34,6 +34,8 @@ #include #endif +#define DEFAULT_STOP_TIMEOUT 10 // seconds + #define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit" #define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready" @@ -203,13 +205,14 @@ void server_models::load_models() { // convert presets to server_model_meta and add to mapping for (const auto & preset : final_presets) { server_model_meta meta{ - /* preset */ preset.second, - /* name */ preset.first, - /* port */ 0, - /* status */ SERVER_MODEL_STATUS_UNLOADED, - /* last_used */ 0, - /* args */ std::vector(), - /* exit_code */ 0 + /* preset */ preset.second, + /* name */ preset.first, + /* port */ 0, + /* status */ SERVER_MODEL_STATUS_UNLOADED, + /* last_used */ 0, + /* args */ std::vector(), + /* exit_code */ 0, + /* stop_timeout */ DEFAULT_STOP_TIMEOUT, }; add_model(std::move(meta)); } @@ -227,6 +230,20 @@ void server_models::load_models() { } } + // handle custom stop-timeout option + for (auto & [name, inst] : mapping) { + std::string val; + if (inst.meta.preset.get_option(COMMON_ARG_PRESET_STOP_TIMEOUT, val)) { + try { + inst.meta.stop_timeout = std::stoi(val); + } catch (...) { + SRV_WRN("invalid stop-timeout value '%s' for model '%s', using default %d seconds\n", + val.c_str(), name.c_str(), DEFAULT_STOP_TIMEOUT); + inst.meta.stop_timeout = DEFAULT_STOP_TIMEOUT; + } + } + } + // load any autoload models std::vector models_to_load; for (const auto & [name, inst] : mapping) { @@ -362,7 +379,7 @@ void server_models::unload_lru() { int64_t lru_last_used = ggml_time_ms(); size_t count_active = 0; { - std::lock_guard lk(mutex); + std::unique_lock lk(mutex); for (const auto & m : mapping) { if (m.second.meta.is_active()) { count_active++; @@ -376,6 +393,13 @@ void server_models::unload_lru() { if (!lru_model_name.empty() && count_active >= (size_t)base_params.models_max) { SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str()); unload(lru_model_name); + // wait for unload to complete + { + std::unique_lock lk(mutex); + cv.wait(lk, [this, &lru_model_name]() { + return mapping[lru_model_name].meta.status == SERVER_MODEL_STATUS_UNLOADED; + }); + } } } @@ -436,38 +460,83 @@ void server_models::load(const std::string & name) { // start a thread to manage the child process // captured variables are guaranteed to be destroyed only after the thread is joined - inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() { - // read stdout/stderr and forward to main server log - bool state_received = false; // true if child state received - FILE * p_stdout_stderr = subprocess_stdout(child_proc.get()); - if (p_stdout_stderr) { - char buffer[4096]; - while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) { - LOG("[%5d] %s", port, buffer); - if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) { - // child process is ready - this->update_status(name, SERVER_MODEL_STATUS_LOADED); - state_received = true; + inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port, stop_timeout = inst.meta.stop_timeout]() { + FILE * stdin_file = subprocess_stdin(child_proc.get()); + FILE * stdout_file = subprocess_stdout(child_proc.get()); // combined stdout/stderr + + std::thread log_thread([&]() { + // read stdout/stderr and forward to main server log + // also handle status report from child process + bool state_received = false; // true if child state received + if (stdout_file) { + char buffer[4096]; + while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) { + LOG("[%5d] %s", port, buffer); + if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) { + // child process is ready + this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0); + state_received = true; + } } + } else { + SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str()); } - } else { - SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str()); - } + }); + + std::thread stopping_thread([&]() { + // thread to monitor stopping signal + auto is_stopping = [this, &name]() { + return this->stopping_models.find(name) != this->stopping_models.end(); + }; + { + std::unique_lock lk(this->mutex); + this->cv_stop.wait(lk, is_stopping); + } + SRV_INF("stopping model instance name=%s\n", name.c_str()); + // send interrupt to child process + fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT); + fflush(stdin_file); + // wait to stop gracefully or timeout + int64_t start_time = ggml_time_ms(); + while (true) { + std::unique_lock lk(this->mutex); + if (!is_stopping()) { + return; // already stopped + } + int64_t elapsed = ggml_time_ms() - start_time; + if (elapsed >= stop_timeout * 1000) { + // timeout, force kill + SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout); + subprocess_terminate(child_proc.get()); + return; + } + this->cv_stop.wait_for(lk, std::chrono::seconds(1)); + } + }); + // we reach here when the child process exits + // note: we cannot join() prior to this point because it will close stdin_file + if (log_thread.joinable()) { + log_thread.join(); + } + + // stop the timeout monitoring thread + { + std::lock_guard lk(this->mutex); + stopping_models.erase(name); + cv_stop.notify_all(); + } + if (stopping_thread.joinable()) { + stopping_thread.join(); + } + + // get the exit code int exit_code = 0; subprocess_join(child_proc.get(), &exit_code); subprocess_destroy(child_proc.get()); - // update PID and status - { - std::lock_guard lk(mutex); - auto it = mapping.find(name); - if (it != mapping.end()) { - auto & meta = it->second.meta; - meta.exit_code = exit_code; - meta.status = SERVER_MODEL_STATUS_UNLOADED; - } - cv.notify_all(); - } + + // update status and exit code + this->update_status(name, SERVER_MODEL_STATUS_UNLOADED, exit_code); SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code); }); @@ -488,22 +557,14 @@ void server_models::load(const std::string & name) { cv.notify_all(); } -static void interrupt_subprocess(FILE * stdin_file) { - // because subprocess.h does not provide a way to send SIGINT, - // we will send a command to the child process to exit gracefully - if (stdin_file) { - fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT); - fflush(stdin_file); - } -} - void server_models::unload(const std::string & name) { std::lock_guard lk(mutex); auto it = mapping.find(name); if (it != mapping.end()) { if (it->second.meta.is_active()) { SRV_INF("unloading model instance name=%s\n", name.c_str()); - interrupt_subprocess(it->second.stdin_file); + stopping_models.insert(name); + cv_stop.notify_all(); // status change will be handled by the managing thread } else { SRV_WRN("model instance name=%s is not loaded\n", name.c_str()); @@ -518,7 +579,8 @@ void server_models::unload_all() { for (auto & [name, inst] : mapping) { if (inst.meta.is_active()) { SRV_INF("unloading model instance name=%s\n", name.c_str()); - interrupt_subprocess(inst.stdin_file); + stopping_models.insert(name); + cv_stop.notify_all(); // status change will be handled by the managing thread } // moving the thread to join list to avoid deadlock @@ -532,16 +594,15 @@ void server_models::unload_all() { } } -void server_models::update_status(const std::string & name, server_model_status status) { - // for now, we only allow updating to LOADED status - if (status != SERVER_MODEL_STATUS_LOADED) { - throw std::runtime_error("invalid status value"); - } - auto meta = get_meta(name); - if (meta.has_value()) { - meta->status = status; - update_meta(name, meta.value()); +void server_models::update_status(const std::string & name, server_model_status status, int exit_code) { + std::unique_lock lk(mutex); + auto it = mapping.find(name); + if (it != mapping.end()) { + auto & meta = it->second.meta; + meta.status = status; + meta.exit_code = exit_code; } + cv.notify_all(); } void server_models::wait_until_loaded(const std::string & name) { @@ -568,6 +629,7 @@ bool server_models::ensure_model_loaded(const std::string & name) { load(name); } + // for loading state SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str()); wait_until_loaded(name); @@ -795,7 +857,7 @@ void server_models_routes::init_routes() { res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST)); return res; } - if (model->status != SERVER_MODEL_STATUS_LOADED) { + if (!model->is_active()) { res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST)); return res; } diff --git a/tools/server/server-models.h b/tools/server/server-models.h index 3e1868c2..7e335375 100644 --- a/tools/server/server-models.h +++ b/tools/server/server-models.h @@ -9,6 +9,7 @@ #include #include #include +#include /** * state diagram: @@ -56,6 +57,7 @@ struct server_model_meta { int64_t last_used = 0; // for LRU unloading std::vector args; // args passed to the model instance, will be populated by render_args() int exit_code = 0; // exit code of the model instance process (only valid if status == FAILED) + int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown bool is_active() const { return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING; @@ -83,6 +85,10 @@ private: std::condition_variable cv; std::map mapping; + // for stopping models + std::condition_variable cv_stop; + std::set stopping_models; + common_preset_context ctx_preset; common_params base_params; @@ -119,7 +125,7 @@ public: void unload_all(); // update the status of a model instance (thread-safe) - void update_status(const std::string & name, server_model_status status); + void update_status(const std::string & name, server_model_status status, int exit_code); // wait until the model instance is fully loaded (thread-safe) // return when the model is loaded or failed to load