]> git.djapps.eu Git - pkg/ggml/sources/llama.cpp/commitdiff
rpc : send hash when tensor data is above some fixed threshold (#12496)
authorRadoslav Gerganov <redacted>
Fri, 28 Mar 2025 06:18:04 +0000 (08:18 +0200)
committerGitHub <redacted>
Fri, 28 Mar 2025 06:18:04 +0000 (08:18 +0200)
* rpc : send hash when tensor data is above some fixed threshold

ref #10095

* rpc : put cache under $HOME/.cache/llama.cpp

* try to fix win32 build

* another try to fix win32 build

* remove llama as dependency

examples/rpc/CMakeLists.txt
examples/rpc/rpc-server.cpp
ggml/include/ggml-rpc.h
ggml/src/ggml-rpc/ggml-rpc.cpp

index ae48fb98d0913bad1e6fc9a2a27d82e985fd3f2b..c2c748148645e66aeeae90066f07f56665bf1701 100644 (file)
@@ -1,2 +1,4 @@
-add_executable(rpc-server rpc-server.cpp)
-target_link_libraries(rpc-server PRIVATE ggml llama)
+set(TARGET rpc-server)
+add_executable(${TARGET} rpc-server.cpp)
+target_link_libraries(${TARGET} PRIVATE ggml)
+target_compile_features(${TARGET} PRIVATE cxx_std_17)
index 8b1b23edad174c4a104f15769dd94bab9c4c6c16..3d590feb086c6ff8fb9bb6baa21c92c3731be490 100644 (file)
@@ -1,3 +1,7 @@
+#if defined(_MSC_VER)
+#define _SILENCE_CXX17_CODECVT_HEADER_DEPRECATION_WARNING
+#endif
+
 #include "ggml-cpu.h"
 
 #ifdef GGML_USE_CUDA
 
 #include "ggml-rpc.h"
 #ifdef _WIN32
+#  define DIRECTORY_SEPARATOR '\\'
+#  include <locale>
 #  include <windows.h>
+#  include <fcntl.h>
+#  include <io.h>
 #else
+#  define DIRECTORY_SEPARATOR '/'
 #  include <unistd.h>
+#  include <sys/stat.h>
 #endif
+#include <codecvt>
 #include <string>
 #include <stdio.h>
+#include <vector>
+#include <filesystem>
+
+namespace fs = std::filesystem;
+
+// NOTE: this is copied from common.cpp to avoid linking with libcommon
+// returns true if successful, false otherwise
+static bool fs_create_directory_with_parents(const std::string & path) {
+#ifdef _WIN32
+    std::wstring_convert<std::codecvt_utf8<wchar_t>> converter;
+    std::wstring wpath = converter.from_bytes(path);
+
+    // if the path already exists, check whether it's a directory
+    const DWORD attributes = GetFileAttributesW(wpath.c_str());
+    if ((attributes != INVALID_FILE_ATTRIBUTES) && (attributes & FILE_ATTRIBUTE_DIRECTORY)) {
+        return true;
+    }
+
+    size_t pos_slash = 0;
+
+    // process path from front to back, procedurally creating directories
+    while ((pos_slash = path.find('\\', pos_slash)) != std::string::npos) {
+        const std::wstring subpath = wpath.substr(0, pos_slash);
+        const wchar_t * test = subpath.c_str();
+
+        const bool success = CreateDirectoryW(test, NULL);
+        if (!success) {
+            const DWORD error = GetLastError();
+
+            // if the path already exists, ensure that it's a directory
+            if (error == ERROR_ALREADY_EXISTS) {
+                const DWORD attributes = GetFileAttributesW(subpath.c_str());
+                if (attributes == INVALID_FILE_ATTRIBUTES || !(attributes & FILE_ATTRIBUTE_DIRECTORY)) {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+
+        pos_slash += 1;
+    }
+
+    return true;
+#else
+    // if the path already exists, check whether it's a directory
+    struct stat info;
+    if (stat(path.c_str(), &info) == 0) {
+        return S_ISDIR(info.st_mode);
+    }
+
+    size_t pos_slash = 1; // skip leading slashes for directory creation
+
+    // process path from front to back, procedurally creating directories
+    while ((pos_slash = path.find('/', pos_slash)) != std::string::npos) {
+        const std::string subpath = path.substr(0, pos_slash);
+        struct stat info;
+
+        // if the path already exists, ensure that it's a directory
+        if (stat(subpath.c_str(), &info) == 0) {
+            if (!S_ISDIR(info.st_mode)) {
+                return false;
+            }
+        } else {
+            // create parent directories
+            const int ret = mkdir(subpath.c_str(), 0755);
+            if (ret != 0) {
+                return false;
+            }
+        }
+
+        pos_slash += 1;
+    }
+
+    return true;
+#endif // _WIN32
+}
+
+// NOTE: this is copied from common.cpp to avoid linking with libcommon
+static std::string fs_get_cache_directory() {
+    std::string cache_directory = "";
+    auto ensure_trailing_slash = [](std::string p) {
+        // Make sure to add trailing slash
+        if (p.back() != DIRECTORY_SEPARATOR) {
+            p += DIRECTORY_SEPARATOR;
+        }
+        return p;
+    };
+    if (getenv("LLAMA_CACHE")) {
+        cache_directory = std::getenv("LLAMA_CACHE");
+    } else {
+#ifdef __linux__
+        if (std::getenv("XDG_CACHE_HOME")) {
+            cache_directory = std::getenv("XDG_CACHE_HOME");
+        } else {
+            cache_directory = std::getenv("HOME") + std::string("/.cache/");
+        }
+#elif defined(__APPLE__)
+        cache_directory = std::getenv("HOME") + std::string("/Library/Caches/");
+#elif defined(_WIN32)
+        cache_directory = std::getenv("LOCALAPPDATA");
+#endif // __linux__
+        cache_directory = ensure_trailing_slash(cache_directory);
+        cache_directory += "llama.cpp";
+    }
+    return ensure_trailing_slash(cache_directory);
+}
 
 struct rpc_server_params {
     std::string host        = "127.0.0.1";
     int         port        = 50052;
     size_t      backend_mem = 0;
+    bool        use_cache   = false;
 };
 
 static void print_usage(int /*argc*/, char ** argv, rpc_server_params params) {
     fprintf(stderr, "Usage: %s [options]\n\n", argv[0]);
     fprintf(stderr, "options:\n");
-    fprintf(stderr, "  -h, --help            show this help message and exit\n");
-    fprintf(stderr, "  -H HOST, --host HOST  host to bind to (default: %s)\n", params.host.c_str());
-    fprintf(stderr, "  -p PORT, --port PORT  port to bind to (default: %d)\n", params.port);
-    fprintf(stderr, "  -m MEM, --mem MEM     backend memory size (in MB)\n");
+    fprintf(stderr, "  -h, --help                show this help message and exit\n");
+    fprintf(stderr, "  -H HOST, --host HOST      host to bind to (default: %s)\n", params.host.c_str());
+    fprintf(stderr, "  -p PORT, --port PORT      port to bind to (default: %d)\n", params.port);
+    fprintf(stderr, "  -m MEM,  --mem MEM        backend memory size (in MB)\n");
+    fprintf(stderr, "  -c,      --cache          enable local file cache\n");
     fprintf(stderr, "\n");
 }
 
@@ -58,6 +178,8 @@ static bool rpc_server_params_parse(int argc, char ** argv, rpc_server_params &
             if (params.port <= 0 || params.port > 65535) {
                 return false;
             }
+        } else if (arg == "-c" || arg == "--cache") {
+            params.use_cache = true;
         } else if (arg == "-m" || arg == "--mem") {
             if (++i >= argc) {
                 return false;
@@ -164,8 +286,20 @@ int main(int argc, char * argv[]) {
     } else {
         get_backend_memory(&free_mem, &total_mem);
     }
-    printf("Starting RPC server on %s, backend memory: %zu MB\n", endpoint.c_str(), free_mem / (1024 * 1024));
-    ggml_backend_rpc_start_server(backend, endpoint.c_str(), free_mem, total_mem);
+    const char * cache_dir = nullptr;
+    std::string cache_dir_str = fs_get_cache_directory() + "rpc/";
+    if (params.use_cache) {
+        if (!fs_create_directory_with_parents(cache_dir_str)) {
+            fprintf(stderr, "Failed to create cache directory: %s\n", cache_dir_str.c_str());
+            return 1;
+        }
+        cache_dir = cache_dir_str.c_str();
+    }
+    printf("Starting RPC server\n");
+    printf("  endpoint       : %s\n", endpoint.c_str());
+    printf("  local cache    : %s\n", cache_dir ? cache_dir : "n/a");
+    printf("  backend memory : %zu MB\n", free_mem / (1024 * 1024));
+    ggml_backend_rpc_start_server(backend, endpoint.c_str(), cache_dir, free_mem, total_mem);
     ggml_backend_free(backend);
     return 0;
 }
index ade6c3b0ef13bd8783b58efa3558aede40cd5fe8..4e0d210f8ec973a3a60bc75e2d2b200bcf98d53d 100644 (file)
@@ -17,7 +17,9 @@ GGML_BACKEND_API ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const c
 
 GGML_BACKEND_API void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, size_t * total);
 
-GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, size_t free_mem, size_t total_mem);
+GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint,
+                                                    const char * cache_dir,
+                                                    size_t free_mem, size_t total_mem);
 
 GGML_BACKEND_API ggml_backend_reg_t ggml_backend_rpc_reg(void);
 
index 6c3b80b0883c9e332b1054c6bd32a0becc2d852d..862b9b666175d88fc8e9491e24510c7ae0e7fc27 100644 (file)
 #  include <unistd.h>
 #endif
 #include <cstring>
+#include <fstream>
+#include <filesystem>
+
+namespace fs = std::filesystem;
 
 #ifdef _WIN32
 typedef SOCKET sockfd_t;
@@ -80,6 +84,7 @@ enum rpc_cmd {
     RPC_CMD_FREE_BUFFER,
     RPC_CMD_BUFFER_CLEAR,
     RPC_CMD_SET_TENSOR,
+    RPC_CMD_SET_TENSOR_HASH,
     RPC_CMD_GET_TENSOR,
     RPC_CMD_COPY_TENSOR,
     RPC_CMD_GRAPH_COMPUTE,
@@ -89,6 +94,9 @@ enum rpc_cmd {
     RPC_CMD_COUNT,
 };
 
+// Try RPC_CMD_SET_TENSOR_HASH first when data size is larger than this threshold
+const size_t HASH_THRESHOLD = 10 * 1024 * 1024;
+
 struct rpc_msg_get_alloc_size_req {
     rpc_tensor tensor;
 };
@@ -135,6 +143,10 @@ struct rpc_msg_buffer_clear_req {
     uint8_t value;
 };
 
+struct rpc_msg_set_tensor_hash_rsp {
+    uint8_t result;
+};
+
 struct rpc_msg_get_tensor_req {
     rpc_tensor tensor;
     uint64_t offset;
@@ -187,6 +199,18 @@ struct ggml_backend_rpc_buffer_context {
 
 // RPC helper functions
 
+// Computes FNV-1a hash of the data
+static uint64_t fnv_hash(const uint8_t * data, size_t len) {
+    const uint64_t fnv_prime = 0x100000001b3ULL;
+    uint64_t hash = 0xcbf29ce484222325ULL;
+
+    for (size_t i = 0; i < len; ++i) {
+        hash ^= data[i];
+        hash *= fnv_prime;
+    }
+    return hash;
+}
+
 static std::shared_ptr<socket_t> make_socket(sockfd_t fd) {
 #ifdef _WIN32
     if (fd == INVALID_SOCKET) {
@@ -483,10 +507,26 @@ static enum ggml_status ggml_backend_rpc_buffer_init_tensor(ggml_backend_buffer_
 
 static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, const void * data, size_t offset, size_t size) {
     ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context;
-    // input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) |
+    rpc_tensor rpc_tensor = serialize_tensor(tensor);
+    if (size > HASH_THRESHOLD) {
+        // input serialization format: | rpc_tensor | offset (8 bytes) | hash (8 bytes)
+        size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + sizeof(uint64_t);
+        std::vector<uint8_t> input(input_size, 0);
+        uint64_t hash = fnv_hash((const uint8_t*)data, size);
+        memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor));
+        memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset));
+        memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), &hash, sizeof(hash));
+        rpc_msg_set_tensor_hash_rsp response;
+        bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR_HASH, input.data(), input.size(), &response, sizeof(response));
+        GGML_ASSERT(status);
+        if (response.result) {
+            // the server has the same data, no need to send it
+            return;
+        }
+    }
+    // input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes)
     size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + size;
     std::vector<uint8_t> input(input_size, 0);
-    rpc_tensor rpc_tensor = serialize_tensor(tensor);
     memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor));
     memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset));
     memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size);
@@ -772,7 +812,9 @@ void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, si
 
 class rpc_server {
 public:
-    rpc_server(ggml_backend_t backend) : backend(backend) {}
+    rpc_server(ggml_backend_t backend, const char * cache_dir)
+        : backend(backend), cache_dir(cache_dir) {
+    }
     ~rpc_server();
 
     void alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_alloc_buffer_rsp & response);
@@ -782,6 +824,7 @@ public:
     bool free_buffer(const rpc_msg_free_buffer_req & request);
     bool buffer_clear(const rpc_msg_buffer_clear_req & request);
     bool set_tensor(const std::vector<uint8_t> & input);
+    bool set_tensor_hash(const std::vector<uint8_t> & input, rpc_msg_set_tensor_hash_rsp & response);
     bool get_tensor(const rpc_msg_get_tensor_req & request, std::vector<uint8_t> & response);
     bool copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_copy_tensor_rsp & response);
     bool graph_compute(const std::vector<uint8_t> & input, rpc_msg_graph_compute_rsp & response);
@@ -789,6 +832,7 @@ public:
     bool get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response);
 
 private:
+    bool get_cached_file(uint64_t hash, std::vector<uint8_t> & data);
     ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor);
     ggml_tensor * create_node(uint64_t id,
                               struct ggml_context * ctx,
@@ -797,6 +841,7 @@ private:
 
 
     ggml_backend_t backend;
+    const char * cache_dir;
     std::unordered_set<ggml_backend_buffer_t> buffers;
 };
 
@@ -960,11 +1005,85 @@ bool rpc_server::set_tensor(const std::vector<uint8_t> & input) {
     }
 
     const void * data = input.data() + sizeof(rpc_tensor) + sizeof(offset);
+    if (cache_dir && size > HASH_THRESHOLD) {
+        uint64_t hash = fnv_hash((const uint8_t*)data, size);
+        char hash_str[17];
+        snprintf(hash_str, sizeof(hash_str), "%016" PRIx64, hash);
+        // save to cache_dir/hash_str
+        fs::path cache_file = fs::path(cache_dir) / hash_str;
+        std::ofstream ofs(cache_file, std::ios::binary);
+        ofs.write((const char *)data, size);
+        printf("[%s] saved to '%s'\n", __func__, cache_file.c_str());
+    }
     ggml_backend_tensor_set(tensor, data, offset, size);
     ggml_free(ctx);
     return true;
 }
 
+bool rpc_server::get_cached_file(uint64_t hash, std::vector<uint8_t> & data) {
+    if (!cache_dir) {
+        return false;
+    }
+    char hash_str[17];
+    snprintf(hash_str, sizeof(hash_str), "%016" PRIx64, hash);
+    fs::path cache_file = fs::path(cache_dir) / hash_str;
+    if (!fs::exists(cache_file)) {
+        return false;
+    }
+    std::ifstream ifs(cache_file, std::ios::binary);
+    ifs.seekg(0, std::ios::end);
+    size_t size = ifs.tellg();
+    ifs.seekg(0, std::ios::beg);
+    data.resize(size);
+    ifs.read((char *)data.data(), size);
+    return true;
+}
+
+bool rpc_server::set_tensor_hash(const std::vector<uint8_t> & input, rpc_msg_set_tensor_hash_rsp & response)
+{
+    // serialization format: | rpc_tensor | offset (8 bytes) | hash (8 bytes) |
+    if (input.size() != sizeof(rpc_tensor) + 16) {
+        return false;
+    }
+    const rpc_tensor * in_tensor = (const rpc_tensor *)input.data();
+    uint64_t offset;
+    memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset));
+    const uint64_t * hash = (const uint64_t *)(input.data() + sizeof(rpc_tensor) + sizeof(offset));
+    std::vector<uint8_t> cached_file;
+    if (!get_cached_file(*hash, cached_file)) {
+        response.result = 0;
+        return true;
+    }
+    size_t size = cached_file.size();
+    struct ggml_init_params params {
+        /*.mem_size   =*/ ggml_tensor_overhead(),
+        /*.mem_buffer =*/ NULL,
+        /*.no_alloc   =*/ true,
+    };
+    struct ggml_context * ctx = ggml_init(params);
+    ggml_tensor * tensor = deserialize_tensor(ctx, in_tensor);
+    if (tensor == nullptr) {
+        GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__);
+        ggml_free(ctx);
+        return false;
+    }
+    GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n", __func__, (void*)tensor->buffer, tensor->data, offset, size, *hash);
+
+    // sanitize tensor->data
+    {
+        const size_t p0 = (size_t) ggml_backend_buffer_get_base(tensor->buffer);
+        const size_t p1 = p0 + ggml_backend_buffer_get_size(tensor->buffer);
+
+        if (in_tensor->data + offset < p0 || in_tensor->data + offset >= p1 || size > (p1 - in_tensor->data - offset)) {
+            GGML_ABORT("[%s] tensor->data out of bounds\n", __func__);
+        }
+    }
+    ggml_backend_tensor_set(tensor, cached_file.data(), offset, size);
+    response.result = 1;
+    ggml_free(ctx);
+    return true;
+}
+
 bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) {
     struct ggml_init_params params {
         /*.mem_size   =*/ ggml_tensor_overhead(),
@@ -1148,8 +1267,9 @@ rpc_server::~rpc_server() {
     }
 }
 
-static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t free_mem, size_t total_mem) {
-    rpc_server server(backend);
+static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir,
+                             sockfd_t sockfd, size_t free_mem, size_t total_mem) {
+    rpc_server server(backend, cache_dir);
     while (true) {
         uint8_t cmd;
         if (!recv_data(sockfd, &cmd, 1)) {
@@ -1260,6 +1380,20 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre
                 }
                 break;
             }
+            case RPC_CMD_SET_TENSOR_HASH: {
+                std::vector<uint8_t> input;
+                if (!recv_msg(sockfd, input)) {
+                    return;
+                }
+                rpc_msg_set_tensor_hash_rsp response;
+                if (!server.set_tensor_hash(input, response)) {
+                    return;
+                }
+                if (!send_msg(sockfd, &response, sizeof(response))) {
+                    return;
+                }
+                break;
+            }
             case RPC_CMD_INIT_TENSOR: {
                 rpc_msg_init_tensor_req request;
                 if (!recv_msg(sockfd, &request,sizeof(request))) {
@@ -1335,7 +1469,9 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre
     }
 }
 
-void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, size_t free_mem, size_t total_mem) {
+void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint,
+                                   const char * cache_dir,
+                                   size_t free_mem, size_t total_mem) {
     std::string host;
     int port;
     if (!parse_endpoint(endpoint, host, port)) {
@@ -1364,7 +1500,7 @@ void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint
         }
         printf("Accepted client connection, free_mem=%zu, total_mem=%zu\n", free_mem, total_mem);
         fflush(stdout);
-        rpc_serve_client(backend, client_socket->fd, free_mem, total_mem);
+        rpc_serve_client(backend, cache_dir, client_socket->fd, free_mem, total_mem);
         printf("Client connection closed\n");
         fflush(stdout);
     }