#include <limits.h>
#endif
+#define DEFAULT_STOP_TIMEOUT 10 // seconds
+
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
// convert presets to server_model_meta and add to mapping
for (const auto & preset : final_presets) {
server_model_meta meta{
- /* preset */ preset.second,
- /* name */ preset.first,
- /* port */ 0,
- /* status */ SERVER_MODEL_STATUS_UNLOADED,
- /* last_used */ 0,
- /* args */ std::vector<std::string>(),
- /* exit_code */ 0
+ /* preset */ preset.second,
+ /* name */ preset.first,
+ /* port */ 0,
+ /* status */ SERVER_MODEL_STATUS_UNLOADED,
+ /* last_used */ 0,
+ /* args */ std::vector<std::string>(),
+ /* exit_code */ 0,
+ /* stop_timeout */ DEFAULT_STOP_TIMEOUT,
};
add_model(std::move(meta));
}
}
}
+ // handle custom stop-timeout option
+ // the value must be a whole, non-negative number of seconds;
+ // anything else is reported and replaced by DEFAULT_STOP_TIMEOUT
+ for (auto & [name, inst] : mapping) {
+ std::string val;
+ if (!inst.meta.preset.get_option(COMMON_ARG_PRESET_STOP_TIMEOUT, val)) {
+ continue; // option not set for this model, leave stop_timeout untouched
+ }
+ bool valid = false;
+ try {
+ size_t n_parsed = 0;
+ const int seconds = std::stoi(val, &n_parsed);
+ // std::stoi stops at the first character it cannot parse, so require that the
+ // whole string was consumed ("10abc" must not be silently accepted as 10)
+ if (n_parsed == val.size() && seconds >= 0) {
+ inst.meta.stop_timeout = seconds;
+ valid = true;
+ }
+ } catch (...) {
+ // not a number, or out of range for int: handled below
+ }
+ if (!valid) {
+ SRV_WRN("invalid stop-timeout value '%s' for model '%s', using default %d seconds\n",
+ val.c_str(), name.c_str(), DEFAULT_STOP_TIMEOUT);
+ inst.meta.stop_timeout = DEFAULT_STOP_TIMEOUT;
+ }
+ }
+
// load any autoload models
std::vector<std::string> models_to_load;
for (const auto & [name, inst] : mapping) {
int64_t lru_last_used = ggml_time_ms();
size_t count_active = 0;
{
- std::lock_guard<std::mutex> lk(mutex);
+ std::unique_lock<std::mutex> lk(mutex);
for (const auto & m : mapping) {
if (m.second.meta.is_active()) {
count_active++;
if (!lru_model_name.empty() && count_active >= (size_t)base_params.models_max) {
SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
unload(lru_model_name);
+ // wait for unload to complete
+ {
+ std::unique_lock<std::mutex> lk(mutex);
+ cv.wait(lk, [this, &lru_model_name]() {
+ return mapping[lru_model_name].meta.status == SERVER_MODEL_STATUS_UNLOADED;
+ });
+ }
}
}
// start a thread to manage the child process
// captured variables are guaranteed to be destroyed only after the thread is joined
- inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
- // read stdout/stderr and forward to main server log
- bool state_received = false; // true if child state received
- FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
- if (p_stdout_stderr) {
- char buffer[4096];
- while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
- LOG("[%5d] %s", port, buffer);
- if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
- // child process is ready
- this->update_status(name, SERVER_MODEL_STATUS_LOADED);
- state_received = true;
+ inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port, stop_timeout = inst.meta.stop_timeout]() {
+ // Managing thread for one child instance. It runs two helper threads:
+ // - log_thread: forwards the child's output to the main server log and
+ // marks the model as LOADED when the child reports it is ready
+ // - stopping_thread: waits for a stop request, asks the child to exit and
+ // force-kills it if it is still running after stop_timeout seconds
+ // Once the child's output ends, both helpers are joined, the child is reaped
+ // and the final status and exit code are published via update_status().
+ FILE * stdin_file = subprocess_stdin(child_proc.get());
+ FILE * stdout_file = subprocess_stdout(child_proc.get()); // combined stdout/stderr
+
+ // set to true (under this->mutex) once log_thread has finished reading the child's output;
+ // lets stopping_thread finish even when no stop was ever requested for this model
+ bool child_exited = false;
+
+ std::thread log_thread([&]() {
+ // read stdout/stderr and forward to main server log
+ // also handle status report from child process
+ bool state_received = false; // true if child state received
+ if (stdout_file) {
+ char buffer[4096];
+ while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
+ LOG("[%5d] %s", port, buffer);
+ if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
+ // child process is ready
+ this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
+ state_received = true;
+ }
}
+ } else {
+ SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
}
- } else {
- SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
- }
+ });
+
+ std::thread stopping_thread([&]() {
+ // thread to monitor stopping signal
+ auto is_stopping = [this, &name]() {
+ return this->stopping_models.find(name) != this->stopping_models.end();
+ };
+ {
+ std::unique_lock<std::mutex> lk(this->mutex);
+ // also wake up when the child is already gone: waiting on is_stopping alone would
+ // block forever (and hang the join() below) if the child exits without unload()
+ this->cv_stop.wait(lk, [&]() { return child_exited || is_stopping(); });
+ if (child_exited) {
+ return; // nothing left to stop
+ }
+ }
+ SRV_INF("stopping model instance name=%s\n", name.c_str());
+ // send interrupt to child process
+ // (skipped when no stdin handle is available; the timeout below still applies)
+ if (stdin_file) {
+ fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
+ fflush(stdin_file);
+ }
+ // wait to stop gracefully or timeout
+ // widen before multiplying so a large stop_timeout cannot overflow int
+ const int64_t timeout_ms = static_cast<int64_t>(stop_timeout) * 1000;
+ int64_t start_time = ggml_time_ms();
+ while (true) {
+ std::unique_lock<std::mutex> lk(this->mutex);
+ if (child_exited || !is_stopping()) {
+ return; // already stopped
+ }
+ int64_t elapsed = ggml_time_ms() - start_time;
+ if (elapsed >= timeout_ms) {
+ // timeout, force kill
+ SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
+ subprocess_terminate(child_proc.get());
+ return;
+ }
+ // re-check at least once per second; notifications on cv_stop wake us earlier
+ this->cv_stop.wait_for(lk, std::chrono::seconds(1));
+ }
+ });
+
// we reach here when the child process exits
+ // note: we cannot join() prior to this point because it will close stdin_file
+ if (log_thread.joinable()) {
+ log_thread.join();
+ }
+
+ // stop the timeout monitoring thread
+ {
+ std::lock_guard<std::mutex> lk(this->mutex);
+ child_exited = true; // releases stopping_thread from either of its waits
+ stopping_models.erase(name);
+ cv_stop.notify_all();
+ }
+ if (stopping_thread.joinable()) {
+ stopping_thread.join();
+ }
+
+ // get the exit code
int exit_code = 0;
subprocess_join(child_proc.get(), &exit_code);
subprocess_destroy(child_proc.get());
- // update PID and status
- {
- std::lock_guard<std::mutex> lk(mutex);
- auto it = mapping.find(name);
- if (it != mapping.end()) {
- auto & meta = it->second.meta;
- meta.exit_code = exit_code;
- meta.status = SERVER_MODEL_STATUS_UNLOADED;
- }
- cv.notify_all();
- }
+
+ // update status and exit code
+ this->update_status(name, SERVER_MODEL_STATUS_UNLOADED, exit_code);
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
});
cv.notify_all();
}
-static void interrupt_subprocess(FILE * stdin_file) {
- // because subprocess.h does not provide a way to send SIGINT,
- // we will send a command to the child process to exit gracefully
- if (stdin_file) {
- fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
- fflush(stdin_file);
- }
-}
-
void server_models::unload(const std::string & name) {
std::lock_guard<std::mutex> lk(mutex);
auto it = mapping.find(name);
if (it != mapping.end()) {
if (it->second.meta.is_active()) {
SRV_INF("unloading model instance name=%s\n", name.c_str());
- interrupt_subprocess(it->second.stdin_file);
+ stopping_models.insert(name);
+ cv_stop.notify_all();
// status change will be handled by the managing thread
} else {
SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
for (auto & [name, inst] : mapping) {
if (inst.meta.is_active()) {
SRV_INF("unloading model instance name=%s\n", name.c_str());
- interrupt_subprocess(inst.stdin_file);
+ stopping_models.insert(name);
+ cv_stop.notify_all();
// status change will be handled by the managing thread
}
// moving the thread to join list to avoid deadlock
}
}
-void server_models::update_status(const std::string & name, server_model_status status) {
- // for now, we only allow updating to LOADED status
- if (status != SERVER_MODEL_STATUS_LOADED) {
- throw std::runtime_error("invalid status value");
- }
- auto meta = get_meta(name);
- if (meta.has_value()) {
- meta->status = status;
- update_meta(name, meta.value());
+// Store a new status and exit code for the model registered under `name`
+// (the metadata is left alone when no such entry exists), then wake every
+// thread waiting on `cv`. The mutex is held for the whole call.
+void server_models::update_status(const std::string & name, server_model_status status, int exit_code) {
+ std::unique_lock<std::mutex> guard(mutex);
+ auto entry = mapping.find(name);
+ if (entry != mapping.end()) {
+ entry->second.meta.status = status;
+ entry->second.meta.exit_code = exit_code;
}
+ cv.notify_all();
}
void server_models::wait_until_loaded(const std::string & name) {
load(name);
}
+ // for loading state
SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
wait_until_loaded(name);
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
return res;
}
- if (model->status != SERVER_MODEL_STATUS_LOADED) {
+ if (!model->is_active()) {
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
return res;
}