]> git.djapps.eu Git - pkg/ggml/sources/llama.cpp/commitdiff
ggml : synchronize threads using barriers (#7993)
authorslaren <redacted>
Wed, 19 Jun 2024 13:04:15 +0000 (15:04 +0200)
committerGitHub <redacted>
Wed, 19 Jun 2024 13:04:15 +0000 (15:04 +0200)
.github/workflows/server.yml
ggml.c

index 1fee9ac28194383aa61241f50c41d1e7055b60ed..6155e94156e4271665812c0767019ff1d95c6b84 100644 (file)
@@ -87,8 +87,22 @@ jobs:
             exit 1
           fi
 
+      - name: Build (no OpenMP)
+        id: cmake_build_no_openmp
+        if: ${{ matrix.sanitizer == 'THREAD' }}
+        run: |
+          cmake -B build \
+              -DLLAMA_NATIVE=OFF \
+              -DLLAMA_BUILD_SERVER=ON \
+              -DLLAMA_CURL=ON \
+              -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
+              -DLLAMA_SANITIZE_${{ matrix.sanitizer }}=ON \
+              -DLLAMA_OPENMP=OFF ;
+          cmake --build build --config ${{ matrix.build_type }} -j $(nproc) --target llama-server
+
       - name: Build
         id: cmake_build
+        if: ${{ matrix.sanitizer != 'THREAD' }}
         run: |
           cmake -B build \
               -DLLAMA_NATIVE=OFF \
diff --git a/ggml.c b/ggml.c
index d5d33c2ba10299253ec9b731a24ccb0d32952f32..778ca3fdf1f8f8b864208640b0dfd146f20c3953 100644 (file)
--- a/ggml.c
+++ b/ggml.c
@@ -1753,9 +1753,8 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    atomic_int n_active;  // num active threads
-    atomic_int node_n;    // active graph node
-    atomic_int node_task; // active graph node task phase
+    atomic_int n_barrier;
+    atomic_int n_barrier_passed;
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void* abort_callback_data;
@@ -18972,47 +18971,49 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_node_n = * node_n;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *node_n = atomic_load(&state->shared->node_n);
-        if (*node_n != last_node_n) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning.  It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
+#ifdef GGML_USE_OPENMP
+static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
     }
+
+    #pragma omp barrier
 }
+#else
+static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
+    }
 
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_task_phase = *task_phase;
+    atomic_int * n_barrier = &state->shared->n_barrier;
+    atomic_int * n_barrier_passed = &state->shared->n_barrier_passed;
 
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
+    int n_threads = state->shared->n_threads;
+    int passed_old = atomic_load(n_barrier_passed);
 
-        *task_phase = atomic_load(&state->shared->node_task);
-        if (*task_phase != last_task_phase) {
-            break;
+    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+        // last thread
+        atomic_store(n_barrier, 0);
+        atomic_fetch_add(n_barrier_passed, 1);
+    } else {
+        // wait for other threads
+        //while (atomic_load(n_barrier_passed) == passed_old) {
+        //}
+        const int n_spin_before_sleep = 100000;
+        while (true) {
+            for (int i = 0; i < n_spin_before_sleep; i++) {
+                if (atomic_load(n_barrier_passed) != passed_old) {
+                    return;
+                }
+            #if defined(__SSE3__)
+                _mm_pause();
+            #endif
+            }
+            sched_yield();
         }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning.  It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
     }
 }
+#endif
 
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
@@ -19020,136 +19021,54 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
     const struct ggml_cgraph * cgraph = state->shared->cgraph;
     const struct ggml_cplan  * cplan  = state->shared->cplan;
 
-    const int   n_threads   = state->shared->n_threads;
+    const int ith       = state->ith;
+    const int n_threads = state->shared->n_threads;
 
-    set_numa_thread_affinity(state->ith);
+    set_numa_thread_affinity(ith);
 
-    int node_n     = -1;
-    int task_phase = GGML_TASK_TYPE_FINALIZE;
+    struct ggml_compute_params params = {
+        /*.type  =*/ GGML_TASK_TYPE_INIT,
+        /*.ith   =*/ ith,
+        /*.nth   =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+    };
 
-    while (true) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->shared->node_n += 1;
             state->ec = GGML_STATUS_ABORTED;
             return 0;
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            // all other threads are finished and spinning
-            // do finalize and init here so we don't have synchronize again
-            struct ggml_compute_params params = {
-                /*.type  =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith   =*/ 0,
-                /*.nth   =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
-            };
-
-            if (node_n != -1) {
-                /* FINALIZE */
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                if (GGML_OP_HAS_FINALIZE[node->op]) {
-                    params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-                    ggml_compute_forward(&params, node, state);
-                }
-                ggml_graph_compute_perf_stats_node(node, state->shared);
-            }
-
-            // distribute new work or execute it direct if 1T
-            while (++node_n < cgraph->n_nodes) {
-                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
-                state->shared->perf_node_start_cycles  = ggml_perf_cycles();
-                state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
-                params.nth = n_tasks;
-
-                if (n_tasks == 1) {
-                    /* INIT */
-                    if (GGML_OP_HAS_INIT[node->op]) {
-                        params.type = GGML_TASK_TYPE_INIT;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
-                    // they do something more efficient than spinning (?)
-                    params.type = GGML_TASK_TYPE_COMPUTE;
-                    ggml_compute_forward(&params, node, state);
-
-                    if (GGML_OP_HAS_FINALIZE[node->op]) {
-                        params.type = GGML_TASK_TYPE_FINALIZE;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    ggml_graph_compute_perf_stats_node(node, state->shared);
-                } else {
-                    break;
-                }
-
-                if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-                    break;
-                }
-            }
-
-            task_phase = GGML_TASK_TYPE_INIT;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_n,    node_n);
-            atomic_store(&state->shared->node_task, task_phase);
-        } else {
-            ggml_graph_compute_thread_sync_node(&node_n,     state, false);
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
-
-        // check if we should stop
-        if (node_n >= cgraph->n_nodes) break;
-
-        /* INIT & COMPUTE */
         struct ggml_tensor * node = cgraph->nodes[node_n];
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
 
-        struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_TYPE_INIT,
-            /*.ith   =*/ state->ith,
-            /*.nth   =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
-        };
+        params.nth = n_tasks;
 
-        if (state->ith < n_tasks) {
-            if (GGML_OP_HAS_INIT[node->op]) {
+        /* INIT */
+        if (GGML_OP_HAS_INIT[node->op]) {
+            if (ith < n_tasks) {
+                params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
+            ggml_barrier(state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_COMPUTE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            // TODO: this sched_yield can have significant impact on the performance - either positive or negative
-            //       depending on the workload and the operating system.
-            //       since it is not clear what is the best approach, it should potentially become user-configurable
-            //       ref: https://github.com/ggerganov/ggml/issues/291
-            // UPD:  adding the do_yield flag seems to resolve the issue universally
-            const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
-            ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
-        }
-
-        if (state->ith < n_tasks) {
+        /* COMPUTE */
+        if (ith < n_tasks) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_FINALIZE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
+        ggml_barrier(state);
+
+        /* FINALIZE */
+        if (GGML_OP_HAS_FINALIZE[node->op]) {
+            if (params.ith == 0) {
+                params.type = GGML_TASK_TYPE_FINALIZE;
+                ggml_compute_forward(&params, node, state);
+            }
+            ggml_barrier(state);
         }
     }
 
@@ -19336,7 +19255,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state *
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
                 workers[0].shared->n_threads = n_threads;
-                workers[0].shared->n_active  = n_threads;
             }
             ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
         }
@@ -19399,9 +19317,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles  =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads               =*/ n_threads,
-        /*.n_active                =*/ n_threads,
-        /*.node_n                  =*/ -1,
-        /*.node_task               =*/ GGML_TASK_TYPE_FINALIZE,
+        /*.n_barrier               =*/ 0,
+        /*.n_barrier_passed        =*/ 0,
         /*.abort_callback          =*/ NULL,
         /*.abort_callback_data     =*/ NULL,
         /*.current_chunk;          =*/ 0,