#include <chrono>
#include <queue>
#include <filesystem>
+#include <cstring>
#ifdef _WIN32
#include <winsock2.h>
#include <limits.h>
#endif
-#define CMD_EXIT "exit"
+#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
+#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
// address for child process, this is needed because router may run on 0.0.0.0
// ref: https://github.com/ggml-org/llama.cpp/issues/17862
std::vector<char *> argv = to_char_ptr_array(child_args);
std::vector<char *> envp = to_char_ptr_array(child_env);
+ // TODO @ngxson : maybe separate stdout and stderr in the future
+ // so that we can use stdout for commands and stderr for logging
int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
if (result != 0) {
// captured variables are guaranteed to be destroyed only after the thread is joined
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
// read stdout/stderr and forward to main server log
+ bool state_received = false; // true if child state received
FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
if (p_stdout_stderr) {
char buffer[4096];
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
LOG("[%5d] %s", port, buffer);
+ if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
+ // child process is ready
+ this->update_status(name, SERVER_MODEL_STATUS_LOADED);
+ state_received = true;
+ }
}
} else {
SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
// because subprocess.h does not provide a way to send SIGINT,
// we will send a command to the child process to exit gracefully
if (stdin_file) {
- fprintf(stdin_file, "%s\n", CMD_EXIT);
+ fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
fflush(stdin_file);
}
}
return proxy;
}
-std::thread server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
+std::thread server_models::setup_child_server(const std::function<void(int)> & shutdown_handler) {
// send a notification to the router server that a model instance is ready
- // TODO @ngxson : use HTTP client from libcommon
- httplib::Client cli(base_params.hostname, router_port);
- cli.set_connection_timeout(0, 200000); // 200 milliseconds
-
- httplib::Request req;
- req.method = "POST";
- req.path = "/models/status";
- req.set_header("Content-Type", "application/json");
- if (!base_params.api_keys.empty()) {
- req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
- }
-
- json body;
- body["model"] = name;
- body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
- req.body = body.dump();
-
- SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
- auto result = cli.send(std::move(req));
- if (result.error() != httplib::Error::Success) {
- auto err_str = httplib::to_string(result.error());
- SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
- exit(1); // force exit
- }
+ common_log_pause(common_log_main());
+ fflush(stdout);
+ fprintf(stdout, "%s\n", CMD_CHILD_TO_ROUTER_READY);
+ fflush(stdout);
+ common_log_resume(common_log_main());
// setup thread for monitoring stdin
return std::thread([shutdown_handler]() {
eof = true;
break;
}
- if (line.find(CMD_EXIT) != std::string::npos) {
+ if (line.find(CMD_ROUTER_TO_CHILD_EXIT) != std::string::npos) {
SRV_INF("%s", "exit command received, exiting...\n");
shutdown_handler(0);
break;
return res;
};
- // used by child process to notify the router about status change
- // TODO @ngxson : maybe implement authentication for this endpoint in the future
- this->post_router_models_status = [this](const server_http_req & req) {
- auto res = std::make_unique<server_http_res>();
- json body = json::parse(req.body);
- std::string model = json_value(body, "model", std::string());
- std::string value = json_value(body, "value", std::string());
- models.update_status(model, server_model_status_from_string(value));
- res_ok(res, {{"success", true}});
- return res;
- };
-
this->get_router_models = [this](const server_http_req &) {
auto res = std::make_unique<server_http_res>();
json models_json = json::array();