退回到 b7516 版本
This commit is contained in:
@@ -21,13 +21,11 @@
|
||||
|
||||
#ifdef _WIN32
|
||||
#include <winsock2.h>
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <unistd.h>
|
||||
extern char **environ;
|
||||
#endif
|
||||
|
||||
#if defined(__APPLE__) && defined(__MACH__)
|
||||
@@ -36,8 +34,6 @@ extern char **environ;
|
||||
#include <limits.h>
|
||||
#endif
|
||||
|
||||
#define DEFAULT_STOP_TIMEOUT 10 // seconds
|
||||
|
||||
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
|
||||
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
|
||||
|
||||
@@ -101,49 +97,6 @@ static void unset_reserved_args(common_preset & preset, bool unset_model_args) {
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
static std::string wide_to_utf8(const wchar_t * ws) {
|
||||
if (!ws || !*ws) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const int len = static_cast<int>(std::wcslen(ws));
|
||||
const int bytes = WideCharToMultiByte(CP_UTF8, 0, ws, len, nullptr, 0, nullptr, nullptr);
|
||||
if (bytes == 0) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::string utf8(bytes, '\0');
|
||||
WideCharToMultiByte(CP_UTF8, 0, ws, len, utf8.data(), bytes, nullptr, nullptr);
|
||||
|
||||
return utf8;
|
||||
}
|
||||
#endif
|
||||
|
||||
static std::vector<std::string> get_environment() {
|
||||
std::vector<std::string> env;
|
||||
|
||||
#ifdef _WIN32
|
||||
LPWCH env_block = GetEnvironmentStringsW();
|
||||
if (!env_block) {
|
||||
return env;
|
||||
}
|
||||
for (LPWCH e = env_block; *e; e += wcslen(e) + 1) {
|
||||
env.emplace_back(wide_to_utf8(e));
|
||||
}
|
||||
FreeEnvironmentStringsW(env_block);
|
||||
#else
|
||||
if (environ == nullptr) {
|
||||
return env;
|
||||
}
|
||||
for (char ** e = environ; *e != nullptr; e++) {
|
||||
env.emplace_back(*e);
|
||||
}
|
||||
#endif
|
||||
|
||||
return env;
|
||||
}
|
||||
|
||||
void server_model_meta::update_args(common_preset_context & ctx_preset, std::string bin_path) {
|
||||
// update params
|
||||
unset_reserved_args(preset, false);
|
||||
@@ -162,11 +115,14 @@ void server_model_meta::update_args(common_preset_context & ctx_preset, std::str
|
||||
server_models::server_models(
|
||||
const common_params & params,
|
||||
int argc,
|
||||
char ** argv)
|
||||
char ** argv,
|
||||
char ** envp)
|
||||
: ctx_preset(LLAMA_EXAMPLE_SERVER),
|
||||
base_params(params),
|
||||
base_env(get_environment()),
|
||||
base_preset(ctx_preset.load_from_args(argc, argv)) {
|
||||
for (char ** env = envp; *env != nullptr; env++) {
|
||||
base_env.push_back(std::string(*env));
|
||||
}
|
||||
// clean up base preset
|
||||
unset_reserved_args(base_preset, true);
|
||||
// set binary path
|
||||
@@ -247,14 +203,13 @@ void server_models::load_models() {
|
||||
// convert presets to server_model_meta and add to mapping
|
||||
for (const auto & preset : final_presets) {
|
||||
server_model_meta meta{
|
||||
/* preset */ preset.second,
|
||||
/* name */ preset.first,
|
||||
/* port */ 0,
|
||||
/* status */ SERVER_MODEL_STATUS_UNLOADED,
|
||||
/* last_used */ 0,
|
||||
/* args */ std::vector<std::string>(),
|
||||
/* exit_code */ 0,
|
||||
/* stop_timeout */ DEFAULT_STOP_TIMEOUT,
|
||||
/* preset */ preset.second,
|
||||
/* name */ preset.first,
|
||||
/* port */ 0,
|
||||
/* status */ SERVER_MODEL_STATUS_UNLOADED,
|
||||
/* last_used */ 0,
|
||||
/* args */ std::vector<std::string>(),
|
||||
/* exit_code */ 0
|
||||
};
|
||||
add_model(std::move(meta));
|
||||
}
|
||||
@@ -272,20 +227,6 @@ void server_models::load_models() {
|
||||
}
|
||||
}
|
||||
|
||||
// handle custom stop-timeout option
|
||||
for (auto & [name, inst] : mapping) {
|
||||
std::string val;
|
||||
if (inst.meta.preset.get_option(COMMON_ARG_PRESET_STOP_TIMEOUT, val)) {
|
||||
try {
|
||||
inst.meta.stop_timeout = std::stoi(val);
|
||||
} catch (...) {
|
||||
SRV_WRN("invalid stop-timeout value '%s' for model '%s', using default %d seconds\n",
|
||||
val.c_str(), name.c_str(), DEFAULT_STOP_TIMEOUT);
|
||||
inst.meta.stop_timeout = DEFAULT_STOP_TIMEOUT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// load any autoload models
|
||||
std::vector<std::string> models_to_load;
|
||||
for (const auto & [name, inst] : mapping) {
|
||||
@@ -421,7 +362,7 @@ void server_models::unload_lru() {
|
||||
int64_t lru_last_used = ggml_time_ms();
|
||||
size_t count_active = 0;
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
std::lock_guard<std::mutex> lk(mutex);
|
||||
for (const auto & m : mapping) {
|
||||
if (m.second.meta.is_active()) {
|
||||
count_active++;
|
||||
@@ -435,13 +376,6 @@ void server_models::unload_lru() {
|
||||
if (!lru_model_name.empty() && count_active >= (size_t)base_params.models_max) {
|
||||
SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
|
||||
unload(lru_model_name);
|
||||
// wait for unload to complete
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
cv.wait(lk, [this, &lru_model_name]() {
|
||||
return mapping[lru_model_name].meta.status == SERVER_MODEL_STATUS_UNLOADED;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -502,83 +436,38 @@ void server_models::load(const std::string & name) {
|
||||
|
||||
// start a thread to manage the child process
|
||||
// captured variables are guaranteed to be destroyed only after the thread is joined
|
||||
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port, stop_timeout = inst.meta.stop_timeout]() {
|
||||
FILE * stdin_file = subprocess_stdin(child_proc.get());
|
||||
FILE * stdout_file = subprocess_stdout(child_proc.get()); // combined stdout/stderr
|
||||
|
||||
std::thread log_thread([&]() {
|
||||
// read stdout/stderr and forward to main server log
|
||||
// also handle status report from child process
|
||||
bool state_received = false; // true if child state received
|
||||
if (stdout_file) {
|
||||
char buffer[4096];
|
||||
while (fgets(buffer, sizeof(buffer), stdout_file) != nullptr) {
|
||||
LOG("[%5d] %s", port, buffer);
|
||||
if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
|
||||
// child process is ready
|
||||
this->update_status(name, SERVER_MODEL_STATUS_LOADED, 0);
|
||||
state_received = true;
|
||||
}
|
||||
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
|
||||
// read stdout/stderr and forward to main server log
|
||||
bool state_received = false; // true if child state received
|
||||
FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
|
||||
if (p_stdout_stderr) {
|
||||
char buffer[4096];
|
||||
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
|
||||
LOG("[%5d] %s", port, buffer);
|
||||
if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
|
||||
// child process is ready
|
||||
this->update_status(name, SERVER_MODEL_STATUS_LOADED);
|
||||
state_received = true;
|
||||
}
|
||||
} else {
|
||||
SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
|
||||
}
|
||||
});
|
||||
|
||||
std::thread stopping_thread([&]() {
|
||||
// thread to monitor stopping signal
|
||||
auto is_stopping = [this, &name]() {
|
||||
return this->stopping_models.find(name) != this->stopping_models.end();
|
||||
};
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(this->mutex);
|
||||
this->cv_stop.wait(lk, is_stopping);
|
||||
}
|
||||
SRV_INF("stopping model instance name=%s\n", name.c_str());
|
||||
// send interrupt to child process
|
||||
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
|
||||
fflush(stdin_file);
|
||||
// wait to stop gracefully or timeout
|
||||
int64_t start_time = ggml_time_ms();
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lk(this->mutex);
|
||||
if (!is_stopping()) {
|
||||
return; // already stopped
|
||||
}
|
||||
int64_t elapsed = ggml_time_ms() - start_time;
|
||||
if (elapsed >= stop_timeout * 1000) {
|
||||
// timeout, force kill
|
||||
SRV_WRN("force-killing model instance name=%s after %d seconds timeout\n", name.c_str(), stop_timeout);
|
||||
subprocess_terminate(child_proc.get());
|
||||
return;
|
||||
}
|
||||
this->cv_stop.wait_for(lk, std::chrono::seconds(1));
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
|
||||
}
|
||||
// we reach here when the child process exits
|
||||
// note: we cannot join() prior to this point because it will close stdin_file
|
||||
if (log_thread.joinable()) {
|
||||
log_thread.join();
|
||||
}
|
||||
|
||||
// stop the timeout monitoring thread
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(this->mutex);
|
||||
stopping_models.erase(name);
|
||||
cv_stop.notify_all();
|
||||
}
|
||||
if (stopping_thread.joinable()) {
|
||||
stopping_thread.join();
|
||||
}
|
||||
|
||||
// get the exit code
|
||||
int exit_code = 0;
|
||||
subprocess_join(child_proc.get(), &exit_code);
|
||||
subprocess_destroy(child_proc.get());
|
||||
|
||||
// update status and exit code
|
||||
this->update_status(name, SERVER_MODEL_STATUS_UNLOADED, exit_code);
|
||||
// update PID and status
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(mutex);
|
||||
auto it = mapping.find(name);
|
||||
if (it != mapping.end()) {
|
||||
auto & meta = it->second.meta;
|
||||
meta.exit_code = exit_code;
|
||||
meta.status = SERVER_MODEL_STATUS_UNLOADED;
|
||||
}
|
||||
cv.notify_all();
|
||||
}
|
||||
SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
|
||||
});
|
||||
|
||||
@@ -599,14 +488,22 @@ void server_models::load(const std::string & name) {
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
static void interrupt_subprocess(FILE * stdin_file) {
|
||||
// because subprocess.h does not provide a way to send SIGINT,
|
||||
// we will send a command to the child process to exit gracefully
|
||||
if (stdin_file) {
|
||||
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
|
||||
fflush(stdin_file);
|
||||
}
|
||||
}
|
||||
|
||||
void server_models::unload(const std::string & name) {
|
||||
std::lock_guard<std::mutex> lk(mutex);
|
||||
auto it = mapping.find(name);
|
||||
if (it != mapping.end()) {
|
||||
if (it->second.meta.is_active()) {
|
||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||
stopping_models.insert(name);
|
||||
cv_stop.notify_all();
|
||||
interrupt_subprocess(it->second.stdin_file);
|
||||
// status change will be handled by the managing thread
|
||||
} else {
|
||||
SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
|
||||
@@ -621,8 +518,7 @@ void server_models::unload_all() {
|
||||
for (auto & [name, inst] : mapping) {
|
||||
if (inst.meta.is_active()) {
|
||||
SRV_INF("unloading model instance name=%s\n", name.c_str());
|
||||
stopping_models.insert(name);
|
||||
cv_stop.notify_all();
|
||||
interrupt_subprocess(inst.stdin_file);
|
||||
// status change will be handled by the managing thread
|
||||
}
|
||||
// moving the thread to join list to avoid deadlock
|
||||
@@ -636,15 +532,16 @@ void server_models::unload_all() {
|
||||
}
|
||||
}
|
||||
|
||||
void server_models::update_status(const std::string & name, server_model_status status, int exit_code) {
|
||||
std::unique_lock<std::mutex> lk(mutex);
|
||||
auto it = mapping.find(name);
|
||||
if (it != mapping.end()) {
|
||||
auto & meta = it->second.meta;
|
||||
meta.status = status;
|
||||
meta.exit_code = exit_code;
|
||||
void server_models::update_status(const std::string & name, server_model_status status) {
|
||||
// for now, we only allow updating to LOADED status
|
||||
if (status != SERVER_MODEL_STATUS_LOADED) {
|
||||
throw std::runtime_error("invalid status value");
|
||||
}
|
||||
auto meta = get_meta(name);
|
||||
if (meta.has_value()) {
|
||||
meta->status = status;
|
||||
update_meta(name, meta.value());
|
||||
}
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
void server_models::wait_until_loaded(const std::string & name) {
|
||||
@@ -671,7 +568,6 @@ bool server_models::ensure_model_loaded(const std::string & name) {
|
||||
load(name);
|
||||
}
|
||||
|
||||
// for loading state
|
||||
SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
|
||||
wait_until_loaded(name);
|
||||
|
||||
@@ -704,10 +600,7 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
|
||||
req.path,
|
||||
req.headers,
|
||||
req.body,
|
||||
req.should_stop,
|
||||
base_params.timeout_read,
|
||||
base_params.timeout_write
|
||||
);
|
||||
req.should_stop);
|
||||
return proxy;
|
||||
}
|
||||
|
||||
@@ -902,7 +795,7 @@ void server_models_routes::init_routes() {
|
||||
res_err(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
|
||||
return res;
|
||||
}
|
||||
if (!model->is_active()) {
|
||||
if (model->status != SERVER_MODEL_STATUS_LOADED) {
|
||||
res_err(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
|
||||
return res;
|
||||
}
|
||||
@@ -995,18 +888,13 @@ server_http_proxy::server_http_proxy(
|
||||
const std::string & path,
|
||||
const std::map<std::string, std::string> & headers,
|
||||
const std::string & body,
|
||||
const std::function<bool()> should_stop,
|
||||
int32_t timeout_read,
|
||||
int32_t timeout_write
|
||||
) {
|
||||
const std::function<bool()> should_stop) {
|
||||
// shared between reader and writer threads
|
||||
auto cli = std::make_shared<httplib::Client>(host, port);
|
||||
auto pipe = std::make_shared<pipe_t<msg_t>>();
|
||||
|
||||
// setup Client
|
||||
cli->set_connection_timeout(0, 200000); // 200 milliseconds
|
||||
cli->set_write_timeout(timeout_read, 0); // reversed for cli (client) vs srv (server)
|
||||
cli->set_read_timeout(timeout_write, 0);
|
||||
this->status = 500; // to be overwritten upon response
|
||||
this->cleanup = [pipe]() {
|
||||
pipe->close_read();
|
||||
|
||||
Reference in New Issue
Block a user