// these are atomic as an annotation for thread-sanitizer
atomic_bool stop; // Used for stopping the threadpool altogether
atomic_bool pause; // Used for pausing the threadpool or individual threads
+ atomic_bool abort; // Used for aborting processing of a graph
struct ggml_compute_state * workers; // per thread state
int n_threads_max; // number of threads in the pool
- int n_threads_cur; // number of threads used in the current graph
+ atomic_int n_threads_cur; // number of threads used in the current graph
int32_t prio; // Scheduling priority
uint32_t poll; // Polling level (0 - no polling)
};
-#ifdef GGML_USE_OPENMP
-static void ggml_barrier(struct ggml_threadpool * threadpool) {
- if (threadpool->n_threads_cur == 1) {
+static void ggml_barrier(struct ggml_threadpool * tp) {
+ int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+ if (n_threads == 1) {
return;
}
+#ifdef GGML_USE_OPENMP
#pragma omp barrier
-}
#else
-static void ggml_barrier(struct ggml_threadpool * threadpool) {
- if (threadpool->n_threads_cur == 1) {
- return;
- }
-
- atomic_int * n_barrier = &threadpool->n_barrier;
- atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
+ int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
- int n_threads = threadpool->n_threads_cur;
- int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
+ // enter barrier (full seq-cst fence)
+ int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst);
- if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+ int last = 0;
+ if (n_barrier == (n_threads - 1)) {
// last thread
- atomic_store(n_barrier, 0);
- atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
+ atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
+ last = 1;
} else {
// wait for other threads
- while (true) {
- if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
- return;
- }
+ while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
ggml_thread_cpu_relax();
}
}
-}
+
+ // exit barrier (full seq-cst fence)
+ atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
#endif
+}
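For context, the refactored `ggml_barrier()` above is a counting barrier with a generation counter: each arrival bumps `n_barrier`, the last arrival resets it and bumps the `n_barrier_passed` generation, and every thread issues a seq-cst RMW on the way out so that writes made before the barrier are visible after it. Below is a minimal standalone sketch of the same technique, not the actual ggml implementation; the names (`spin_barrier`, `barrier_wait`, `worker`) are illustrative, and it assumes C11 `<stdatomic.h>` plus raw pthreads instead of ggml's thread wrappers:

```c
#include <stdatomic.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

enum { N_THREADS = 4, N_ROUNDS = 1000 };

typedef struct {
    atomic_int n_barrier;        // number of threads that have arrived
    atomic_int n_barrier_passed; // generation counter, bumped once per barrier
} spin_barrier;

static spin_barrier bar = { 0, 0 };
static int shared_value = 0; // plain non-atomic int; the barrier orders access to it

static void barrier_wait(spin_barrier * b, int n_threads) {
    int passed_old = atomic_load_explicit(&b->n_barrier_passed, memory_order_relaxed);

    // enter barrier (the seq-cst RMW doubles as a full fence)
    int n_barrier = atomic_fetch_add_explicit(&b->n_barrier, 1, memory_order_seq_cst);

    int last = 0;
    if (n_barrier == n_threads - 1) {
        // last thread: reset the arrival count for the next barrier
        atomic_store_explicit(&b->n_barrier, 0, memory_order_relaxed);
        last = 1;
    } else {
        // wait for the generation counter to change
        while (atomic_load_explicit(&b->n_barrier_passed, memory_order_relaxed) == passed_old) {
            // a cpu-relax hint would go here
        }
    }

    // exit barrier (full fence); only the last thread actually bumps the
    // generation, the waiters add 0 purely for the seq-cst ordering
    atomic_fetch_add_explicit(&b->n_barrier_passed, last, memory_order_seq_cst);
}

static void * worker(void * arg) {
    int ith = (int)(intptr_t) arg;
    for (int r = 0; r < N_ROUNDS; r++) {
        if (ith == 0) { shared_value = r + 1; } // written before the barrier ...
        barrier_wait(&bar, N_THREADS);
        if (shared_value != r + 1) {            // ... must be visible after it
            fprintf(stderr, "barrier violation at round %d\n", r);
        }
        barrier_wait(&bar, N_THREADS);          // keep rounds from overlapping
    }
    return NULL;
}

int main(void) {
    pthread_t th[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) { pthread_create(&th[i], NULL, worker, (void *)(intptr_t) i); }
    for (int i = 0; i < N_THREADS; i++) { pthread_join(th[i], NULL); }
    printf("ok\n");
    return 0;
}
```

The waiter-side `fetch_add` of 0 is the same dummy-RMW trick used elsewhere in this patch: it acts as a full fence but, unlike a free-standing `atomic_thread_fence(seq_cst)`, it is modeled correctly by thread-sanitizer.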
// TODO: make this somehow automatically executed
// some sort of "sentry" mechanism
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
+ struct ggml_threadpool * tp = state->threadpool;
- const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
- const struct ggml_cplan * cplan = state->threadpool->cplan;
+ const struct ggml_cgraph * cgraph = tp->cgraph;
+ const struct ggml_cplan * cplan = tp->cplan;
set_numa_thread_affinity(state->ith);
struct ggml_compute_params params = {
/*.ith =*/ state->ith,
- /*.nth =*/ state->threadpool->n_threads_cur,
+ /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
- /*.threadpool=*/ state->threadpool,
+ /*.threadpool=*/ tp,
};
- for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
+ for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
struct ggml_tensor * node = cgraph->nodes[node_n];
ggml_compute_forward(&params, node);
- if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
- state->threadpool->ec = GGML_STATUS_ABORTED;
+ if (state->ith == 0 && cplan->abort_callback &&
+ cplan->abort_callback(cplan->abort_callback_data)) {
+ // every thread observes the flag after the barrier below and exits the loop in lockstep
+ tp->abort = true;
+ tp->ec = GGML_STATUS_ABORTED;
}
ggml_barrier(state->threadpool);
-
- if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
- break;
- }
}
return 0;
#ifndef GGML_USE_OPENMP
-static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
+// check if thread is active
+static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
+ struct ggml_threadpool * threadpool = state->threadpool;
+ int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
+ return (state->ith < n_threads);
+}
+
+// check if thread is ready to proceed (exit from polling or sleeping)
+static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
// check for new graph/work
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
if (new_graph != state->last_graph) {
- state->pending = (state->ith < threadpool->n_threads_cur);
+ state->pending = ggml_graph_compute_thread_active(state);
state->last_graph = new_graph;
}
return state->pending;
}
+// sync thread state after polling
+static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
+ struct ggml_threadpool * threadpool = state->threadpool;
+ // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
+ // so instead we just use a dummy read-modify-write
+ atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
+}
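The dummy read-modify-write above stands in for `atomic_thread_fence(memory_order_seq_cst)`. It pairs with the seq-cst increment of `n_graph` in `ggml_graph_compute_kickoff()` further down: the kickoff's relaxed store to `n_threads_cur` is ordered before that increment, so a polling worker that runs `thread_sync` after spotting a new graph is guaranteed to read the fresh thread count. A condensed sketch of just this handshake (`publish`/`observe` are hypothetical names, not ggml functions):

```c
#include <stdatomic.h>

static atomic_int n_threads_cur = 0;
static atomic_int n_graph       = 0;

// producer side (cf. ggml_graph_compute_kickoff): the relaxed store is
// published by the seq-cst RMW that follows it in program order
static void publish(int n_threads) {
    atomic_store_explicit(&n_threads_cur, n_threads, memory_order_relaxed);
    atomic_fetch_add_explicit(&n_graph, 1, memory_order_seq_cst);
}

// consumer side (cf. the poll loop + thread_sync): poll with relaxed loads,
// then issue the dummy seq-cst RMW before trusting anything written
// before the kickoff
static int observe(int last_graph) {
    while (atomic_load_explicit(&n_graph, memory_order_relaxed) == last_graph) {
        // spin (cpu-relax hint omitted)
    }
    atomic_fetch_add_explicit(&n_graph, 0, memory_order_seq_cst); // "fence"
    return atomic_load_explicit(&n_threads_cur, memory_order_relaxed);
}

int main(void) {
    publish(4);
    return observe(0) == 4 ? 0 : 1; // single-threaded smoke test
}
```

The consumer's RMW reads from the producer's increment and therefore synchronizes with it, which is what carries the relaxed `n_threads_cur` store across threads.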
+
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
+ // Skip polling for unused threads
+ if (!ggml_graph_compute_thread_active(state)) {
+ return state->pending;
+ }
+
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
// Perhaps it can be adjusted dynamically based on load.
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
- for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
+ for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
// No new work. Keep polling.
ggml_thread_cpu_relax();
}
struct ggml_threadpool * threadpool = state->threadpool;
if (ggml_graph_compute_poll_for_work(state)) {
+ ggml_graph_compute_thread_sync(state);
return state->pending;
}
ggml_mutex_lock_shared(&threadpool->mutex);
- while (!ggml_graph_compute_ready(state)) {
+ while (!ggml_graph_compute_thread_ready(state)) {
// No new work. Wait for the signal.
- GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
+ GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
}
ggml_mutex_unlock_shared(&threadpool->mutex);
}
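For reference, the poll/wait logic here follows a common hybrid pattern: spin for a bounded number of rounds using cheap relaxed loads, then fall back to sleeping on a condition variable, rechecking the predicate under the mutex so a concurrent kickoff cannot slip in between the last check and the wait. A self-contained sketch under those assumptions (`work_queue` and the `wq_*` names are hypothetical; pthreads plus C11 atomics):

```c
#include <stdatomic.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <sched.h>

typedef struct {
    atomic_int      n_graph; // bumped once per new work item
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
} work_queue;

static work_queue q = { 0, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

// worker side: poll briefly with relaxed loads, then sleep on the condvar
static int wq_wait(work_queue * wq, int last_graph, uint64_t n_rounds) {
    for (uint64_t i = 0; i < n_rounds; i++) {
        if (atomic_load_explicit(&wq->n_graph, memory_order_relaxed) != last_graph) {
            goto sync; // new work spotted while polling
        }
        sched_yield(); // stand-in for a cpu-relax hint
    }
    // sleeping phase: recheck the predicate under the mutex so a kickoff
    // between the last poll and the wait cannot be missed
    pthread_mutex_lock(&wq->mutex);
    while (atomic_load_explicit(&wq->n_graph, memory_order_relaxed) == last_graph) {
        pthread_cond_wait(&wq->cond, &wq->mutex);
    }
    pthread_mutex_unlock(&wq->mutex);
sync:
    // dummy RMW as a full fence (same trick as thread_sync above)
    atomic_fetch_add_explicit(&wq->n_graph, 0, memory_order_seq_cst);
    return atomic_load_explicit(&wq->n_graph, memory_order_relaxed);
}

// producer side: publish under the mutex because the workers do a hybrid
// poll/wait and the producer cannot know which phase a given worker is in
static void wq_kickoff(work_queue * wq) {
    pthread_mutex_lock(&wq->mutex);
    atomic_fetch_add_explicit(&wq->n_graph, 1, memory_order_seq_cst);
    pthread_cond_broadcast(&wq->cond);
    pthread_mutex_unlock(&wq->mutex);
}

int main(void) {
    wq_kickoff(&q);
    printf("saw graph %d\n", wq_wait(&q, 0, 16)); // exits via the polling path
    return 0;
}
```

This is also why `ggml_graph_compute_kickoff()` below always takes the mutex even when polling is enabled.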
// Start processing new graph
-static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
+static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
{
- // always take the mutex here because the worker threads are doing hybrid poll/wait
+ // Always take the mutex here because the worker threads are doing hybrid poll/wait
ggml_mutex_lock(&threadpool->mutex);
- atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+ GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+
+ // Update the number of active threads
+ atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+
+ // Indicate the graph is ready to be processed
+ // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
if (threadpool->pause) {
// Update main thread prio and affinity to match the threadpool settings
threadpool->current_chunk = 0;
threadpool->stop = false;
threadpool->pause = tpp->paused;
+ threadpool->abort = false;
threadpool->workers = NULL;
threadpool->n_threads_max = tpp->n_threads;
threadpool->n_threads_cur = tpp->n_threads;
// No worker threads should be accessing the parameters below at this stage
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
- threadpool->n_threads_cur = n_threads;
threadpool->current_chunk = 0;
+ threadpool->abort = false;
threadpool->ec = GGML_STATUS_SUCCESS;
}
- if (n_threads > threadpool->n_threads_max) {
- GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
- }
-
#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
#pragma omp parallel num_threads(n_threads)
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
- threadpool->n_threads_cur = n_threads;
+ atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
}
ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
ggml_graph_compute_thread(&threadpool->workers[0]);
}
#else
+ if (n_threads > threadpool->n_threads_max) {
+ GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
+ n_threads = threadpool->n_threads_max;
+ }
+
// Kick all threads to start the new graph
- ggml_graph_compute_kickoff(threadpool);
+ ggml_graph_compute_kickoff(threadpool, n_threads);
// This is a work thread too
ggml_graph_compute_thread(&threadpool->workers[0]);
--- /dev/null
+#include "ggml.h"
+#include "ggml-backend.h"
+
+#include <chrono>
+#include <iostream>
+#include <cstdio>
+#include <cstdlib>
+#include <cassert>
+#include <vector>
+
+#define MAX_NARGS 2
+
+int main(int argc, char *argv[]) {
+
+ int n_threads = 4;
+ int n_rounds = 100;
+
+ if (argc > 1) {
+ n_threads = std::atoi(argv[1]);
+ }
+
+ if (argc > 2) {
+ n_rounds = std::atoi(argv[2]);
+ }
+
+ struct ggml_init_params params = {
+ /* .mem_size = */ 1024*1024*1024,
+ /* .mem_buffer = */ NULL,
+ /* .no_alloc = */ false,
+ };
+
+ struct ggml_context * ctx = ggml_init(params);
+
+ // Create graph
+ struct ggml_cgraph * gf = ggml_new_graph(ctx);
+
+ // Lots of small, parallel ops where barriers in between will dominate
+ struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
+ for (int i = 0; i < 1000; i++) {
+ struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+ out = ggml_mul_mat(ctx, a, out);
+
+ struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+ out = ggml_mul_mat(ctx, d, out);
+ }
+
+ ggml_build_forward_expand(gf, out);
+ int n_nodes = ggml_graph_n_nodes(gf);
+
+ // Create threadpool
+ struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
+ struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+ if (!threadpool) {
+ fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+ exit(1);
+ }
+
+ // Create compute plan
+ struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);
+
+ std::vector<uint8_t> work_data(cplan.work_size);
+ cplan.work_data = work_data.data();
+
+ std::cerr << "graph-compute with"
+ << "\n n_threads: " << n_threads
+ << "\n n_nodes: " << n_nodes
+ << "\n n_rounds: " << n_rounds
+ << "\n";
+ // ggml_graph_print(gf);
+
+ // Warmup
+ ggml_graph_compute(gf, &cplan);
+
+ auto t0 = std::chrono::high_resolution_clock::now();
+
+ for (int i=0; i < n_rounds; i++) {
+ ggml_graph_compute(gf, &cplan);
+ }
+
+ auto t1 = std::chrono::high_resolution_clock::now();
+
+ auto usec = std::chrono::duration_cast<std::chrono::microseconds>(t1-t0).count();
+ auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(t1-t0).count();
+ std::cerr << "graph-compute took " << usec << " usec "
+ << "\n " << (float) usec / n_rounds << " usec per-iter"
+ << "\n " << (float) nsec / (n_rounds * n_nodes) << " nsec per-node"
+ << "\n";
+
+ ggml_threadpool_free(threadpool);
+ ggml_free(ctx);
+
+ return 0;
+}
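For scale: the loop above creates 2000 small `mul_mat` nodes, so something like `./test-barrier 8 200` (assuming the file is built into a standalone `test-barrier` binary; the actual target name depends on the build setup) runs 8 threads over 200 rounds, and the reported per-node time is dominated by `ggml_barrier()` overhead, which is exactly what the changes above target.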