#define DEFAULT_STOP_TIMEOUT 10 // seconds
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
-#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
+#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready" // also sent when waking up from sleep
+#define CMD_CHILD_TO_ROUTER_SLEEP "cmd_child_to_router:sleep"
// address for child process, this is needed because router may run on 0.0.0.0
// ref: https://github.com/ggml-org/llama.cpp/issues/17862
if (it != mapping.end()) {
it->second.meta = meta;
}
- cv.notify_all(); // notify wait_until_loaded
+ cv.notify_all(); // notify wait_until_loading_finished
}
bool server_models::has_model(const std::string & name) {
{
std::unique_lock<std::mutex> lk(mutex);
for (const auto & m : mapping) {
- if (m.second.meta.is_active()) {
+ if (m.second.meta.is_running()) {
count_active++;
if (m.second.meta.last_used < lru_last_used) {
lru_model_name = m.first;
if (base_params.models_max > 0) {
size_t count_active = 0;
for (const auto & m : mapping) {
- if (m.second.meta.is_active()) {
+ if (m.second.meta.is_running()) {
count_active++;
}
}
std::thread log_thread([&]() {
// read stdout/stderr and forward to main server log
// also handle status report from child process
- bool state_received = false; // true if child state received
if (stdout_file) {
char buffer[4096];
while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
LOG("[%5d] %s", port, buffer);
- if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
- // child process is ready
+ std::string str(buffer);
+ if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_READY)) {
this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
- state_received = true;
+ } else if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_SLEEP)) {
+ this->update_status(name, SERVER_MODEL_STATUS_SLEEPING, 0);
}
}
} else {
std::lock_guard<std::mutex> lk(mutex);
auto it = mapping.find(name);
if (it != mapping.end()) {
- if (it->second.meta.is_active()) {
- SRV_INF("unloading model instance name=%s\n", name.c_str());
+ if (it->second.meta.is_running()) {
+ SRV_INF("stopping model instance name=%s\n", name.c_str());
stopping_models.insert(name);
cv_stop.notify_all();
// status change will be handled by the managing thread
} else {
- SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
+ SRV_WRN("model instance name=%s is not running\n", name.c_str());
}
}
}
{
std::lock_guard<std::mutex> lk(mutex);
for (auto & [name, inst] : mapping) {
- if (inst.meta.is_active()) {
- SRV_INF("unloading model instance name=%s\n", name.c_str());
+ if (inst.meta.is_running()) {
+ SRV_INF("stopping model instance name=%s\n", name.c_str());
stopping_models.insert(name);
cv_stop.notify_all();
// status change will be handled by the managing thread
cv.notify_all();
}
-void server_models::wait_until_loaded(const std::string & name) {
+void server_models::wait_until_loading_finished(const std::string & name) {
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk, [this, &name]() {
auto it = mapping.find(name);
});
}
-bool server_models::ensure_model_loaded(const std::string & name) {
+bool server_models::ensure_model_ready(const std::string & name) {
auto meta = get_meta(name);
if (!meta.has_value()) {
throw std::runtime_error("model name=" + name + " is not found");
}
- if (meta->status == SERVER_MODEL_STATUS_LOADED) {
- return false; // already loaded
+ if (meta->is_ready()) {
+ return false; // ready for taking requests
+ }
+ if (meta->status == SERVER_MODEL_STATUS_SLEEPING) {
+ return false; // child is sleeping but still running; new request will wake it up
}
if (meta->status == SERVER_MODEL_STATUS_UNLOADED) {
SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
load(name);
}
- // for loading state
+ // wait for loading to complete
SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
- wait_until_loaded(name);
+ wait_until_loading_finished(name);
// check final status
meta = get_meta(name);
if (!meta.has_value()) {
throw std::runtime_error("model name=" + name + " is not found");
}
- if (meta->status != SERVER_MODEL_STATUS_LOADED) {
- throw std::invalid_argument("model name=" + name + " is not loaded");
+ if (!meta->is_running()) {
+ throw std::invalid_argument("model name=" + name + " is not running");
}
if (update_last_used) {
std::unique_lock<std::mutex> lk(mutex);
return proxy;
}
+// return true if the current process is a child (model-instance) server rather than the router
+// detection is via the LLAMA_SERVER_ROUTER_PORT environment variable, which is presumably
+// set by the router when it spawns child processes — see setup_child_server (TODO confirm)
+bool server_models::is_child_server() {
+    const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
+    return router_port != nullptr;
+}
+
std::thread server_models::setup_child_server(const std::function<void(int)> & shutdown_handler) {
// send a notification to the router server that a model instance is ready
common_log_pause(common_log_main());
});
}
+// notify the router that this child's sleeping state changed, by printing a command
+// line (CMD_CHILD_TO_ROUTER_SLEEP / CMD_CHILD_TO_ROUTER_READY) to stdout, which the
+// router's log-forwarding thread parses line-by-line.
+// logging is paused and stdout flushed around the write so the command appears on its
+// own line and is not interleaved with regular log output.
+void server_models::notify_router_sleeping_state(bool is_sleeping) {
+    common_log_pause(common_log_main());
+    fflush(stdout);
+    fprintf(stdout, "%s\n", is_sleeping ? CMD_CHILD_TO_ROUTER_SLEEP : CMD_CHILD_TO_ROUTER_READY);
+    fflush(stdout);
+    common_log_resume(common_log_main());
+}
//
// resolve alias to canonical model name
name = meta->name;
if (models_autoload) {
- models.ensure_model_loaded(name);
+ models.ensure_model_ready(name);
} else {
- if (meta->status != SERVER_MODEL_STATUS_LOADED) {
+ if (!meta->is_running()) {
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
return false;
}
res_err(res, format_error_response("model is not found", ERROR_TYPE_NOT_FOUND));
return res;
}
- if (meta->status == SERVER_MODEL_STATUS_LOADED) {
- res_err(res, format_error_response("model is already loaded", ERROR_TYPE_INVALID_REQUEST));
+ if (meta->is_running()) {
+ res_err(res, format_error_response("model is already running", ERROR_TYPE_INVALID_REQUEST));
return res;
}
models.load(meta->name);
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
return res;
}
- if (!model->is_active()) {
- res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
+ if (!model->is_running()) {
+ res_err(res, format_error_response("model is not running", ERROR_TYPE_INVALID_REQUEST));
return res;
}
models.unload(model->name);
/**
* state diagram:
*
- * UNLOADED ──► LOADING ──► LOADED
- *    ▲            │          │
- *    └───failed───┘          │
- *    ▲                       │
+ * UNLOADED ──► LOADING ──► LOADED ◄──── SLEEPING
+ *    ▲            │          │             ▲
+ *    └───failed───┘          │             │
+ *    ▲                       └──sleeping───┘
  *    └────────unloaded───────┘
*/
enum server_model_status {
    // TODO: also add downloading state when the logic is added
    SERVER_MODEL_STATUS_UNLOADED,
    SERVER_MODEL_STATUS_LOADING,
-    SERVER_MODEL_STATUS_LOADED
+    SERVER_MODEL_STATUS_LOADED,   // child process running and ready to take requests
+    SERVER_MODEL_STATUS_SLEEPING  // child process still running but asleep; a new request wakes it up
};
static server_model_status server_model_status_from_string(const std::string & status_str) {
    if (status_str == "loaded") {
        return SERVER_MODEL_STATUS_LOADED;
    }
+    // accept the new "sleeping" status (counterpart of SERVER_MODEL_STATUS_SLEEPING)
+    if (status_str == "sleeping") {
+        return SERVER_MODEL_STATUS_SLEEPING;
+    }
+    // NOTE(review): "unloaded"/"loading" are not visibly handled in this hunk —
+    // verify they are parsed elsewhere or intentionally rejected by the throw below
    throw std::runtime_error("invalid server model status");
}
case SERVER_MODEL_STATUS_UNLOADED: return "unloaded";
case SERVER_MODEL_STATUS_LOADING: return "loading";
case SERVER_MODEL_STATUS_LOADED: return "loaded";
+ case SERVER_MODEL_STATUS_SLEEPING: return "sleeping";
default: return "unknown";
}
}
int exit_code = 0; // exit code of the model instance process (only valid if status == FAILED)
int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
-    bool is_active() const {
-        return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING;
+    // true when the instance is fully loaded and can serve requests right now (sleeping excluded)
+    bool is_ready() const {
+        return status == SERVER_MODEL_STATUS_LOADED;
+    }
+
+    // true when the instance's child process is active: loading, loaded, or sleeping
+    bool is_running() const {
+        return status == SERVER_MODEL_STATUS_LOADED || status == SERVER_MODEL_STATUS_LOADING || status == SERVER_MODEL_STATUS_SLEEPING;
    }
bool is_failed() const {
void update_status(const std::string & name, server_model_status status, int exit_code);
// wait until the model instance is fully loaded (thread-safe)
- // return when the model is loaded or failed to load
- void wait_until_loaded(const std::string & name);
+ // return when the model is no longer in the "loading" state
+ void wait_until_loading_finished(const std::string & name);
- // load the model if not loaded, otherwise do nothing (thread-safe)
- // return false if model is already loaded; return true otherwise (meta may need to be refreshed)
- bool ensure_model_loaded(const std::string & name);
+ // ensure the model is in ready state (thread-safe)
+ // return false if the model is already ready, or if it is sleeping (a new request will wake it up)
+ // otherwise, load the model and block until loading finishes, then return true (meta may need to be refreshed)
+ bool ensure_model_ready(const std::string & name);
// proxy an HTTP request to the model instance
server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used);
+ // return true if the current process is a child server instance
+ static bool is_child_server();
+
// notify the router server that a model instance is ready
// return the monitoring thread (to be joined by the caller)
static std::thread setup_child_server(const std::function<void(int)> & shutdown_handler);
+
+ // notify the router server that the sleeping state has changed
+ static void notify_router_sleeping_state(bool sleeping);
};
struct server_models_routes {