#endif
#include <windows.h>
+#if !defined(__clang__)
typedef volatile LONG atomic_int;
typedef atomic_int atomic_bool;
typedef atomic_int atomic_flag;
#define ATOMIC_FLAG_INIT 0
+typedef enum {
+ memory_order_relaxed,
+ memory_order_consume,
+ memory_order_acquire,
+ memory_order_release,
+ memory_order_acq_rel,
+ memory_order_seq_cst
+} memory_order;
+
static void atomic_store(atomic_int * ptr, LONG val) {
InterlockedExchange(ptr, val);
}
+static void atomic_store_explicit(atomic_int * ptr, LONG val, memory_order mo) {
+ // TODO: add support for explicit memory order
+ InterlockedExchange(ptr, val);
+}
static LONG atomic_load(atomic_int * ptr) {
return InterlockedCompareExchange(ptr, 0, 0);
}
+static LONG atomic_load_explicit(atomic_int * ptr, memory_order mo) {
+ // TODO: add support for explicit memory order
+ return InterlockedCompareExchange(ptr, 0, 0);
+}
static LONG atomic_fetch_add(atomic_int * ptr, LONG inc) {
return InterlockedExchangeAdd(ptr, inc);
}
static LONG atomic_fetch_sub(atomic_int * ptr, LONG dec) {
 return atomic_fetch_add(ptr, -(dec));
}
+static LONG atomic_fetch_add_explicit(atomic_int * ptr, LONG inc, memory_order mo) {
+ // TODO: add support for explicit memory order
+ return InterlockedExchangeAdd(ptr, inc);
+}
static atomic_bool atomic_flag_test_and_set(atomic_flag * ptr) {
 return InterlockedExchange(ptr, 1);
}
static void atomic_flag_clear(atomic_flag * ptr) {
InterlockedExchange(ptr, 0);
}
+#else // clang
+#include <stdatomic.h>
+#endif
typedef HANDLE pthread_t;
return 0;
}
#else
+
#include <pthread.h>
#include <stdatomic.h>
+#include <sched.h>
typedef void * thread_ret_t;
struct ggml_context context;
};
-struct ggml_compute_state_shared {
- const struct ggml_cgraph * cgraph;
- const struct ggml_cplan * cplan;
- int n_threads;
+//
+// Threading defs
+//
+
+typedef pthread_t ggml_thread_t;
+
+#if defined(_WIN32)
+
+typedef CONDITION_VARIABLE ggml_cond_t;
+typedef SRWLOCK ggml_mutex_t;
+
+#define ggml_mutex_init(m) InitializeSRWLock(m)
+#define ggml_mutex_destroy(m)
+#define ggml_mutex_lock(m) AcquireSRWLockExclusive(m)
+#define ggml_mutex_unlock(m) ReleaseSRWLockExclusive(m)
+#define ggml_mutex_lock_shared(m) AcquireSRWLockShared(m)
+#define ggml_mutex_unlock_shared(m) ReleaseSRWLockShared(m)
+
+#define ggml_cond_init(c) InitializeConditionVariable(c)
+#define ggml_cond_destroy(c)
+#define ggml_cond_wait(c, m) SleepConditionVariableSRW(c, m, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED)
+#define ggml_cond_broadcast(c) WakeAllConditionVariable(c)
+
+#define ggml_thread_create pthread_create
+#define ggml_thread_join pthread_join
+
+#else
+
+typedef pthread_cond_t ggml_cond_t;
+typedef pthread_mutex_t ggml_mutex_t;
+
+#define ggml_mutex_init(m) pthread_mutex_init(m, NULL)
+#define ggml_mutex_destroy(m) pthread_mutex_destroy(m)
+#define ggml_mutex_lock(m) pthread_mutex_lock(m)
+#define ggml_mutex_unlock(m) pthread_mutex_unlock(m)
+#define ggml_mutex_lock_shared(m) pthread_mutex_lock(m)
+#define ggml_mutex_unlock_shared(m) pthread_mutex_unlock(m)
+
+#define ggml_lock_init(x) UNUSED(x)
+#define ggml_lock_destroy(x) UNUSED(x)
+#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64))
+#define ggml_lock_lock(x) _mm_pause()
+#else
+#define ggml_lock_lock(x) UNUSED(x)
+#endif
+#define ggml_lock_unlock(x) UNUSED(x)
+
+#define GGML_LOCK_INITIALIZER 0
+#define ggml_cond_init(c) pthread_cond_init(c, NULL)
+#define ggml_cond_destroy(c) pthread_cond_destroy(c)
+#define ggml_cond_wait(c, m) pthread_cond_wait(c, m)
+#define ggml_cond_broadcast(c) pthread_cond_broadcast(c)
+
+#define ggml_thread_create pthread_create
+#define ggml_thread_join pthread_join
+
+#endif
+
+// Threadpool def
+struct ggml_threadpool {
+ ggml_mutex_t mutex; // mutex for cond.var
+ ggml_cond_t cond; // cond.var for waiting for new work
+
+ struct ggml_cgraph * cgraph;
+ struct ggml_cplan * cplan;
// synchronization primitives
+ atomic_int n_graph; // incremented when there is work to be done (i.e each graph)
atomic_int n_barrier;
atomic_int n_barrier_passed;
+ atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
- ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
- void * abort_callback_data;
+ // these are atomic as an annotation for thread-sanitizer
+ atomic_bool stop; // Used for stopping the threadpool altogether
+ atomic_bool pause; // Used for pausing the threadpool or individual threads
- atomic_int current_chunk; // currently processing chunk during mul_mat, shared between all the threads
+ struct ggml_compute_state * workers; // per thread state
+ int n_threads_max; // number of threads in the pool
+ int n_threads_cur; // number of threads used in the current graph
+
+ int32_t prio; // Scheduling priority
+ uint32_t poll; // Polling level (0 - no polling)
enum ggml_status ec;
};
+// Per-thread state
struct ggml_compute_state {
+#ifndef GGML_USE_OPENMP
ggml_thread_t thrd;
+ bool cpumask[GGML_MAX_N_THREADS];
+ int last_graph;
+ bool pending;
+#endif
+ struct ggml_threadpool * threadpool;
int ith;
- struct ggml_compute_state_shared * shared;
};
struct ggml_compute_params {
size_t wsize;
void * wdata;
- struct ggml_compute_state_shared * shared;
+ struct ggml_threadpool * threadpool;
};
//
static_assert(sizeof(struct ggml_object)%GGML_MEM_ALIGN == 0, "ggml_object size must be a multiple of GGML_MEM_ALIGN");
static_assert(sizeof(struct ggml_tensor)%GGML_MEM_ALIGN == 0, "ggml_tensor size must be a multiple of GGML_MEM_ALIGN");
+// Helpers for polling loops
+#if defined(__aarch64__) && ( defined(__clang__) || defined(__GNUC__) )
+static inline void ggml_thread_cpu_relax(void) {
+ __asm__ volatile("yield" ::: "memory");
+}
+#elif defined(__x86_64__)
+static inline void ggml_thread_cpu_relax(void) {
+ _mm_pause();
+}
+#else
+static inline void ggml_thread_cpu_relax(void) {;}
+#endif
+
//
// NUMA support
//
}
#ifdef GGML_USE_OPENMP
-static void ggml_barrier(struct ggml_compute_state_shared * shared) {
- if (shared->n_threads == 1) {
+static void ggml_barrier(struct ggml_threadpool * threadpool) {
+ if (threadpool->n_threads_cur == 1) {
return;
}
#pragma omp barrier
}
#else
-static void ggml_barrier(struct ggml_compute_state_shared * shared) {
- if (shared->n_threads == 1) {
+static void ggml_barrier(struct ggml_threadpool * threadpool) {
+ if (threadpool->n_threads_cur == 1) {
return;
}
- atomic_int * n_barrier = &shared->n_barrier;
- atomic_int * n_barrier_passed = &shared->n_barrier_passed;
+ atomic_int * n_barrier = &threadpool->n_barrier;
+ atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
- int n_threads = shared->n_threads;
- int passed_old = atomic_load(n_barrier_passed);
+ int n_threads = threadpool->n_threads_cur;
+ int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
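+
+ // Arrival/generation scheme: each thread increments n_barrier on arrival; the last arrival
+ // resets n_barrier and bumps n_barrier_passed, which releases the other threads from the
+ // spin loop below.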
if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
// last thread
atomic_store(n_barrier, 0);
- atomic_fetch_add(n_barrier_passed, 1);
+ atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
} else {
// wait for other threads
- const int n_spin_before_sleep = 100000;
while (true) {
- for (int i = 0; i < n_spin_before_sleep; i++) {
- if (atomic_load(n_barrier_passed) != passed_old) {
- return;
- }
- #if defined(__SSE3__)
- _mm_pause();
- #endif
+ if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
+ return;
}
- sched_yield();
+ ggml_thread_cpu_relax();
}
}
}
((char *) src0->data),
ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
}
const int ith = params->ith;
if (ith == 0) {
// Every thread starts at ith, so the first unprocessed chunk is nth. This saves a bit of coordination right at the start.
- atomic_store(&params->shared->current_chunk, nth);
+ atomic_store_explicit(&params->threadpool->current_chunk, nth, memory_order_relaxed);
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
#if GGML_USE_LLAMAFILE
if (src1->type != vec_dot_type) {
break;
}
- current_chunk = atomic_fetch_add(&params->shared->current_chunk, 1);
+ current_chunk = atomic_fetch_add_explicit(&params->threadpool->current_chunk, 1, memory_order_relaxed);
}
}
}
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
// compute each matrix multiplication in sequence
for (int cur_a = 0; cur_a < n_as; ++cur_a) {
if (ith == 0) {
ggml_vec_set_f32(ne0*ne1*ne2*ne3, dst->data, 0);
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
// dst[:,:,:,:] = 0
// for i2,i3:
if (ith == 0) {
ggml_vec_set_f32(ne0*ne1*ne2*ne3, dst->data, 0);
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
// parallelize by last three dimensions
((char *) src0->data),
ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
}
const int ith = params->ith;
((char *) src0->data),
ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
}
// TODO: handle transposed/permuted matrices
// need to zero dst since we are accumulating into it
memset(dst->data, 0, ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
const int32_t s0 = ((const int32_t*)(dst->op_params))[0];
// need to zero dst since we are accumulating into it
memset(dst->data, 0, ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
const int32_t s0 = ((const int32_t*)(dst->op_params))[0];
memset(dst->data, 0, ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
const int32_t stride = ggml_get_op_params_i32(dst, 0);
if (ith == 0) {
memset(dst->data, 0, nb0*ne0*ne1*ne2*ne3);
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
const int64_t elem_q = ggml_nelements(q);
const int64_t elem_k = ggml_nelements(k);
if (params->ith == 0) {
memcpy((char *) dst->data, (char *) src0->data, ggml_nbytes(dst));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
}
// ref: https://github.com/facebookresearch/segment-anything/blob/main/segment_anything/modeling/image_encoder.py#L357-L359
if (ith == 0) {
memset(sums, 0, sizeof(float) * (nth + nth * nc));
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
// rows per thread
const int dr = (nr + nth - 1)/nth;
}
#endif
}
- ggml_barrier(params->shared);
+ ggml_barrier(params->threadpool);
if (ith == 0) {
float * dp = (float *) dst->data;
ggml_hash_set_reset(&cgraph->visited_hash_set);
}
-//
-// thread data
-//
-// synchronization is done via busy loops
-// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops
-//
-
-#ifdef __APPLE__
-
-//#include <os/lock.h>
-//
-//typedef os_unfair_lock ggml_lock_t;
-//
-//#define ggml_lock_init(x) UNUSED(x)
-//#define ggml_lock_destroy(x) UNUSED(x)
-//#define ggml_lock_lock os_unfair_lock_lock
-//#define ggml_lock_unlock os_unfair_lock_unlock
-//
-//#define GGML_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT
-
-typedef int ggml_lock_t;
-
-#define ggml_lock_init(x) UNUSED(x)
-#define ggml_lock_destroy(x) UNUSED(x)
-#define ggml_lock_lock(x) UNUSED(x)
-#define ggml_lock_unlock(x) UNUSED(x)
-
-#define GGML_LOCK_INITIALIZER 0
-
-#define ggml_thread_create pthread_create
-#define ggml_thread_join pthread_join
-
-#else
-
-//typedef pthread_spinlock_t ggml_lock_t;
-
-//#define ggml_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE)
-//#define ggml_lock_destroy pthread_spin_destroy
-//#define ggml_lock_lock pthread_spin_lock
-//#define ggml_lock_unlock pthread_spin_unlock
-
-typedef int ggml_lock_t;
-
-#define ggml_lock_init(x) UNUSED(x)
-#define ggml_lock_destroy(x) UNUSED(x)
-#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64))
-#define ggml_lock_lock(x) _mm_pause()
-#else
-#define ggml_lock_lock(x) UNUSED(x)
-#endif
-#define ggml_lock_unlock(x) UNUSED(x)
-
-#define GGML_LOCK_INITIALIZER 0
-
-#define ggml_thread_create pthread_create
-#define ggml_thread_join pthread_join
-
-#endif
-
// Android's libc implementation "bionic" does not support setting affinity
#if defined(__gnu_linux__)
static void set_numa_thread_affinity(int thread_n) {
return n_tasks;
}
-struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) {
+static thread_ret_t ggml_graph_compute_secondary_thread(void* data);
+
+#if defined(_WIN32)
+#include "windows.h"
+
+// TODO: support > 64 CPUs
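+// Packs the first 64 entries of the boolean cpumask into a 64-bit Windows affinity bitmask;
+// e.g. (illustrative) mask[0] = mask[2] = true with the rest false yields bitmask 0x5.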
+static bool ggml_thread_apply_affinity(const bool * mask) {
+ HANDLE h = GetCurrentThread();
+ uint64_t bitmask = 0ULL;
+
+ assert(GGML_MAX_N_THREADS >= 64);
+
+ for (int32_t i = 0; i < 8; i++) {
+ int32_t idx = i * 8;
+ uint8_t val = 0;
+ val |= mask[idx + 0] << 0;
+ val |= mask[idx + 1] << 1;
+ val |= mask[idx + 2] << 2;
+ val |= mask[idx + 3] << 3;
+ val |= mask[idx + 4] << 4;
+ val |= mask[idx + 5] << 5;
+ val |= mask[idx + 6] << 6;
+ val |= mask[idx + 7] << 7;
+ bitmask |= (uint64_t)val << idx;
+ }
+
+ for (int32_t i = 64; i < GGML_MAX_N_THREADS; i++) {
+ if (mask[i]) {
+ fprintf(stderr, "warn: setting thread-affinity for > 64 CPUs isn't supported on windows!\n");
+ break;
+ }
+ }
+
+ DWORD_PTR m = (DWORD_PTR)bitmask;
+
+ m = SetThreadAffinityMask(h, m);
+
+ return m != 0;
+}
+
+static bool ggml_thread_apply_priority(int32_t prio) {
+ // Note that on Windows the Process Priority Class must be updated in order to set Thread priority.
+ // This is up to the application.
+ DWORD p = THREAD_PRIORITY_NORMAL;
+ switch (prio) {
+ case GGML_SCHED_PRIO_NORMAL: p = THREAD_PRIORITY_NORMAL; break;
+ case GGML_SCHED_PRIO_MEDIUM: p = THREAD_PRIORITY_ABOVE_NORMAL; break;
+ case GGML_SCHED_PRIO_HIGH: p = THREAD_PRIORITY_HIGHEST; break;
+ case GGML_SCHED_PRIO_REALTIME: p = THREAD_PRIORITY_TIME_CRITICAL; break;
+ }
+
+ if (prio == GGML_SCHED_PRIO_NORMAL) {
+ // Keep inherited policy/priority
+ return true;
+ }
+
+ if (!SetThreadPriority(GetCurrentThread(), p)) {
+ fprintf(stderr, "warn: failed to set thread priority %d : (%d)\n", prio, (int) GetLastError());
+ return false;
+ }
+
+ return true;
+}
+
+#elif defined(__APPLE__)
+#include <sys/types.h>
+#include <sys/resource.h>
+
+static bool ggml_thread_apply_affinity(const bool * mask) {
+ // Not supported on Apple platforms
+ UNUSED(mask);
+ return true;
+}
+
+static bool ggml_thread_apply_priority(int32_t prio) {
+ struct sched_param p;
+ int32_t policy = SCHED_OTHER;
+ switch (prio) {
+ case GGML_SCHED_PRIO_NORMAL: policy = SCHED_OTHER; p.sched_priority = 0; break;
+ case GGML_SCHED_PRIO_MEDIUM: policy = SCHED_FIFO; p.sched_priority = 40; break;
+ case GGML_SCHED_PRIO_HIGH: policy = SCHED_FIFO; p.sched_priority = 80; break;
+ case GGML_SCHED_PRIO_REALTIME: policy = SCHED_FIFO; p.sched_priority = 90; break;
+ }
+
+ if (prio == GGML_SCHED_PRIO_NORMAL) {
+ // Keep inherited policy/priority
+ return true;
+ }
+
+ int32_t err = pthread_setschedparam(pthread_self(), policy, &p);
+ if (err != 0) {
+ fprintf(stderr, "warn: failed to set thread priority %d : %s (%d)\n", prio, strerror(err), err);
+ return false;
+ }
+
+ return true;
+}
+
+#else // posix?
+
+static bool ggml_thread_apply_affinity(const bool * mask) {
+ cpu_set_t cpuset;
+ int err;
+
+ CPU_ZERO(&cpuset);
+
+ for (uint32_t i = 0; i < GGML_MAX_N_THREADS; i++) {
+ if (mask[i]) {
+ GGML_PRINT_DEBUG("Thread %lx: adding %d to cpuset\n", pthread_self(), i);
+ CPU_SET(i, &cpuset);
+ }
+ }
+
+#ifdef __ANDROID__
+ err = sched_setaffinity(0, sizeof(cpuset), &cpuset);
+ if (err < 0) {
+ err = errno;
+ }
+#else
+ err = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
+#endif
+ if (err != 0) {
+ fprintf(stderr, "warn: failed to set affinity mask 0x%llx : %s (%d)\n", (unsigned long long)mask, strerror(err), err);
+ return false;
+ }
+
+ return true;
+}
+
+static bool ggml_thread_apply_priority(int32_t prio) {
+ struct sched_param p;
+ int32_t policy = SCHED_OTHER;
+ switch (prio) {
+ case GGML_SCHED_PRIO_NORMAL: policy = SCHED_OTHER; p.sched_priority = 0; break;
+ case GGML_SCHED_PRIO_MEDIUM: policy = SCHED_FIFO; p.sched_priority = 40; break;
+ case GGML_SCHED_PRIO_HIGH: policy = SCHED_FIFO; p.sched_priority = 80; break;
+ case GGML_SCHED_PRIO_REALTIME: policy = SCHED_FIFO; p.sched_priority = 90; break;
+ }
+
+ if (prio == GGML_SCHED_PRIO_NORMAL) {
+ // Keep inherited policy/priority
+ return true;
+ }
+
+ int32_t err = pthread_setschedparam(pthread_self(), policy, &p);
+ if (err != 0) {
+ fprintf(stderr, "warn: failed to set thread priority %d : %s (%d)\n", prio, strerror(err), err);
+ return false;
+ }
+
+ return true;
+}
+
+#endif
+
+static bool ggml_thread_cpumask_is_valid(const bool * mask) {
+ for (int i = 0; i < GGML_MAX_N_THREADS; i++) {
+ if (mask[i]) { return true; }
+ }
+ return false;
+}
+
+static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask, bool strict, int32_t* iter) {
+ if (!strict) {
+ memcpy(local_mask, global_mask, GGML_MAX_N_THREADS);
+ return;
+ } else {
+ memset(local_mask, 0, GGML_MAX_N_THREADS);
+ int32_t base_idx = *iter;
+ for (int32_t i = 0; i < GGML_MAX_N_THREADS; i++) {
+ int32_t idx = base_idx + i;
+ if (idx >= GGML_MAX_N_THREADS) {
+ // Just a cheaper modulo
+ idx -= GGML_MAX_N_THREADS;
+ }
+ if (global_mask[idx]) {
+ local_mask[idx] = 1;
+ *iter = idx + 1;
+ return;
+ }
+ }
+ }
+}
+
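+// Illustrative sketch (hypothetical values): with strict placement the helper walks the
+// global mask round-robin, handing each caller the next enabled CPU:
+//
+// bool global[GGML_MAX_N_THREADS] = { [0] = true, [1] = true, [3] = true };
+// bool local [GGML_MAX_N_THREADS];
+// int32_t iter = 0;
+// ggml_thread_cpumask_next(global, local, true, &iter); // local selects CPU 0, iter -> 1
+// ggml_thread_cpumask_next(global, local, true, &iter); // local selects CPU 1, iter -> 2
+// ggml_thread_cpumask_next(global, local, true, &iter); // local selects CPU 3, iter -> 4
+//
+// With strict placement disabled, every call simply copies the full global mask.
+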
+void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
+ if (!threadpool) return;
+
+#ifndef GGML_USE_OPENMP
+ struct ggml_compute_state* workers = threadpool->workers;
+ const int n_threads = threadpool->n_threads_max;
+
+ ggml_mutex_lock(&threadpool->mutex);
+
+ threadpool->stop = true;
+ threadpool->pause = false;
+
+ ggml_cond_broadcast(&threadpool->cond);
+ ggml_mutex_unlock(&threadpool->mutex);
+
+ for (int j = 1; j < n_threads; j++) {
+ int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
+ GGML_ASSERT(rc == GGML_EXIT_SUCCESS || rc == GGML_EXIT_ABORTED);
+ UNUSED(rc);
+ }
+
+ ggml_mutex_destroy(&threadpool->mutex);
+ ggml_cond_destroy(&threadpool->cond);
+#endif // GGML_USE_OPENMP
+
+ GGML_ALIGNED_FREE(threadpool->workers);
+ GGML_ALIGNED_FREE(threadpool);
+}
+
+#ifndef GGML_USE_OPENMP
+// pause/resume must be called under mutex
+static void ggml_threadpool_pause_locked(struct ggml_threadpool * threadpool) {
+ GGML_PRINT_DEBUG("Pausing threadpool\n");
+ threadpool->pause = true;
+ ggml_cond_broadcast(&threadpool->cond);
+}
+
+static void ggml_threadpool_resume_locked(struct ggml_threadpool * threadpool) {
+ GGML_PRINT_DEBUG("Resuming threadpool\n");
+ threadpool->pause = false;
+ ggml_cond_broadcast(&threadpool->cond);
+}
+#endif
+
+void ggml_threadpool_pause(struct ggml_threadpool * threadpool) {
+#ifndef GGML_USE_OPENMP
+ ggml_mutex_lock(&threadpool->mutex);
+ if (!threadpool->pause) {
+ ggml_threadpool_pause_locked(threadpool);
+ }
+ ggml_mutex_unlock(&threadpool->mutex);
+#else
+ UNUSED(threadpool);
+#endif
+}
+
+void ggml_threadpool_resume(struct ggml_threadpool * threadpool) {
+#ifndef GGML_USE_OPENMP
+ ggml_mutex_lock(&threadpool->mutex);
+ if (threadpool->pause) {
+ ggml_threadpool_resume_locked(threadpool);
+ }
+ ggml_mutex_unlock(&threadpool->mutex);
+#else
+ UNUSED(threadpool);
+#endif
+}
+
+struct ggml_cplan ggml_graph_plan(
+ const struct ggml_cgraph * cgraph,
+ int n_threads,
+ struct ggml_threadpool * threadpool) {
+
+ if (threadpool == NULL) {
+ GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
+ }
if (n_threads <= 0) {
- n_threads = GGML_DEFAULT_N_THREADS;
+ n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
}
size_t work_size = 0;
}
if (work_size > 0) {
- work_size += CACHE_LINE_SIZE*(n_threads - 1);
+ work_size += CACHE_LINE_SIZE*(n_threads);
}
- cplan.n_threads = MIN(max_tasks, n_threads);
- cplan.work_size = work_size;
- cplan.work_data = NULL;
+ cplan.threadpool = threadpool;
+ cplan.n_threads = MIN(max_tasks, n_threads);
+ cplan.work_size = work_size;
+ cplan.work_data = NULL;
return cplan;
}
static thread_ret_t ggml_graph_compute_thread(void * data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
- const struct ggml_cgraph * cgraph = state->shared->cgraph;
- const struct ggml_cplan * cplan = state->shared->cplan;
+ const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
+ const struct ggml_cplan * cplan = state->threadpool->cplan;
set_numa_thread_affinity(state->ith);
struct ggml_compute_params params = {
- /*.ith =*/ state->ith,
- /*.nth =*/ state->shared->n_threads,
- /*.wsize =*/ cplan->work_size,
- /*.wdata =*/ cplan->work_data,
- /*.shared=*/ state->shared,
+ /*.ith =*/ state->ith,
+ /*.nth =*/ state->threadpool->n_threads_cur,
+ /*.wsize =*/ cplan->work_size,
+ /*.wdata =*/ cplan->work_data,
+ /*.threadpool=*/ state->threadpool,
};
for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
ggml_compute_forward(¶ms, node);
if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
- state->shared->ec = GGML_STATUS_ABORTED;
+ state->threadpool->ec = GGML_STATUS_ABORTED;
}
- ggml_barrier(state->shared);
+ ggml_barrier(state->threadpool);
- if (state->shared->ec != GGML_STATUS_SUCCESS) {
+ if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
break;
}
}
return 0;
}
+#ifndef GGML_USE_OPENMP
+
+static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
+ struct ggml_threadpool * threadpool = state->threadpool;
+
+ if (state->pending || threadpool->stop || threadpool->pause) { return true; }
+
+ // check for new graph/work
+ int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+ if (new_graph != state->last_graph) {
+ state->pending = (state->ith < threadpool->n_threads_cur);
+ state->last_graph = new_graph;
+ }
+
+ return state->pending;
+}
+
+static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
+ struct ggml_threadpool * threadpool = state->threadpool;
+
+ // This seems to make 0 ... 100 a decent range for the polling level across modern processors.
+ // Perhaps we can adjust it dynamically based on load.
+ const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
+
+ for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
+ // No new work. Keep polling.
+ ggml_thread_cpu_relax();
+ }
+
+ return state->pending;
+}
+
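+// With the default poll = 50 set by ggml_threadpool_params_init below, the polling loop above
+// spins for 1024 * 128 * 50 = 6,553,600 relax iterations before the caller falls back to the
+// cond-var wait in ggml_graph_compute_check_for_work.
+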
+static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
+ struct ggml_threadpool * threadpool = state->threadpool;
+
+ if (ggml_graph_compute_poll_for_work(state)) {
+ return state->pending;
+ }
+
+ ggml_mutex_lock_shared(&threadpool->mutex);
+ while (!ggml_graph_compute_ready(state)) {
+ // No new work. Wait for the signal.
+ GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
+ ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
+ }
+ ggml_mutex_unlock_shared(&threadpool->mutex);
+
+ return state->pending;
+}
+
+static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
+ struct ggml_compute_state * state = (struct ggml_compute_state *) data;
+ struct ggml_threadpool * threadpool = state->threadpool;
+
+ ggml_thread_apply_priority(threadpool->prio);
+ if (ggml_thread_cpumask_is_valid(state->cpumask)) {
+ ggml_thread_apply_affinity(state->cpumask);
+ }
+
+ while (true) {
+ // Check if we need to sleep
+ while (threadpool->pause) {
+ GGML_PRINT_DEBUG("thread #%d inside pause loop\n", state->ith);
+ ggml_mutex_lock_shared(&threadpool->mutex);
+ if (threadpool->pause) {
+ ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
+ }
+ GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
+ ggml_mutex_unlock_shared(&threadpool->mutex);
+ }
+
+ // This needs to be checked after the cond_wait
+ if (threadpool->stop) break;
+
+ // Check if there is new work
+ // The main thread is the only one that can dispatch new work
+
+ ggml_graph_compute_check_for_work(state);
+ if (state->pending) {
+ state->pending = false;
+
+ ggml_graph_compute_thread(state);
+ }
+ }
+
+ return (thread_ret_t) 0;
+}
+
+// Start processing new graph
+static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
+{
+ // always take the mutex here because the worker threads are doing hybrid poll/wait
+
+ ggml_mutex_lock(&threadpool->mutex);
+
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
+
+ if (threadpool->pause) {
+ // Update main thread prio and affinity to match the threadpool settings
+ ggml_thread_apply_priority(threadpool->prio);
+ if (ggml_thread_cpumask_is_valid(threadpool->workers[0].cpumask)) {
+ ggml_thread_apply_affinity(threadpool->workers[0].cpumask);
+ }
+
+ // resume does cond broadcast
+ ggml_threadpool_resume_locked(threadpool);
+ } else {
+ ggml_cond_broadcast(&threadpool->cond);
+ }
+
+ ggml_mutex_unlock(&threadpool->mutex);
+}
+
+#endif // GGML_USE_OPENMP
+
+void ggml_threadpool_params_init(struct ggml_threadpool_params * p, int n_threads) {
+ p->n_threads = n_threads;
+ p->prio = 0; // default priority (usually means normal or inherited)
+ p->poll = 50; // hybrid-polling enabled
+ p->strict_cpu = false; // no strict placement (all threads share same cpumask)
+ p->paused = false; // threads are ready to go
+ memset(p->cpumask, 0, GGML_MAX_N_THREADS); // all-zero means use the default affinity (usually inherited)
+}
+
+struct ggml_threadpool_params ggml_threadpool_params_default(int n_threads) {
+ struct ggml_threadpool_params p;
+ ggml_threadpool_params_init(&p, n_threads);
+ return p;
+}
+
+bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) {
+ if (p0->n_threads != p1->n_threads ) return false;
+ if (p0->prio != p1->prio ) return false;
+ if (p0->poll != p1->poll ) return false;
+ if (p0->strict_cpu != p1->strict_cpu ) return false;
+ return memcmp(p0->cpumask, p1->cpumask, GGML_MAX_N_THREADS) == 0;
+}
+
+static struct ggml_threadpool * ggml_threadpool_new_impl(
+ struct ggml_threadpool_params * tpp,
+ struct ggml_cgraph * cgraph,
+ struct ggml_cplan * cplan) {
+
+ struct ggml_threadpool * threadpool =
+ GGML_ALIGNED_MALLOC(sizeof(struct ggml_threadpool));
+ {
+ threadpool->cgraph = cgraph;
+ threadpool->cplan = cplan;
+ threadpool->n_graph = 0;
+ threadpool->n_barrier = 0;
+ threadpool->n_barrier_passed = 0;
+ threadpool->current_chunk = 0;
+ threadpool->stop = false;
+ threadpool->pause = tpp->paused;
+ threadpool->workers = NULL;
+ threadpool->n_threads_max = tpp->n_threads;
+ threadpool->n_threads_cur = tpp->n_threads;
+ threadpool->poll = tpp->poll;
+ threadpool->prio = tpp->prio;
+ threadpool->ec = GGML_STATUS_SUCCESS;
+ }
+
+ // Allocate and init workers state
+ const size_t workers_size = sizeof(struct ggml_compute_state) * tpp->n_threads;
+ struct ggml_compute_state * workers = GGML_ALIGNED_MALLOC(workers_size);
+
+ memset(workers, 0, workers_size);
+ for (int j = 0; j < tpp->n_threads; j++) {
+ workers[j].threadpool = threadpool;
+ workers[j].ith = j;
+ }
+
+ threadpool->workers = workers;
+
+#ifndef GGML_USE_OPENMP
+ ggml_mutex_init(&threadpool->mutex);
+ ggml_cond_init(&threadpool->cond);
+
+ // Spin up the worker threads and update their CPU placements.
+ // Place the main thread last (towards the higher numbered CPU cores).
+
+ int32_t cpumask_iter = 0;
+
+ for (int j = 1; j < tpp->n_threads; j++) {
+ ggml_thread_cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
+
+ int32_t rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_secondary_thread, &workers[j]);
+ GGML_ASSERT(rc == 0);
+ }
+
+ ggml_thread_cpumask_next(tpp->cpumask, workers[0].cpumask, tpp->strict_cpu, &cpumask_iter);
+
+ if (!threadpool->pause) {
+ // Update main thread prio and affinity at the start, otherwise we'll do it in resume
+ ggml_thread_apply_priority(threadpool->prio);
+ if (ggml_thread_cpumask_is_valid(threadpool->workers[0].cpumask)) {
+ ggml_thread_apply_affinity(threadpool->workers[0].cpumask);
+ }
+ }
+#endif // GGML_USE_OPENMP
+
+ return threadpool;
+}
+
+struct ggml_threadpool * ggml_threadpool_new(struct ggml_threadpool_params * tpp) {
+ return ggml_threadpool_new_impl(tpp, NULL, NULL);
+}
+
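+// Example usage (a minimal sketch; assumes an already-built cgraph and a caller-managed
+// work buffer, error handling omitted):
+//
+// struct ggml_threadpool_params tpp = ggml_threadpool_params_default(8);
+// struct ggml_threadpool * threadpool = ggml_threadpool_new(&tpp);
+//
+// struct ggml_cplan cplan = ggml_graph_plan(cgraph, 8, threadpool);
+// cplan.work_data = malloc(cplan.work_size); // scratch buffer owned by the caller
+// ggml_graph_compute(cgraph, &cplan);
+//
+// free(cplan.work_data);
+// ggml_threadpool_free(threadpool);
+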
enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
GGML_ASSERT(cplan);
GGML_ASSERT(cplan->n_threads > 0);
GGML_ASSERT(cplan->work_size == 0 || cplan->work_data != NULL);
- int n_threads = cplan->n_threads;
-
- struct ggml_compute_state_shared state_shared = {
- /*.cgraph =*/ cgraph,
- /*.cgraph_plan =*/ cplan,
- /*.n_threads =*/ n_threads,
- /*.n_barrier =*/ 0,
- /*.n_barrier_passed =*/ 0,
- /*.abort_callback =*/ NULL,
- /*.abort_callback_data =*/ NULL,
- /*.current_chunk =*/ 0,
- /*.ec =*/ GGML_STATUS_SUCCESS,
- };
+ int n_threads = cplan->n_threads;
+ struct ggml_threadpool * threadpool = cplan->threadpool;
+
+ bool disposable_threadpool = false;
+
+ if (threadpool == NULL) {
+ GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
+ disposable_threadpool = true;
+
+ struct ggml_threadpool_params ttp = ggml_threadpool_params_default(n_threads);
+ threadpool = ggml_threadpool_new_impl(&ttp, cgraph, cplan);
+ } else {
+ // Reset some of the parameters that need resetting
+ // No worker threads should be accessing the parameters below at this stage
+ threadpool->cgraph = cgraph;
+ threadpool->cplan = cplan;
+ threadpool->n_threads_cur = n_threads;
+ threadpool->current_chunk = 0;
+ threadpool->ec = GGML_STATUS_SUCCESS;
+ }
+
+ if (n_threads > threadpool->n_threads_max) {
+ GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
+ }
#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
{
// update the number of threads from the actual number of threads that we got from OpenMP
n_threads = omp_get_num_threads();
- state_shared.n_threads = n_threads;
+ threadpool->n_threads_cur = n_threads;
}
- struct ggml_compute_state worker = {
- .thrd = 0,
- .ith = omp_get_thread_num(),
- .shared = &state_shared,
- };
- ggml_graph_compute_thread(&worker);
+ ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
}
} else {
- struct ggml_compute_state worker = {
- .thrd = 0,
- .ith = 0,
- .shared = &state_shared,
- };
- ggml_graph_compute_thread(&worker);
+ ggml_graph_compute_thread(&threadpool->workers[0]);
}
#else
- struct ggml_compute_state * workers = alloca(sizeof(struct ggml_compute_state)*n_threads);
-
- for (int j = 0; j < n_threads; ++j) {
- workers[j] = (struct ggml_compute_state) {
- .thrd = 0,
- .ith = j,
- .shared = &state_shared,
- };
- }
-
- // create thread pool
- for (int j = 1; j < n_threads; ++j) {
- const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
- GGML_ASSERT(rc == 0);
- UNUSED(rc);
- }
-
- // this is a work thread too
- ggml_graph_compute_thread(&workers[0]);
+ // Kick all threads to start the new graph
+ ggml_graph_compute_kickoff(threadpool);
- // join or kill thread pool
- if (n_threads > 1) {
- for (int j = 1; j < n_threads; j++) {
- const int rc = ggml_thread_join(workers[j].thrd, NULL);
- GGML_ASSERT(rc == 0);
- UNUSED(rc);
- }
- }
+ // This is a work thread too
+ ggml_graph_compute_thread(&threadpool->workers[0]);
#endif
// don't leave affinity set on the main thread
clear_numa_thread_affinity();
- return state_shared.ec;
+ enum ggml_status ret = threadpool->ec;
+
+ if (disposable_threadpool) {
+ ggml_threadpool_free(threadpool);
+ }
+
+ return ret;
}
enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads) {
- struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads);
+ struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads, NULL);
struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
float * pf = params.past > 0 ? opt->adam.pf->data : NULL; // past function values
- struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads);
+ struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads, NULL);
struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs;
opt->iter = iter;
}
- struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads);
+ struct ggml_cplan cplan = ggml_graph_plan(gb, params.n_threads, NULL);
struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs;