#include <filesystem>
#include <algorithm>
+static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG");
+
+#define LOG_DBG(...) \
+ do { if (RPC_DEBUG) GGML_LOG_DEBUG(__VA_ARGS__); } while (0)
+
+
namespace fs = std::filesystem;
static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB
sockfd_t fd;
socket_t(sockfd_t fd) : fd(fd) {}
~socket_t() {
- GGML_PRINT_DEBUG("[%s] closing socket %d\n", __func__, this->fd);
+ LOG_DBG("[%s] closing socket %d\n", __func__, this->fd);
#ifdef _WIN32
closesocket(this->fd);
#else
return nullptr;
}
if (!set_no_delay(sockfd)) {
- fprintf(stderr, "Failed to set TCP_NODELAY\n");
+ GGML_LOG_ERROR("Failed to set TCP_NODELAY\n");
return nullptr;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
struct hostent * server = gethostbyname(host);
if (server == NULL) {
- fprintf(stderr, "Cannot resolve host '%s'\n", host);
+ GGML_LOG_ERROR("Cannot resolve host '%s'\n", host);
return nullptr;
}
memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length);
return nullptr;
}
if (!set_no_delay(client_socket_fd)) {
- fprintf(stderr, "Failed to set TCP_NODELAY\n");
+ GGML_LOG_ERROR("Failed to set TCP_NODELAY\n");
return nullptr;
}
return client_socket;
return nullptr;
}
if (!set_reuse_addr(sockfd)) {
- fprintf(stderr, "Failed to set SO_REUSEADDR\n");
+ GGML_LOG_ERROR("Failed to set SO_REUSEADDR\n");
return nullptr;
}
if (inet_addr(host) == INADDR_NONE) {
- fprintf(stderr, "Invalid host address: %s\n", host);
+ GGML_LOG_ERROR("Invalid host address: %s\n", host);
return nullptr;
}
struct sockaddr_in serv_addr;
return false;
}
if (n == 0) {
- GGML_LOG_ERROR("recv returned 0 (peer closed?)\n");
+ LOG_DBG("recv returned 0 (peer closed?)\n");
return false;
}
bytes_recv += (size_t)n;
try {
input.resize(size);
} catch (const std::bad_alloc & e) {
- fprintf(stderr, "Failed to allocate input buffer of size %" PRIu64 "\n", size);
+ GGML_LOG_ERROR("Failed to allocate input buffer of size %" PRIu64 "\n", size);
return false;
}
return recv_data(sockfd, input.data(), size);
bool status = send_rpc_cmd(sock, RPC_CMD_HELLO, nullptr, 0, &response, sizeof(response));
RPC_STATUS_ASSERT(status);
if (response.major != RPC_PROTO_MAJOR_VERSION || response.minor > RPC_PROTO_MINOR_VERSION) {
- fprintf(stderr, "RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
+ GGML_LOG_ERROR("RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
return false;
}
if (response.minor != RPC_PROTO_MINOR_VERSION || response.patch != RPC_PROTO_PATCH_VERSION) {
- fprintf(stderr, "WARNING: RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
+ GGML_LOG_INFO("WARNING: RPC server version mismatch: %d.%d.%d\n", response.major, response.minor, response.patch);
}
return true;
}
if (!check_server_version(sock)) {
return nullptr;
}
- GGML_PRINT_DEBUG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
+ LOG_DBG("[%s] connected to %s, sockfd=%d\n", __func__, endpoint.c_str(), sock->fd);
sockets[endpoint] = sock;
return sock;
}
}
auto sock = get_socket(endpoint);
if (sock == nullptr) {
- fprintf(stderr, "Failed to connect to %s\n", endpoint);
+ GGML_LOG_ERROR("Failed to connect to %s\n", endpoint);
return nullptr;
}
size_t alignment = get_alignment(sock);
response.major = RPC_PROTO_MAJOR_VERSION;
response.minor = RPC_PROTO_MINOR_VERSION;
response.patch = RPC_PROTO_PATCH_VERSION;
- GGML_PRINT_DEBUG("[%s] version: %d.%d.%d\n", __func__, response.major, response.minor, response.patch);
+ LOG_DBG("[%s] version: %d.%d.%d\n", __func__, response.major, response.minor, response.patch);
}
bool rpc_server::get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response) {
GGML_LOG_ERROR("Null tensor pointer passed to server get_alloc_size function.\n");
return false;
}
-
+ LOG_DBG("[%s] buffer: %p, data: %p\n", __func__, (void*)tensor->buffer, tensor->data);
if (tensor->buffer == nullptr) {
//No buffer allocated.
buft = ggml_backend_get_default_buffer_type(backend);
buft = tensor->buffer->buft;
}
- response.alloc_size = ggml_backend_buft_get_alloc_size(buft,tensor);
+ response.alloc_size = ggml_backend_buft_get_alloc_size(buft, tensor);
return true;
}
if (buffer != nullptr) {
response.remote_ptr = reinterpret_cast<uint64_t>(buffer);
response.remote_size = buffer->size;
- GGML_PRINT_DEBUG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, request.size, response.remote_ptr, response.remote_size);
+ LOG_DBG("[%s] size: %" PRIu64 " -> remote_ptr: %" PRIx64 ", remote_size: %" PRIu64 "\n", __func__, request.size, response.remote_ptr, response.remote_size);
buffers.insert(buffer);
} else {
- GGML_LOG_ERROR("[%s] size: %" PRIu64 " -> failed\n", __func__, request.size);
+ LOG_DBG("[%s] size: %" PRIu64 " -> failed\n", __func__, request.size);
}
}
void rpc_server::get_alignment(rpc_msg_get_alignment_rsp & response) {
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend);
size_t alignment = ggml_backend_buft_get_alignment(buft);
- GGML_PRINT_DEBUG("[%s] alignment: %lu\n", __func__, alignment);
+ LOG_DBG("[%s] alignment: %lu\n", __func__, alignment);
response.alignment = alignment;
}
void rpc_server::get_max_size(rpc_msg_get_max_size_rsp & response) {
ggml_backend_buffer_type_t buft = ggml_backend_get_default_buffer_type(backend);
size_t max_size = ggml_backend_buft_get_max_size(buft);
- GGML_PRINT_DEBUG("[%s] max_size: %lu\n", __func__, max_size);
+ LOG_DBG("[%s] max_size: %lu\n", __func__, max_size);
response.max_size = max_size;
}
bool rpc_server::buffer_get_base(const rpc_msg_buffer_get_base_req & request, rpc_msg_buffer_get_base_rsp & response) {
- GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
+ LOG_DBG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
if (buffers.find(buffer) == buffers.end()) {
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
}
bool rpc_server::free_buffer(const rpc_msg_free_buffer_req & request) {
- GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
+ LOG_DBG("[%s] remote_ptr: %" PRIx64 "\n", __func__, request.remote_ptr);
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
if (buffers.find(buffer) == buffers.end()) {
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
}
bool rpc_server::buffer_clear(const rpc_msg_buffer_clear_req & request) {
- GGML_PRINT_DEBUG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, request.remote_ptr, request.value);
+ LOG_DBG("[%s] remote_ptr: %" PRIx64 ", value: %u\n", __func__, request.remote_ptr, request.value);
ggml_backend_buffer_t buffer = reinterpret_cast<ggml_backend_buffer_t>(request.remote_ptr);
if (buffers.find(buffer) == buffers.end()) {
GGML_LOG_ERROR("[%s] buffer not found\n", __func__);
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
return false;
}
- GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size);
+ LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu\n", __func__, (void*)tensor->buffer, tensor->data, offset, size);
// sanitize tensor->data
{
fs::path cache_file = fs::path(cache_dir) / hash_str;
std::ofstream ofs(cache_file, std::ios::binary);
ofs.write((const char *)data, size);
- printf("[%s] saved to '%s'\n", __func__, cache_file.c_str());
+ GGML_LOG_INFO("[%s] saved to '%s'\n", __func__, cache_file.c_str());
}
ggml_backend_tensor_set(tensor, data, offset, size);
return true;
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
return false;
}
- GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n",
- __func__, (void*)tensor->buffer, tensor->data, request.offset, size, request.hash);
+ LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n",
+ __func__, (void*)tensor->buffer, tensor->data, request.offset, size, request.hash);
// sanitize tensor->data
{
GGML_LOG_ERROR("Null tensor pointer passed to server init_tensor function.\n");
return false;
}
-
+ LOG_DBG("[%s] buffer: %p, data: %p\n", __func__, (void*)tensor->buffer, tensor->data);
// Call the backend's buffer_init_tensor function
ggml_backend_buffer_t buffer = tensor->buffer;
if (buffer && buffer->iface.init_tensor) {
GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
return false;
}
- GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %" PRIu64 "\n", __func__, (void*)tensor->buffer, tensor->data, request.offset, request.size);
+ LOG_DBG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %" PRIu64 "\n", __func__, (void*)tensor->buffer, tensor->data, request.offset, request.size);
// sanitize tensor->data
{
uint64_t dst_buf_sz = (uint64_t) ggml_backend_buffer_get_size(dst->buffer);
if (dst_data + src_size > dst_base + dst_buf_sz) {
- GGML_PRINT_DEBUG("[%s] out-of-bounds write in rpc_server::copy_tensor:\n"
+ GGML_LOG_ERROR("[%s] out-of-bounds write in rpc_server::copy_tensor:\n"
" write range : [0x%" PRIx64 ", 0x%" PRIx64 "]\n"
" buffer base: [0x%" PRIx64 ", 0x%" PRIx64 "]\n",
__func__,
return false;
}
- GGML_PRINT_DEBUG("[%s] src->buffer: %p, dst->buffer: %p\n",
- __func__, (void*) src->buffer, (void*) dst->buffer);
+ LOG_DBG("[%s] src->buffer: %p, dst->buffer: %p\n",
+ __func__, (void*) src->buffer, (void*) dst->buffer);
response.result = ggml_backend_buffer_copy_tensor(src, dst);
return true;
return false;
}
const rpc_tensor * tensors = (const rpc_tensor *)(input.data() + sizeof(n_nodes) + n_nodes*sizeof(uint64_t) + sizeof(n_tensors));
- GGML_PRINT_DEBUG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors);
+ LOG_DBG("[%s] n_nodes: %u, n_tensors: %u\n", __func__, n_nodes, n_tensors);
size_t buf_size = ggml_tensor_overhead()*(n_nodes + n_tensors) + ggml_graph_overhead_custom(n_nodes, false);
}
// the first command sent by the client must be HELLO
if (cmd != RPC_CMD_HELLO) {
- fprintf(stderr, "Expected HELLO command, update client\n");
+ GGML_LOG_ERROR("Expected HELLO command, update client\n");
return;
}
if (!recv_msg(sockfd, nullptr, 0)) {
}
if (cmd >= RPC_CMD_COUNT) {
// fail fast if the command is invalid
- fprintf(stderr, "Unknown command: %d\n", cmd);
+ GGML_LOG_ERROR("Unknown command: %d\n", cmd);
break;
}
switch (cmd) {
break;
}
default: {
- fprintf(stderr, "Unknown command: %d\n", cmd);
+ GGML_LOG_ERROR("Unknown command: %d\n", cmd);
return;
}
}