#include <thread>
#include <mutex>
#include <chrono>
#include <condition_variable>
#ifndef SERVER_VERBOSE
#define SERVER_VERBOSE 1
std::vector<task_result> queue_results;
std::vector<task_multi> queue_multitasks;
std::mutex mutex_tasks; // also guards id_gen, and queue_multitasks
std::condition_variable condition_tasks;
std::mutex mutex_results;
std::condition_variable condition_results;
~llama_server_context()
{
// Publish an error result for `task` on the results queue and wake any
// thread blocked waiting for results.
// @param task   the task that failed (its id/multitask_id are echoed back)
// @param error  human-readable message forwarded to the client as "content"
void send_error(task_server& task, std::string error)
{
    // unique_lock (not lock_guard) for consistency with the
    // condition-variable based queue accesses elsewhere in this class.
    std::unique_lock<std::mutex> lock(mutex_results);
    task_result res;
    res.id = task.id;
    res.multitask_id = task.multitask_id;
    res.error = true;
    res.result_json = { { "content", error } };
    queue_results.push_back(res);
    // wake every waiter: multiple readers may be blocked on results
    condition_results.notify_all();
}
void add_multi_task(int id, std::vector<int>& sub_ids)
multi.id = id;
std::copy(sub_ids.begin(), sub_ids.end(), std::inserter(multi.subtasks_remaining, multi.subtasks_remaining.end()));
queue_multitasks.push_back(multi);
condition_tasks.notify_one();
}
void update_multi_task(int multitask_id, int subtask_id, task_result& result)
{
multitask.subtasks_remaining.erase(subtask_id);
multitask.results.push_back(result);
condition_tasks.notify_one();
}
}
}
void send_partial_response(llama_client_slot &slot, completion_token_output tkn)
{
std::unique_lock<std::mutex> lock(mutex_results);
task_result res;
res.id = slot.task_id;
res.multitask_id = slot.multitask_id;
}
queue_results.push_back(res);
condition_results.notify_all();
}
void send_final_response(llama_client_slot &slot)
{
std::unique_lock<std::mutex> lock(mutex_results);
task_result res;
res.id = slot.task_id;
res.multitask_id = slot.multitask_id;
}
queue_results.push_back(res);
condition_results.notify_all();
}
void send_embedding(llama_client_slot &slot)
{
std::unique_lock<std::mutex> lock(mutex_results);
task_result res;
res.id = slot.task_id;
res.multitask_id = slot.multitask_id;
};
}
queue_results.push_back(res);
condition_results.notify_all();
}
int request_completion(json data, bool infill, bool embedding, int multitask_id)
// otherwise, it's a single-prompt task, we actually queue it
queue_tasks.push_back(task);
condition_tasks.notify_one();
return task.id;
}
{
while (true)
{
std::unique_lock<std::mutex> lock(mutex_results);
condition_results.wait(lock, [&]{
    return !queue_results.empty();
});
for (int i = 0; i < (int) queue_results.size(); i++)
{
void request_cancel(int task_id)
{
- std::lock_guard<std::mutex> lock(mutex_tasks);
+ std::unique_lock<std::mutex> lock(mutex_tasks);
task_server task;
task.id = id_gen++;
task.type = CANCEL_TASK;
task.target_id = task_id;
queue_tasks.push_back(task);
+ condition_tasks.notify_one();
}
int split_multiprompt_task(task_server& multiprompt_task)
void process_tasks()
{
std::unique_lock<std::mutex> lock(mutex_tasks);
while (!queue_tasks.empty())
{
task_server task = queue_tasks.front();
std::lock_guard<std::mutex> lock(mutex_results);
queue_results.push_back(aggregate_result);
condition_results.notify_all();
queue_iterator = queue_multitasks.erase(queue_iterator);
}
LOG_TEE("all slots are idle and system prompt is empty, clear the KV cache\n");
kv_cache_clear();
}
// block until new work arrives instead of polling (avoids burning CPU)
std::unique_lock<std::mutex> lock(mutex_tasks);
condition_tasks.wait(lock, [&]{
    return !queue_tasks.empty();
});
}
for (llama_client_slot &slot : slots)