typedef pthread_t ggml_thread_t;
+#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
+#define GGML_THREADPOOL_N_THREADS_BITS (16)
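// Illustration only (not part of this patch): the two macros above split the single
// atomic n_graph value into a graph counter (high bits) and the number of active
// threads (low 16 bits). Hypothetical helpers showing the encoding:
static inline int ggml_threadpool_packed_n_threads(int n_graph) {
    // low 16 bits: number of threads participating in the current graph
    return n_graph & GGML_THREADPOOL_N_THREADS_MASK;
}
static inline int ggml_threadpool_packed_graph_count(int n_graph) {
    // high bits: incremented once per graph to signal new work
    return n_graph >> GGML_THREADPOOL_N_THREADS_BITS;
}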
+
#if defined(__APPLE__)
#include <unistd.h>
#include <mach/mach.h>
struct ggml_cplan * cplan;
// synchronization primitives
- atomic_int n_graph; // incremented when there is work to be done (i.e each graph)
+ atomic_int n_graph; // updated when there is work to be done (i.e. each graph); packs the graph count and the number of active threads
atomic_int GGML_CACHE_ALIGN n_barrier;
atomic_int GGML_CACHE_ALIGN n_barrier_passed;
atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
// these are atomic as an annotation for thread-sanitizer
atomic_bool stop; // Used for stopping the threadpool altogether
atomic_bool pause; // Used for pausing the threadpool or individual threads
- atomic_int abort; // Used for aborting processing of a graph
+ atomic_int abort; // Used for aborting processing of a graph
struct ggml_compute_state * workers; // per thread state
- int n_threads_max; // number of threads in the pool
- atomic_int n_threads_cur; // number of threads used in the current graph
-
+ int n_threads; // Number of threads in the pool
int32_t prio; // Scheduling priority
uint32_t poll; // Polling level (0 - no polling)
static struct ggml_state g_state = {0};
void ggml_barrier(struct ggml_threadpool * tp) {
- int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+ int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
if (n_threads == 1) {
return;
}
// last thread
atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
- // exit barrier (fill seq-cst fence)
+ // exit barrier (full seq-cst fence)
atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
return;
}
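// Sketch for context (simplified, not the exact ggml_barrier body): the barrier counts
// arrivals in n_barrier; the last thread to arrive resets the counter and bumps
// n_barrier_passed, which the other threads spin on. Roughly:
//
//   int passed_before = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
//   if (atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst) == n_threads - 1) {
//       atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);             // last thread resets
//       atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);  // releases the rest
//   } else {
//       while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == passed_before) {
//           // spin until the last thread passes the barrier
//       }
//   }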
void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
if (!threadpool) return;
- const int n_threads = threadpool->n_threads_max;
+ const int n_threads = threadpool->n_threads;
#ifndef GGML_USE_OPENMP
struct ggml_compute_state* workers = threadpool->workers;
//GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
}
if (n_threads <= 0) {
- n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
+ n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
}
#if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__)
struct ggml_compute_params params = {
/*.ith =*/ state->ith,
- /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
+ /*.nth =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
/*.wsize =*/ cplan->work_size,
/*.wdata =*/ cplan->work_data,
/*.threadpool=*/ tp,
};
+ GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
struct ggml_tensor * node = cgraph->nodes[node_n];
}
}
+ GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
ggml_barrier(state->threadpool);
return 0;
#ifndef GGML_USE_OPENMP
-// check if thread is active
-static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
- struct ggml_threadpool * threadpool = state->threadpool;
- int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
- return (state->ith < n_threads);
-}
-
// check if thread is ready to proceed (exit from polling or sleeping)
+// returns true if the polling/sleeping loop should exit (stop, pause, or new work); sets state->pending when this thread has work in the new graph
static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
// check for new graph/work
- int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
- if (new_graph != state->last_graph) {
- state->pending = ggml_graph_compute_thread_active(state);
- state->last_graph = new_graph;
+ int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+ int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
+ if (n_graph != state->last_graph) {
+ state->pending = (state->ith < n_threads);
+ state->last_graph = n_graph;
+ return true;
}
- return state->pending;
+ return false;
}
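// Example of the intended behavior (illustration only): with a pool of 8 workers and a
// graph kicked off with n_threads = 4 packed into n_graph, workers 0..3 see
// ith < n_threads and set pending = true; workers 4..7 still record the new last_graph
// but keep pending == false and stay idle for this graph.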
// sync thread state after polling
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
struct ggml_threadpool * threadpool = state->threadpool;
- // Skip polling for unused threads
- if (!ggml_graph_compute_thread_active(state)) {
- return state->pending;
- }
-
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
// Perhaps, we can adjust it dynamically based on load and things.
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
ggml_graph_compute_check_for_work(state);
if (state->pending) {
state->pending = false;
-
ggml_graph_compute_thread(state);
}
}
ggml_mutex_lock(&threadpool->mutex);
- GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+ // Update the number of active threads and the graph count
+ int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
+ n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
- // Update the number of active threads
- atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+ GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);
// Indicate the graph is ready to be processed
// We need the full seq-cst fence here because of the polling threads (used in thread_sync)
- atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
+ atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);
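// Worked example (illustration only): if the previous n_graph was 0x00050004
// (graph count 5, 4 active threads) and the new graph uses 8 threads, the update above
// stores ((5 + 1) << 16) | 8 == 0x00060008, i.e. graph count 6 in the high bits and
// 8 active threads in the low 16 bits.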
if (threadpool->pause) {
// Update main thread prio and affinity to match the threadpool settings
threadpool->pause = tpp->paused;
threadpool->abort = -1;
threadpool->workers = NULL;
- threadpool->n_threads_max = tpp->n_threads;
- threadpool->n_threads_cur = tpp->n_threads;
+ threadpool->n_threads = tpp->n_threads;
threadpool->poll = tpp->poll;
threadpool->prio = tpp->prio;
threadpool->ec = GGML_STATUS_SUCCESS;
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
- atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+ atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
}
// Apply thread CPU mask and priority
ggml_graph_compute_thread(&threadpool->workers[ith]);
}
} else {
- atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
+ atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
ggml_graph_compute_thread(&threadpool->workers[0]);
}
#else
- if (n_threads > threadpool->n_threads_max) {
- GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
- n_threads = threadpool->n_threads_max;
+ if (n_threads > threadpool->n_threads) {
+ GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
+ n_threads = threadpool->n_threads;
}
// Kick all threads to start the new graph
#define MAX_NARGS 2
-int main(int argc, char *argv[]) {
-
- int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
- int n_rounds = 100;
-
- if (argc > 1) {
- n_threads = std::atoi(argv[1]);
- }
-
- if (argc > 2) {
- n_rounds = std::atoi(argv[2]);
- }
-
+static void test_barrier(int n_threads, int n_rounds) {
struct ggml_init_params params = {
/* .mem_size = */ 1024*1024*1024,
/* .mem_buffer = */ NULL,
exit(1);
}
- // Create compute plan
+ // The test runs with a constant number of threads
struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);
std::vector<uint8_t> work_data(cplan.work_size);
ggml_threadpool_free(threadpool);
ggml_free(ctx);
+}
+
+static void test_active(int n_threads, int n_rounds) {
+ struct ggml_init_params params = {
+ /* .mem_size = */ 1024*1024*1024,
+ /* .mem_buffer = */ NULL,
+ /* .no_alloc = */ false,
+ };
+
+ struct ggml_context * ctx = ggml_init(params);
+
+ // Create graph
+ struct ggml_cgraph * gf = ggml_new_graph(ctx);
+
+ // Small graph with parallel ops with barriers
+ struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
+ for (int i = 0; i < 2; i++) {
+ struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+ out = ggml_mul_mat(ctx, a, out);
+
+ struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+ out = ggml_mul_mat(ctx, d, out);
+ }
+
+ ggml_build_forward_expand(gf, out);
+ int n_nodes = ggml_graph_n_nodes(gf);
+
+ // Create threadpool
+ struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
+ struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+ if (!threadpool) {
+ fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+ exit(1);
+ }
+
+ std::cerr << "graph-compute with"
+ << "\n n_threads: " << n_threads
+ << "\n n_nodes: " << n_nodes
+ << "\n n_rounds: " << n_rounds
+ << "\n";
+ // ggml_graph_print(gf);
+
+ // In this test we keep changing the number of threads every 4th iteration
+ // to test for race conditions in that path
+
+ for (int i=0; i < n_rounds; i++) {
+ struct ggml_cplan cplan = ggml_graph_plan(gf, (i % 4) == 0 ? 1 : n_threads, threadpool);
+
+ std::vector<uint8_t> work_data(cplan.work_size);
+ cplan.work_data = work_data.data();
+
+ ggml_graph_compute(gf, &cplan);
+ }
+
+ ggml_threadpool_free(threadpool);
+ ggml_free(ctx);
+}
+
+static void test_multi_graph(int n_threads, int n_rounds) {
+ struct ggml_init_params params = {
+ /* .mem_size = */ 1024*1024*1024,
+ /* .mem_buffer = */ NULL,
+ /* .no_alloc = */ false,
+ };
+
+ struct ggml_context * ctx = ggml_init(params);
+
+ // Create graphs
+ struct ggml_cgraph * gf0 = ggml_new_graph(ctx);
+ {
+ // Small graph with parallel ops with barriers
+ struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
+ for (int i = 0; i < 2; i++) {
+ struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+ out = ggml_mul_mat(ctx, a, out);
+
+ struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+ out = ggml_mul_mat(ctx, d, out);
+ }
+
+ ggml_build_forward_expand(gf0, out);
+ }
+
+ struct ggml_cgraph * gf1 = ggml_new_graph(ctx);
+ {
+ // Small graph with parallel ops with barriers
+ // Use larger tensors to make sure work_data size is larger than gf0
+ struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 256);
+ for (int i = 0; i < 4; i++) {
+ struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 256, 128);
+ out = ggml_mul_mat(ctx, a, out);
+
+ struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 256);
+ out = ggml_mul_mat(ctx, d, out);
+ }
+
+ ggml_build_forward_expand(gf1, out);
+ }
+
+
+ // Create threadpool
+ struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
+ struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+ if (!threadpool) {
+ fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+ exit(1);
+ }
+
+ std::cerr << "graph-compute with"
+ << "\n gf0 n_nodes: " << ggml_graph_n_nodes(gf0)
+ << "\n gf1 n_nodes: " << ggml_graph_n_nodes(gf1)
+ << "\n n_threads: " << n_threads
+ << "\n n_rounds: " << n_rounds
+ << "\n";
+
+ // In this test we keep changing the number of threads every 4th iteration
+ // and we compute two graphs back to back to test frequent graph switching
+
+ for (int i=0; i < n_rounds; i++) {
+ struct ggml_cplan cplan0 = ggml_graph_plan(gf0, (i % 4) == 0 ? 1 : n_threads, threadpool);
+ std::vector<uint8_t> work_data0(cplan0.work_size);
+ cplan0.work_data = work_data0.data();
+
+ struct ggml_cplan cplan1 = ggml_graph_plan(gf1, (i % 4) == 0 ? 1 : n_threads, threadpool);
+ std::vector<uint8_t> work_data1(cplan1.work_size);
+ cplan1.work_data = work_data1.data();
+
+ ggml_graph_compute(gf0, &cplan0);
+ ggml_graph_compute(gf1, &cplan1);
+ }
+
+ ggml_threadpool_free(threadpool);
+ ggml_free(ctx);
+}
+
+
+int main(int argc, char *argv[]) {
+
+ int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
+ int n_rounds = 100;
+
+ if (argc > 1) {
+ n_threads = std::atoi(argv[1]);
+ }
+
+ if (argc > 2) {
+ n_rounds = std::atoi(argv[2]);
+ }
+
+ test_barrier(n_threads, n_rounds);
+
+ test_active(n_threads, n_rounds * 100);
+
+ test_multi_graph(n_threads, n_rounds * 10);
return 0;
}