]> git.djapps.eu Git - pkg/ggml/sources/llama.cpp/commitdiff
threadpool : skip polling for unused threads (#9461)
authorMax Krasnyansky <redacted>
Tue, 17 Sep 2024 08:19:46 +0000 (01:19 -0700)
committerGitHub <redacted>
Tue, 17 Sep 2024 08:19:46 +0000 (11:19 +0300)
* threadpool: skip polling for unused threads

Currently all threads do N polling rounds even if only 1 thread is active (n_threads_cur == 1).
This commit adds a check to skip the polling for unused threads (ith >= n_threads_cur).

n_threads_cur is now an atomic_int to explicitly tell thread sanitizer that it is written
from one thread and read from other threads (not a race conditions).

* threadpool: further simplify and improve ggml_barrier

Avoid using strict memory order while polling, yet make sure that all threads go through
full memory barrier (memory fence) on ggml_barrier entrace and exit.

* threads: add simple barrier test

This test does lots of small, parallel matmul ops where the barriers in between dominate the overhead.

* threadpool: improve thread sync for new-graphs

Using the same tricks as ggml_barrier. All the polling is done with relaxed memory order
to keep it efficient, once the new graph is detected we do full fence using read-modify-write
with strict memory order.

* threadpool: improve abort handling

Do not use threadpool->ec (exit code) to decide whether to exit the compute loop.
threadpool->ec is not atomic which makes thread-sanitizer rightfully unhappy about it.

Instead introduce atomic threadpool->abort flag used for this. This is consistent with
how we handle threadpool->stop or pause.

While at it add an explicit atomic_load for n_threads_cur for consistency.

* test-barrier: release threadpool before releasing the context

fixes use-after-free detected by gcc thread-sanitizer on x86-64
for some reason llvm sanitizer is not detecting this issue.

ggml/src/ggml.c
tests/CMakeLists.txt
tests/test-barrier.cpp [new file with mode: 0644]

index 3a8aadae85a78473f6d75962d85993bbe24eb1d6..bccb62377d7394c1a2fc813627019de3812f023f 100644 (file)
@@ -2013,10 +2013,11 @@ struct ggml_threadpool {
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop;         // Used for stopping the threadpool altogether
     atomic_bool pause;        // Used for pausing the threadpool or individual threads
+    atomic_bool abort;        // Used for aborting processing of a graph
 
     struct ggml_compute_state * workers;   // per thread state
     int          n_threads_max; // number of threads in the pool
-    int          n_threads_cur; // number of threads used in the current graph
+    atomic_int   n_threads_cur; // number of threads used in the current graph
 
     int32_t      prio;        // Scheduling priority
     uint32_t     poll;        // Polling level (0 - no polling)
@@ -3178,41 +3179,36 @@ inline static void ggml_critical_section_start(void) {
     }
 }
 
-#ifdef GGML_USE_OPENMP
-static void ggml_barrier(struct ggml_threadpool * threadpool) {
-    if (threadpool->n_threads_cur == 1) {
+static void ggml_barrier(struct ggml_threadpool * tp) {
+    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+    if (n_threads == 1) {
         return;
     }
 
+#ifdef GGML_USE_OPENMP
     #pragma omp barrier
-}
 #else
-static void ggml_barrier(struct ggml_threadpool * threadpool) {
-    if (threadpool->n_threads_cur == 1) {
-        return;
-    }
-
-    atomic_int * n_barrier = &threadpool->n_barrier;
-    atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
+    int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
 
-    int n_threads = threadpool->n_threads_cur;
-    int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
+    // enter barrier (full seq-cst fence)
+    int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst);
 
-    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+    int last = 0;
+    if (n_barrier == (n_threads - 1)) {
         // last thread
-        atomic_store(n_barrier, 0);
-        atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
+        atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
+        last = 1;
     } else {
         // wait for other threads
-        while (true) {
-            if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
-                return;
-            }
+        while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
             ggml_thread_cpu_relax();
         }
     }
-}
+
+    // exit barrier (full seq-cst fence)
+    atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
 #endif
+}
 
 // TODO: make this somehow automatically executed
 //       some sort of "sentry" mechanism
@@ -19933,34 +19929,33 @@ struct ggml_cplan ggml_graph_plan(
 
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
+    struct ggml_threadpool    * tp    = state->threadpool;
 
-    const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
-    const struct ggml_cplan  * cplan  = state->threadpool->cplan;
+    const struct ggml_cgraph * cgraph = tp->cgraph;
+    const struct ggml_cplan  * cplan  = tp->cplan;
 
     set_numa_thread_affinity(state->ith);
 
     struct ggml_compute_params params = {
         /*.ith       =*/ state->ith,
-        /*.nth       =*/ state->threadpool->n_threads_cur,
+        /*.nth       =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
         /*.wsize     =*/ cplan->work_size,
         /*.wdata     =*/ cplan->work_data,
-        /*.threadpool=*/ state->threadpool,
+        /*.threadpool=*/ tp,
     };
 
-    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
+    for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];
 
         ggml_compute_forward(&params, node);
 
-        if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->threadpool->ec = GGML_STATUS_ABORTED;
+        if (state->ith == 0 && cplan->abort_callback &&
+                cplan->abort_callback(cplan->abort_callback_data)) {
+            tp->abort = true;
+            tp->ec    = GGML_STATUS_ABORTED;
         }
 
         ggml_barrier(state->threadpool);
-
-        if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
-            break;
-        }
     }
 
     return 0;
@@ -19968,7 +19963,15 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
 #ifndef GGML_USE_OPENMP
 
-static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
+// check if thread is active
+static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
+    struct ggml_threadpool * threadpool = state->threadpool;
+    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
+    return (state->ith < n_threads);
+}
+
+// check if thread is ready to proceed (exit from polling or sleeping)
+static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
@@ -19976,21 +19979,34 @@ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
     // check for new graph/work
     int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
     if (new_graph != state->last_graph) {
-        state->pending    = (state->ith < threadpool->n_threads_cur);
+        state->pending    = ggml_graph_compute_thread_active(state);
         state->last_graph = new_graph;
     }
 
     return state->pending;
 }
 
+// sync thread state after polling
+static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
+    struct ggml_threadpool * threadpool = state->threadpool;
+    // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
+    // so instead we just use a dummy read-modify-write
+    atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
+}
+
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
+    // Skip polling for unused threads
+    if (!ggml_graph_compute_thread_active(state)) {
+        return state->pending;
+    }
+
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
 
-    for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
+    for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
         // No new work. Keep polling.
         ggml_thread_cpu_relax();
     }
@@ -20002,13 +20018,14 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (ggml_graph_compute_poll_for_work(state)) {
+        ggml_graph_compute_thread_sync(state);
         return state->pending;
     }
 
     ggml_mutex_lock_shared(&threadpool->mutex);
-    while (!ggml_graph_compute_ready(state)) {
+    while (!ggml_graph_compute_thread_ready(state)) {
         // No new work. Wait for the signal.
-        GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
+        GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
         ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
     }
     ggml_mutex_unlock_shared(&threadpool->mutex);
@@ -20055,13 +20072,20 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
 }
 
 // Start processing new graph
-static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
+static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
 {
-    // always take the mutex here because the worker threads are doing hybrid poll/wait
+    // Always take the mutex here because the worker threads are doing hybrid poll/wait
 
     ggml_mutex_lock(&threadpool->mutex);
 
-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+
+    // Update the number of active threads
+    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+
+    // Indicate the graph is ready to be processed
+    // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
+    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
 
     if (threadpool->pause) {
        // Update main thread prio and affinity to match the threadpool settings
@@ -20120,6 +20144,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
         threadpool->current_chunk    = 0;
         threadpool->stop             = false;
         threadpool->pause            = tpp->paused;
+        threadpool->abort            = false;
         threadpool->workers          = NULL;
         threadpool->n_threads_max    = tpp->n_threads;
         threadpool->n_threads_cur    = tpp->n_threads;
@@ -20195,15 +20220,11 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         // No worker threads should be accessing the parameters below at this stage
         threadpool->cgraph           = cgraph;
         threadpool->cplan            = cplan;
-        threadpool->n_threads_cur    = n_threads;
         threadpool->current_chunk    = 0;
+        threadpool->abort            = false;
         threadpool->ec               = GGML_STATUS_SUCCESS;
     }
 
-    if (n_threads > threadpool->n_threads_max) {
-        GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
-    }
-
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
         #pragma omp parallel num_threads(n_threads)
@@ -20212,7 +20233,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             {
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
-                threadpool->n_threads_cur = n_threads;
+                atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
             }
 
             ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20221,8 +20242,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         ggml_graph_compute_thread(&threadpool->workers[0]);
     }
 #else
+    if (n_threads > threadpool->n_threads_max) {
+        GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
+        n_threads = threadpool->n_threads_max;
+    }
+
     // Kick all threads to start the new graph
-    ggml_graph_compute_kickoff(threadpool);
+    ggml_graph_compute_kickoff(threadpool, n_threads);
 
     // This is a work thread too
     ggml_graph_compute_thread(&threadpool->workers[0]);
index 7dcd3fce827bd6f542a627b1fbbfa690190d045e..08ad66b49fdd411b14f533816352db5b47b44c52 100644 (file)
@@ -119,6 +119,7 @@ llama_target_and_test(test-grammar-parser.cpp)
 llama_target_and_test(test-llama-grammar.cpp)
 llama_target_and_test(test-grammar-integration.cpp)
 llama_target_and_test(test-grad0.cpp)
+llama_target_and_test(test-barrier.cpp)
 # llama_target_and_test(test-opt.cpp) # SLOW
 llama_target_and_test(test-backend-ops.cpp)
 
diff --git a/tests/test-barrier.cpp b/tests/test-barrier.cpp
new file mode 100644 (file)
index 0000000..cf54237
--- /dev/null
@@ -0,0 +1,93 @@
+#include "ggml.h"
+#include "ggml-backend.h"
+
+#include <chrono>
+#include <iostream>
+#include <cstdio>
+#include <cstdlib>
+#include <cassert>
+#include <vector>
+
+#define MAX_NARGS 2
+
+int main(int argc, char *argv[]) {
+
+    int n_threads = 4;
+    int n_rounds  = 100;
+
+    if (argc > 1) {
+        n_threads = std::atoi(argv[1]);
+    }
+
+    if (argc > 2) {
+        n_rounds  = std::atoi(argv[2]);
+    }
+
+    struct ggml_init_params params = {
+        /* .mem_size   = */ 1024*1024*1024,
+        /* .mem_buffer = */ NULL,
+        /* .no_alloc   = */ false,
+    };
+
+    struct ggml_context * ctx = ggml_init(params);
+
+    // Create graph
+    struct ggml_cgraph * gf = ggml_new_graph(ctx);
+
+    // Lots of small, parallel ops where barriers in between will dominate
+    struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32,  64);
+    for (int i = 0; i < 1000; i++) {
+        struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+        out = ggml_mul_mat(ctx, a, out);
+
+        struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+        out = ggml_mul_mat(ctx, d, out);
+    }
+
+    ggml_build_forward_expand(gf, out);
+    int n_nodes = ggml_graph_n_nodes(gf);
+
+    // Create threadpool
+    struct ggml_threadpool_params tpp  = ggml_threadpool_params_default(n_threads);
+    struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+    if (!threadpool) {
+        fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+        exit(1);
+    }
+
+    // Create compute plan
+    struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);
+
+    std::vector<uint8_t> work_data(cplan.work_size);
+    cplan.work_data = work_data.data();
+
+    std::cerr << "graph-compute with"
+              << "\n n_threads: " << n_threads
+              << "\n   n_nodes: " << n_nodes
+              << "\n  n_rounds: " << n_rounds
+              << "\n";
+    // ggml_graph_print(gf);
+
+    // Warmup
+    ggml_graph_compute(gf, &cplan);
+
+    auto t0 = std::chrono::high_resolution_clock::now();
+
+    for (int i=0; i < n_rounds; i++) {
+        ggml_graph_compute(gf, &cplan);
+    }
+
+    auto t1 = std::chrono::high_resolution_clock::now();
+
+    auto usec = std::chrono::duration_cast<std::chrono::microseconds>(t1-t0).count();
+    auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(t1-t0).count();
+    std::cerr << "graph-compute took " << usec << " usec "
+              << "\n " << (float) usec / n_rounds << " usec per-iter"
+              << "\n " << (float) nsec / (n_rounds * n_nodes) << " nsec per-node"
+              << "\n";
+
+    ggml_threadpool_free(threadpool);
+    ggml_free(ctx);
+
+    return 0;
+}