int n_threads;
// synchronization primitives
- atomic_int n_active; // num active threads
- atomic_int node_n; // active graph node
- atomic_int node_task; // active graph node task phase
+ atomic_int n_barrier;        // number of threads that have reached the current barrier
+ atomic_int n_barrier_passed; // number of barriers passed so far ("generation" counter)
ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
void* abort_callback_data;
return n_tasks;
}
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
- // wait for other threads to finish
- const int last_node_n = * node_n;
-
- while (true) {
- if (do_yield) {
- sched_yield();
- }
-
- *node_n = atomic_load(&state->shared->node_n);
- if (*node_n != last_node_n) {
- break;
- }
-
-#if defined(__SSE3__)
- // Tell the processor we're spinning. It's a processor hint for spinlocks.
- _mm_pause();
-#endif
+#ifdef GGML_USE_OPENMP
+static void ggml_barrier(struct ggml_compute_state * state) {
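+ // nothing to synchronize when running with a single thread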
+ if (state->shared->n_threads == 1) {
+ return;
}
+
+ #pragma omp barrier
}
+#else
+static void ggml_barrier(struct ggml_compute_state * state) {
+ if (state->shared->n_threads == 1) {
+ return;
+ }
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
- // wait for other threads to finish
- const int last_task_phase = *task_phase;
+ atomic_int * n_barrier = &state->shared->n_barrier;
+ atomic_int * n_barrier_passed = &state->shared->n_barrier_passed;
- while (true) {
- if (do_yield) {
- sched_yield();
- }
+ int n_threads = state->shared->n_threads;
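+ // remember the current barrier "generation"; waiters spin until the last thread increments it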
+ int passed_old = atomic_load(n_barrier_passed);
- *task_phase = atomic_load(&state->shared->node_task);
- if (*task_phase != last_task_phase) {
- break;
+ if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+ // last thread
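+ // reset the arrival counter and bump the generation to release the waiting threads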
+ atomic_store(n_barrier, 0);
+ atomic_fetch_add(n_barrier_passed, 1);
+ } else {
+ // wait for the last thread to bump n_barrier_passed:
+ // spin for a while, then fall back to sched_yield()
+ const int n_spin_before_sleep = 100000;
+ while (true) {
+ for (int i = 0; i < n_spin_before_sleep; i++) {
+ if (atomic_load(n_barrier_passed) != passed_old) {
+ return;
+ }
+ #if defined(__SSE3__)
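+ // tell the processor we're spinning (hint for spin-wait loops)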
+ _mm_pause();
+ #endif
+ }
+ sched_yield();
}
-
-#if defined(__SSE3__)
- // Tell the processor we're spinning. It's a processor hint for spinlocks.
- _mm_pause();
-#endif
}
}
+#endif
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
const struct ggml_cgraph * cgraph = state->shared->cgraph;
const struct ggml_cplan * cplan = state->shared->cplan;
- const int n_threads = state->shared->n_threads;
+ const int ith = state->ith;
+ const int n_threads = state->shared->n_threads;
- set_numa_thread_affinity(state->ith);
+ set_numa_thread_affinity(ith);
- int node_n = -1;
- int task_phase = GGML_TASK_TYPE_FINALIZE;
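+ // the compute params are reused for every node; only .type and .nth change below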
+ struct ggml_compute_params params = {
+ /*.type =*/ GGML_TASK_TYPE_INIT,
+ /*.ith =*/ ith,
+ /*.nth =*/ state->shared->n_threads,
+ /*.wsize =*/ cplan->work_size,
+ /*.wdata =*/ cplan->work_data,
+ };
- while (true) {
+ for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
- state->shared->node_n += 1;
state->ec = GGML_STATUS_ABORTED;
return 0;
}
- if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
- // all other threads are finished and spinning
- // do finalize and init here so we don't have synchronize again
- struct ggml_compute_params params = {
- /*.type =*/ GGML_TASK_TYPE_FINALIZE,
- /*.ith =*/ 0,
- /*.nth =*/ 0,
- /*.wsize =*/ cplan->work_size,
- /*.wdata =*/ cplan->work_data,
- };
-
- if (node_n != -1) {
- /* FINALIZE */
- struct ggml_tensor * node = cgraph->nodes[node_n];
- if (GGML_OP_HAS_FINALIZE[node->op]) {
- params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
- ggml_compute_forward(&params, node, state);
- }
- ggml_graph_compute_perf_stats_node(node, state->shared);
- }
-
- // distribute new work or execute it direct if 1T
- while (++node_n < cgraph->n_nodes) {
- GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
- struct ggml_tensor * node = cgraph->nodes[node_n];
- const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
- state->shared->perf_node_start_cycles = ggml_perf_cycles();
- state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
- params.nth = n_tasks;
-
- if (n_tasks == 1) {
- /* INIT */
- if (GGML_OP_HAS_INIT[node->op]) {
- params.type = GGML_TASK_TYPE_INIT;
- ggml_compute_forward(&params, node, state);
- }
-
- // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
- // they do something more efficient than spinning (?)
- params.type = GGML_TASK_TYPE_COMPUTE;
- ggml_compute_forward(&params, node, state);
-
- if (GGML_OP_HAS_FINALIZE[node->op]) {
- params.type = GGML_TASK_TYPE_FINALIZE;
- ggml_compute_forward(&params, node, state);
- }
-
- ggml_graph_compute_perf_stats_node(node, state->shared);
- } else {
- break;
- }
-
- if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
- break;
- }
- }
-
- task_phase = GGML_TASK_TYPE_INIT;
- atomic_store(&state->shared->n_active, n_threads);
- atomic_store(&state->shared->node_n, node_n);
- atomic_store(&state->shared->node_task, task_phase);
- } else {
- ggml_graph_compute_thread_sync_node(&node_n, state, false);
- ggml_graph_compute_thread_sync_task(&task_phase, state, false);
- }
-
- // check if we should stop
- if (node_n >= cgraph->n_nodes) break;
-
- /* INIT & COMPUTE */
struct ggml_tensor * node = cgraph->nodes[node_n];
const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
- struct ggml_compute_params params = {
- /*.type =*/ GGML_TASK_TYPE_INIT,
- /*.ith =*/ state->ith,
- /*.nth =*/ n_tasks,
- /*.wsize =*/ cplan->work_size,
- /*.wdata =*/ cplan->work_data,
- };
+ params.nth = n_tasks;
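+ // n_tasks can be smaller than n_threads; the extra threads only participate in the barriers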
- if (state->ith < n_tasks) {
- if (GGML_OP_HAS_INIT[node->op]) {
+ /* INIT */
+ if (GGML_OP_HAS_INIT[node->op]) {
+ if (ith < n_tasks) {
+ params.type = GGML_TASK_TYPE_INIT;
ggml_compute_forward(&params, node, state);
}
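+ // make sure all threads have finished INIT before any of them starts COMPUTE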
+ ggml_barrier(state);
}
- if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
- task_phase = GGML_TASK_TYPE_COMPUTE;
- atomic_store(&state->shared->n_active, n_threads);
- atomic_store(&state->shared->node_task, task_phase);
- }
- else {
- // TODO: this sched_yield can have significant impact on the performance - either positive or negative
- // depending on the workload and the operating system.
- // since it is not clear what is the best approach, it should potentially become user-configurable
- // ref: https://github.com/ggerganov/ggml/issues/291
- // UPD: adding the do_yield flag seems to resolve the issue universally
- const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
- ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
- }
-
- if (state->ith < n_tasks) {
+ /* COMPUTE */
+ if (ith < n_tasks) {
params.type = GGML_TASK_TYPE_COMPUTE;
ggml_compute_forward(&params, node, state);
}
- if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
- task_phase = GGML_TASK_TYPE_FINALIZE;
- atomic_store(&state->shared->n_active, n_threads);
- atomic_store(&state->shared->node_task, task_phase);
- }
- else {
- ggml_graph_compute_thread_sync_task(&task_phase, state, false);
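+ // wait for all threads to finish COMPUTE before moving on to FINALIZE / the next node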
+ ggml_barrier(state);
+
+ /* FINALIZE */
+ if (GGML_OP_HAS_FINALIZE[node->op]) {
+ if (params.ith == 0) {
+ params.type = GGML_TASK_TYPE_FINALIZE;
+ ggml_compute_forward(&params, node, state);
+ }
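+ // the other threads wait here until thread 0 has finished FINALIZE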
+ ggml_barrier(state);
}
}
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
workers[0].shared->n_threads = n_threads;
- workers[0].shared->n_active = n_threads;
}
ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
}
/*.perf_node_start_cycles =*/ 0,
/*.perf_node_start_time_us =*/ 0,
/*.n_threads =*/ n_threads,
- /*.n_active =*/ n_threads,
- /*.node_n =*/ -1,
- /*.node_task =*/ GGML_TASK_TYPE_FINALIZE,
+ /*.n_barrier =*/ 0,
+ /*.n_barrier_passed =*/ 0,
/*.abort_callback =*/ NULL,
/*.abort_callback_data =*/ NULL,
/*.current_chunk =*/ 0,