]> git.djapps.eu Git - pkg/ggml/sources/llama.cpp/commitdiff
server: (router) add stop-timeout option (#18350)
authorXuan-Son Nguyen <redacted>
Wed, 24 Dec 2025 22:47:49 +0000 (23:47 +0100)
committerGitHub <redacted>
Wed, 24 Dec 2025 22:47:49 +0000 (23:47 +0100)
* server: (router) add stop-timeout option

* also allow stop while loading

* add docs

* unload_lru: also wait for unload to complete

common/arg.cpp
common/arg.h
tools/server/README.md
tools/server/server-models.cpp
tools/server/server-models.h

index fded0bd260d59d3162f9e00074a787837be3e565..774f8731a9f4051b85583f014c72004608b9d257 100644 (file)
@@ -3518,15 +3518,15 @@ void common_params_add_preset_options(std::vector<common_arg> & args) {
         [](common_params &, const std::string &) { /* unused */ }
     ).set_env(COMMON_ARG_PRESET_LOAD_ON_STARTUP).set_preset_only());
 
+    args.push_back(common_arg(
+        {"stop-timeout"}, "SECONDS",
+        "in server router mode, force-kill model instance after this many seconds of graceful shutdown",
+        [](common_params &, int) { /* unused */ }
+    ).set_env(COMMON_ARG_PRESET_STOP_TIMEOUT).set_preset_only());
+
     // args.push_back(common_arg(
     //     {"pin"},
     //     "in server router mode, do not unload this model if models_max is exceeded",
     //     [](common_params &) { /* unused */ }
     // ).set_preset_only());
-
-    // args.push_back(common_arg(
-    //     {"unload-idle-seconds"}, "SECONDS",
-    //     "in server router mode, unload models idle for more than this many seconds",
-    //     [](common_params &, int) { /* unused */ }
-    // ).set_preset_only());
 }
index f5111c658f4045aca50f9da6941a8212c5246a6c..a1b6a14e67508602f6c75406f7c424447985a92a 100644 (file)
@@ -10,6 +10,7 @@
 
 // pseudo-env variable to identify preset-only arguments
 #define COMMON_ARG_PRESET_LOAD_ON_STARTUP "__PRESET_LOAD_ON_STARTUP"
+#define COMMON_ARG_PRESET_STOP_TIMEOUT    "__PRESET_STOP_TIMEOUT"
 
 //
 // CLI argument parsing
index 1ae5eae4c68e738f36dc1129a51e9356b8115de0..7d2f6f798e731d94fc881f8574d1e0c0f4680c9d 100644 (file)
@@ -1486,6 +1486,7 @@ The precedence rule for preset options is as follows:
 
 We also offer additional options that are exclusive to presets (these aren't treated as command-line arguments):
 - `load-on-startup` (boolean): Controls whether the model loads automatically when the server starts
+- `stop-timeout` (int, seconds): After requested unload, wait for this many seconds before forcing termination (default: 10)
 
 ### Routing requests
 
@@ -1574,8 +1575,7 @@ Payload:
 
 ```json
 {
-  "model": "ggml-org/gemma-3-4b-it-GGUF:Q4_K_M",
-  "extra_args": ["-n", "128", "--top-k", "4"]
+  "model": "ggml-org/gemma-3-4b-it-GGUF:Q4_K_M"
 }
 ```
 
index 08a0da5c875edfd91c5eec30450731a20f0701dc..cb7e70455ab57c0cbb66bcb4b25f337175d70e3c 100644 (file)
@@ -34,6 +34,8 @@
 #include <limits.h>
 #endif
 
+#define DEFAULT_STOP_TIMEOUT 10 // seconds
+
 #define CMD_ROUTER_TO_CHILD_EXIT  "cmd_router_to_child:exit"
 #define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
 
@@ -203,13 +205,14 @@ void server_models::load_models() {
     // convert presets to server_model_meta and add to mapping
     for (const auto & preset : final_presets) {
         server_model_meta meta{
-            /* preset      */ preset.second,
-            /* name        */ preset.first,
-            /* port        */ 0,
-            /* status      */ SERVER_MODEL_STATUS_UNLOADED,
-            /* last_used   */ 0,
-            /* args        */ std::vector<std::string>(),
-            /* exit_code   */ 0
+            /* preset       */ preset.second,
+            /* name         */ preset.first,
+            /* port         */ 0,
+            /* status       */ SERVER_MODEL_STATUS_UNLOADED,
+            /* last_used    */ 0,
+            /* args         */ std::vector<std::string>(),
+            /* exit_code    */ 0,
+            /* stop_timeout */ DEFAULT_STOP_TIMEOUT,
         };
         add_model(std::move(meta));
     }
@@ -227,6 +230,20 @@ void server_models::load_models() {
         }
     }
 
+    // handle custom stop-timeout option
+    for (auto & [name, inst] : mapping) {
+        std::string val;
+        if (inst.meta.preset.get_option(COMMON_ARG_PRESET_STOP_TIMEOUT, val)) {
+            try {
+                inst.meta.stop_timeout = std::stoi(val);
+            } catch (...) {
+                SRV_WRN("invalid stop-timeout value '%s' for model '%s', using default %d seconds\n",
+                    val.c_str(), name.c_str(), DEFAULT_STOP_TIMEOUT);
+                inst.meta.stop_timeout = DEFAULT_STOP_TIMEOUT;
+            }
+        }
+    }
+
     // load any autoload models
     std::vector<std::string> models_to_load;
     for (const auto & [name, inst] : mapping) {
@@ -362,7 +379,7 @@ void server_models::unload_lru() {
     int64_t lru_last_used = ggml_time_ms();
     size_t count_active = 0;
     {
-        std::lock_guard<std::mutex> lk(mutex);
+        std::unique_lock<std::mutex> lk(mutex);
         for (const auto & m : mapping) {
             if (m.second.meta.is_active()) {
                 count_active++;
@@ -376,6 +393,13 @@ void server_models::unload_lru() {
     if (!lru_model_name.empty() && count_active >= (size_t)base_params.models_max) {
         SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
         unload(lru_model_name);
+        // wait for unload to complete
+        {
+            std::unique_lock<std::mutex> lk(mutex);
+            cv.wait(lk, [this, &lru_model_name]() {
+                return mapping[lru_model_name].meta.status == SERVER_MODEL_STATUS_UNLOADED;
+            });
+        }
     }
 }
 
@@ -436,38 +460,83 @@ void server_models::load(const std::string & name) {
 
     // start a thread to manage the child process
     // captured variables are guaranteed to be destroyed only after the thread is joined
-    inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
-        // read stdout/stderr and forward to main server log
-        bool state_received = false; // true if child state received
-        FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
-        if (p_stdout_stderr) {
-            char buffer[4096];
-            while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
-                LOG("[%5d] %s", port, buffer);
-                if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
-                    // child process is ready
-                    this->update_status(name, SERVER_MODEL_STATUS_LOADED);
-                    state_received = true;
+    inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port, stop_timeout = inst.meta.stop_timeout]() {
+        FILE * stdin_file = subprocess_stdin(child_proc.get());
+        FILE * stdout_file = subprocess_stdout(child_proc.get()); // combined stdout/stderr
+
+        std::thread log_thread([&]() {
+            // read stdout/stderr and forward to main server log
+            // also handle status report from child process
+            bool state_received = false; // true if child state received
+            if (stdout_file) {
+                char buffer[4096];
+                while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
+                    LOG("[%5d] %s", port, buffer);
+                    if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
+                        // child process is ready
+                        this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
+                        state_received = true;
+                    }
                 }
+            } else {
+                SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
             }
-        } else {
-            SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
-        }
+        });
+
+        std::thread stopping_thread([&]() {
+            // thread to monitor stopping signal
+            auto is_stopping = [this, &name]() {
+                return this->stopping_models.find(name) != this->stopping_models.end();
+            };
+            {
+                std::unique_lock<std::mutex> lk(this->mutex);
+                this->cv_stop.wait(lk, is_stopping);
+            }
+            SRV_INF("stopping model instance name=%s\n", name.c_str());
+            // send interrupt to child process
+            fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
+            fflush(stdin_file);
+            // wait to stop gracefully or timeout
+            int64_t start_time = ggml_time_ms();
+            while (true) {
+                std::unique_lock<std::mutex> lk(this->mutex);
+                if (!is_stopping()) {
+                    return; // already stopped
+                }
+                int64_t elapsed = ggml_time_ms() - start_time;
+                if (elapsed >= stop_timeout * 1000) {
+                    // timeout, force kill
+                    SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
+                    subprocess_terminate(child_proc.get());
+                    return;
+                }
+                this->cv_stop.wait_for(lk, std::chrono::seconds(1));
+            }
+        });
+
         // we reach here when the child process exits
+        // note: we cannot join() prior to this point because it will close stdin_file
+        if (log_thread.joinable()) {
+            log_thread.join();
+        }
+
+        // stop the timeout monitoring thread
+        {
+            std::lock_guard<std::mutex> lk(this->mutex);
+            stopping_models.erase(name);
+            cv_stop.notify_all();
+        }
+        if (stopping_thread.joinable()) {
+            stopping_thread.join();
+        }
+
+        // get the exit code
         int exit_code = 0;
         subprocess_join(child_proc.get(), &exit_code);
         subprocess_destroy(child_proc.get());
-        // update PID and status
-        {
-            std::lock_guard<std::mutex> lk(mutex);
-            auto it = mapping.find(name);
-            if (it != mapping.end()) {
-                auto & meta = it->second.meta;
-                meta.exit_code = exit_code;
-                meta.status    = SERVER_MODEL_STATUS_UNLOADED;
-            }
-            cv.notify_all();
-        }
+
+        // update status and exit code
+        this->update_status(name, SERVER_MODEL_STATUS_UNLOADED, exit_code);
         SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
     });
 
@@ -488,22 +557,14 @@ void server_models::load(const std::string & name) {
     cv.notify_all();
 }
 
-static void interrupt_subprocess(FILE * stdin_file) {
-    // because subprocess.h does not provide a way to send SIGINT,
-    // we will send a command to the child process to exit gracefully
-    if (stdin_file) {
-        fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
-        fflush(stdin_file);
-    }
-}
-
 void server_models::unload(const std::string & name) {
     std::lock_guard<std::mutex> lk(mutex);
     auto it = mapping.find(name);
     if (it != mapping.end()) {
         if (it->second.meta.is_active()) {
             SRV_INF("unloading model instance name=%s\n", name.c_str());
-            interrupt_subprocess(it->second.stdin_file);
+            stopping_models.insert(name);
+            cv_stop.notify_all();
             // status change will be handled by the managing thread
         } else {
             SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
@@ -518,7 +579,8 @@ void server_models::unload_all() {
         for (auto & [name, inst] : mapping) {
             if (inst.meta.is_active()) {
                 SRV_INF("unloading model instance name=%s\n", name.c_str());
-                interrupt_subprocess(inst.stdin_file);
+                stopping_models.insert(name);
+                cv_stop.notify_all();
                 // status change will be handled by the managing thread
             }
             // moving the thread to join list to avoid deadlock
@@ -532,16 +594,15 @@ void server_models::unload_all() {
     }
 }
 
-void server_models::update_status(const std::string & name, server_model_status status) {
-    // for now, we only allow updating to LOADED status
-    if (status != SERVER_MODEL_STATUS_LOADED) {
-        throw std::runtime_error("invalid status value");
-    }
-    auto meta = get_meta(name);
-    if (meta.has_value()) {
-        meta->status = status;
-        update_meta(name, meta.value());
+void server_models::update_status(const std::string & name, server_model_status status, int exit_code) {
+    std::unique_lock<std::mutex> lk(mutex);
+    auto it = mapping.find(name);
+    if (it != mapping.end()) {
+        auto & meta = it->second.meta;
+        meta.status    = status;
+        meta.exit_code = exit_code;
     }
+    cv.notify_all();
 }
 
 void server_models::wait_until_loaded(const std::string & name) {
@@ -568,6 +629,7 @@ bool server_models::ensure_model_loaded(const std::string & name) {
         load(name);
     }
 
+    // for loading state
     SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
     wait_until_loaded(name);
 
@@ -795,7 +857,7 @@ void server_models_routes::init_routes() {
             res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
             return res;
         }
-        if (model->status != SERVER_MODEL_STATUS_LOADED) {
+        if (!model->is_active()) {
             res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
             return res;
         }
index 3e1868c27cc121dac51ac365cafc749d73be219d..7e33537536a2fe067d9f93afe468f99549cc7987 100644 (file)
@@ -9,6 +9,7 @@
 #include <condition_variable>
 #include <functional>
 #include <memory>
+#include <set>
 
 /**
  * state diagram:
@@ -56,6 +57,7 @@ struct server_model_meta {
     int64_t last_used = 0; // for LRU unloading
     std::vector<std::string> args; // args passed to the model instance, will be populated by render_args()
     int exit_code = 0; // exit code of the model instance process (only valid if status == FAILED)
+    int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
 
     bool is_active() const {
         return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
@@ -83,6 +85,10 @@ private:
     std::condition_variable cv;
     std::map<std::string, instance_t> mapping;
 
+    // for stopping models
+    std::condition_variable cv_stop;
+    std::set<std::string> stopping_models;
+
     common_preset_context ctx_preset;
 
     common_params base_params;
@@ -119,7 +125,7 @@ public:
     void unload_all();
 
     // update the status of a model instance (thread-safe)
-    void update_status(const std::string & name, server_model_status status);
+    void update_status(const std::string & name, server_model_status status, int exit_code);
 
     // wait until the model instance is fully loaded (thread-safe)
     // return when the model is loaded or failed to load