]> git.djapps.eu Git - pkg/ggml/sources/ggml/commitdiff
Hexagon Op queue & dispatch optimizations (llama/16820)
authorMax Krasnyansky <redacted>
Wed, 29 Oct 2025 13:29:12 +0000 (06:29 -0700)
committerGeorgi Gerganov <redacted>
Sat, 1 Nov 2025 07:41:35 +0000 (09:41 +0200)
* hexagon: remove dspqueue callbacks and do all read processing inplace

* hexagon: there is no need to ref/deref the buffers at this point

We're not going to release the buffers without flushing the session queue.
So there is no need to inc/dec the refcounts for every request.
We also don't need to include those bufs in the response.

* hexagon: bump the thread count in the adb wrapper scripts

We can use more CPU cores now that the dedicated dspqueue polling threads are not used (ie no contention).
Also enable more agressive polling for now since we still map Flash Attention (and a few other kernels) to
the CPU and those dspqueue threads were keeping the CPU cores are higher clock freqs.

* hexagon: add lhez as the second code owner

src/ggml-hexagon/ggml-hexagon.cpp
src/ggml-hexagon/htp/main.c

index 5e3dc0a3d0cc11ef89e5423c3611e2778c525500..2d376a6025c072f6586c5f65ebb54583551df805 100644 (file)
@@ -217,6 +217,9 @@ struct ggml_hexagon_session {
     void allocate(int dev_id) noexcept(false);
     void release() noexcept(true);
 
+    void enqueue(struct htp_general_req &req, struct dspqueue_buffer *bufs, uint32_t n_bufs, bool sync = false);
+    void flush();
+
     ggml_backend_buffer_type buffer_type;
     ggml_backend_buffer_type repack_buffer_type;
 
@@ -237,15 +240,37 @@ struct ggml_hexagon_session {
     uint32_t         prof_pkts;
 };
 
-// Packet callback
-static void htp_packet_callback(dspqueue_t queue, AEEResult error, void * context) {
-    auto sess = static_cast<ggml_hexagon_session *>(context);
+void ggml_hexagon_session::enqueue(struct htp_general_req &req, struct dspqueue_buffer *bufs, uint32_t n_bufs, bool sync) {
+    // Bump pending flag (cleared in the session::flush once we get the responce)
+    this->op_pending++;  // atomic inc
+
+    int err = dspqueue_write(this->queue,
+                             0,                       // flags - the framework will autoset this
+                             n_bufs,                  // number of buffers
+                             bufs,                    // buffer references
+                             sizeof(req),
+                             (const uint8_t *) &req,  // Message
+                             1000000                  // Timeout
+    );
+
+    if (err != 0) {
+        GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", this->name.c_str(), (unsigned) err);
+    }
+
+    if (sync) {
+        flush();
+    }
+}
+
+// Flush HTP response queue i.e wait for all outstanding requests to complete
+void ggml_hexagon_session::flush() {
+    dspqueue_t q = this->queue;
 
     // Repeatedly read packets from the queue until it's empty. We don't
     // necessarily get a separate callback for each packet, and new packets
     // may arrive while we're processing the previous one.
 
-    while (1) {
+    while (this->op_pending) {
         struct htp_general_rsp rsp;
         uint32_t               rsp_size;
         uint32_t               flags;
@@ -253,22 +278,23 @@ static void htp_packet_callback(dspqueue_t queue, AEEResult error, void * contex
         struct dspqueue_buffer bufs[HTP_MAX_PACKET_BUFFERS];
         uint32_t               n_bufs;
 
-        // Read packet from queue
-        int err = dspqueue_read_noblock(queue, &flags,
-                                        HTP_MAX_PACKET_BUFFERS,  // Maximum number of buffer references
-                                        &n_bufs,                 // Number of buffer references
-                                        bufs,                    // Buffer references
-                                        sizeof(rsp),             // Max message length
-                                        &rsp_size,               // Message length
-                                        (uint8_t *) &rsp);
-
-        if (err == AEE_EWOULDBLOCK) {
-            // Consumed all packets available for now
-            return;
+        // Read response packet from queue
+        int err = dspqueue_read(q, &flags,
+                                   HTP_MAX_PACKET_BUFFERS,  // Maximum number of buffer references
+                                   &n_bufs,                 // Number of buffer references
+                                   bufs,                    // Buffer references
+                                   sizeof(rsp),             // Max message length
+                                   &rsp_size,               // Message length
+                                   (uint8_t *) &rsp,
+                                   1000000);                // Timeout
+
+        if (err == AEE_EEXPIRED) {
+            // TODO: might need to bail out if the HTP is stuck on something
+            continue;
         }
 
         if (err != 0) {
-            GGML_ABORT("ggml-hex: dspqueue_read_noblock failed: 0x%08x\n", (unsigned) err);
+            GGML_ABORT("ggml-hex: dspqueue_read failed: 0x%08x\n", (unsigned) err);
         }
 
         // Basic sanity checks
@@ -281,21 +307,15 @@ static void htp_packet_callback(dspqueue_t queue, AEEResult error, void * contex
             // TODO: handle errors
         }
 
-        // FIXME: update profiling implementation
-        sess->prof_usecs  = rsp.prof_usecs;
-        sess->prof_cycles = rsp.prof_cycles;
-        sess->prof_pkts   = rsp.prof_pkts;
+        // TODO: update profiling implementation, currently only works for opt_opsync mode
+        this->prof_usecs  = rsp.prof_usecs;
+        this->prof_cycles = rsp.prof_cycles;
+        this->prof_pkts   = rsp.prof_pkts;
 
-        sess->op_pending--;  // atomic dec
+        this->op_pending--;  // atomic dec
     }
 }
 
-// Error callback - simply terminates with an error. Used where we don't
-// expect errors.
-[[noreturn]] static void htp_error_callback(dspqueue_t queue, AEEResult error, void * context) {
-    GGML_ABORT("ggml-hex: dspcall general error 0x%x: for queue %p\n", error, (void *) queue);
-}
-
 // ** backend buffers
 
 struct ggml_backend_hexagon_buffer_type_context {
@@ -1564,7 +1584,8 @@ void ggml_hexagon_session::allocate(int dev_id) noexcept(false) {
                           0,              // Flags
                           128 * 1024,     // Request  queue size (in bytes)
                           64 * 1024,      // Response queue size (in bytes)
-                          htp_packet_callback, htp_error_callback,
+                          nullptr,        // Read packet callback (we handle reads explicitly)
+                          nullptr,        // Error callback (we handle errors during reads)
                           (void *) this,  // Callback context
                           &queue);
     if (err != 0) {
@@ -2205,7 +2226,7 @@ static void ggml_hexagon_mul_mat(const struct ggml_tensor * op, uint32_t flags)
     bufs[0].ptr    = src0->data;
     bufs[0].offset = (uint8_t *) src0->data - src0_buf->base;
     bufs[0].size   = ggml_nbytes(src0);
-    bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_REF;
+    bufs[0].flags  = 0;
 
     // Second buffer Input Activations. This is a buffer that the CPU
     // writes and the DSP reads, so we'll need to flush CPU caches and
@@ -2215,8 +2236,7 @@ static void ggml_hexagon_mul_mat(const struct ggml_tensor * op, uint32_t flags)
     bufs[1].ptr    = src1->data;
     bufs[1].offset = (uint8_t *) src1->data - src1_buf->base;
     bufs[1].size   = ggml_nbytes(src1);
-    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
 
     // Third buffer Output Activations. We'll handle DSP
@@ -2227,7 +2247,7 @@ static void ggml_hexagon_mul_mat(const struct ggml_tensor * op, uint32_t flags)
     bufs[2].ptr    = dst->data;
     bufs[2].offset = (uint8_t *) dst->data - dst_buf->base;
     bufs[2].size   = ggml_nbytes(dst);
-    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_REF | DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
+    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
 
     // Primary DSP session from the src0 (normally weight) tensor
     auto sess = src0_buf->sess;
@@ -2255,27 +2275,7 @@ static void ggml_hexagon_mul_mat(const struct ggml_tensor * op, uint32_t flags)
     }
 
     if ((opt_opmask & HTP_OPMASK_QUEUE)) {
-        // Bump pending flag (cleared in the callback once we get the responce)
-        sess->op_pending++;  // atomic inc
-
-        int err = dspqueue_write(sess->queue,
-                                 0,                       // flags - the framework will autoset this
-                                 3,                       // number of buffers
-                                 bufs,                    // buffer references
-                                 sizeof(req),
-                                 (const uint8_t *) &req,  // Message
-                                 1000000                  // Timeout
-        );
-
-        if (err != 0) {
-            GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", sess->name.c_str(), (unsigned) err);
-        }
-    }
-
-    if (opt_opsync) {
-        while (sess->op_pending) {
-            ;
-        }
+        sess->enqueue(req, bufs, 3, opt_opsync);
     }
 
     t2 = ggml_time_us();
@@ -2331,7 +2331,7 @@ static void ggml_hexagon_mul_mat_id(const struct ggml_tensor * op, uint32_t flag
     bufs[0].ptr    = src0->data;
     bufs[0].offset = (uint8_t *) src0->data - src0_buf->base;
     bufs[0].size   = ggml_nbytes(src0);
-    bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_REF;
+    bufs[0].flags  = 0;
 
     // Second buffer Input Activations. This is a buffer that the CPU
     // writes and the DSP reads, so we'll need to flush CPU caches and
@@ -2341,8 +2341,7 @@ static void ggml_hexagon_mul_mat_id(const struct ggml_tensor * op, uint32_t flag
     bufs[1].ptr    = src1->data;
     bufs[1].offset = (uint8_t *) src1->data - src1_buf->base;
     bufs[1].size   = ggml_nbytes(src1);
-    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
 
     // Third buffer expert IDs. This is a buffer that the CPU
@@ -2353,8 +2352,7 @@ static void ggml_hexagon_mul_mat_id(const struct ggml_tensor * op, uint32_t flag
     bufs[2].ptr    = src2->data;
     bufs[2].offset = (uint8_t *) src2->data - src2_buf->base;
     bufs[2].size   = ggml_nbytes(src2);
-    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
 
     // Forth buffer Output Activations. We'll handle DSP
@@ -2365,7 +2363,7 @@ static void ggml_hexagon_mul_mat_id(const struct ggml_tensor * op, uint32_t flag
     bufs[3].ptr    = dst->data;
     bufs[3].offset = (uint8_t *) dst->data - dst_buf->base;
     bufs[3].size   = ggml_nbytes(dst);
-    bufs[3].flags  = (DSPQUEUE_BUFFER_FLAG_REF | DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
+    bufs[3].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
 
     // Primary DSP session from the src0 (normally weight) tensor
     auto sess = src0_buf->sess;
@@ -2394,27 +2392,7 @@ static void ggml_hexagon_mul_mat_id(const struct ggml_tensor * op, uint32_t flag
     }
 
     if ((opt_opmask & HTP_OPMASK_QUEUE)) {
-        // Bump pending flag (cleared in the callback once we get the responce)
-        sess->op_pending++;  // atomic inc
-
-        int err = dspqueue_write(sess->queue,
-                                 0,                       // flags - the framework will autoset this
-                                 4,                       // number of buffers
-                                 bufs,                    // buffer references
-                                 sizeof(req),
-                                 (const uint8_t *) &req,  // Message
-                                 1000000                  // Timeout
-        );
-
-        if (err != 0) {
-            GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", sess->name.c_str(), (unsigned) err);
-        }
-    }
-
-    if (opt_opsync) {
-        while (sess->op_pending) {
-            ;
-        }
+        sess->enqueue(req, bufs, 4, opt_opsync);
     }
 
     t2 = ggml_time_us();
@@ -2487,8 +2465,7 @@ static void ggml_hexagon_binary(const struct ggml_tensor * op, uint32_t flags) {
     bufs[0].ptr    = src0->data;
     bufs[0].offset = (uint8_t *) src0->data - src0_buf->base;
     bufs[0].size   = ggml_nbytes(src0);
-    bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP;
 
     // Second buffer = Second Operand of Binary op
@@ -2500,8 +2477,7 @@ static void ggml_hexagon_binary(const struct ggml_tensor * op, uint32_t flags) {
     bufs[1].ptr    = src1->data;
     bufs[1].offset = (uint8_t *) src1->data - src1_buf->base;
     bufs[1].size   = ggml_nbytes(src1);
-    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
 
     // Third buffer = Output Activations. We'll handle DSP
@@ -2512,7 +2488,7 @@ static void ggml_hexagon_binary(const struct ggml_tensor * op, uint32_t flags) {
     bufs[2].ptr    = dst->data;
     bufs[2].offset = (uint8_t *) dst->data - dst_buf->base;
     bufs[2].size   = ggml_nbytes(dst);
-    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_REF | DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
+    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
 
     // Primary DSP session from the src0 tensor
     ggml_hexagon_session * sess = src0_buf->sess;
@@ -2540,26 +2516,7 @@ static void ggml_hexagon_binary(const struct ggml_tensor * op, uint32_t flags) {
     }
 
     if ((opt_opmask & HTP_OPMASK_QUEUE)) {
-        // Bump pending flag (cleared in the callback once we get the responce)
-        sess->op_pending++;  // atomic inc
-
-        int err = dspqueue_write(sess->queue,
-                                 0,                       // flags - the framework will autoset this
-                                 3,                       // number of buffers
-                                 bufs,                    // buffer references
-                                 sizeof(req),
-                                 (const uint8_t *) &req,  // Message
-                                 1000000);                // Timeout
-
-        if (0 != err) {
-            GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", sess->name.c_str(), (unsigned) err);
-        }
-    }
-
-    if (opt_opsync) {
-        while (sess->op_pending) {
-            ;
-        }
+        sess->enqueue(req, bufs, 3, opt_opsync);
     }
 
     t2 = ggml_time_us();
@@ -2624,8 +2581,7 @@ static void ggml_hexagon_add_id(const struct ggml_tensor * op, uint32_t flags) {
     bufs[0].ptr    = src0->data;
     bufs[0].offset = (uint8_t *) src0->data - src0_buf->base;
     bufs[0].size   = ggml_nbytes(src0);
-    bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP;
 
     // Second buffer = experts bias
@@ -2633,8 +2589,7 @@ static void ggml_hexagon_add_id(const struct ggml_tensor * op, uint32_t flags) {
     bufs[1].ptr    = src1->data;
     bufs[1].offset = (uint8_t *) src1->data - src1_buf->base;
     bufs[1].size   = ggml_nbytes(src1);
-    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
 
     // Third buffer = activated experts
@@ -2642,8 +2597,7 @@ static void ggml_hexagon_add_id(const struct ggml_tensor * op, uint32_t flags) {
     bufs[2].ptr    = src2->data;
     bufs[2].offset = (uint8_t *) src2->data - src2_buf->base;
     bufs[2].size   = ggml_nbytes(src2);
-    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                     DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                      DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
 
     // Forth buffer = output activations
@@ -2651,7 +2605,7 @@ static void ggml_hexagon_add_id(const struct ggml_tensor * op, uint32_t flags) {
     bufs[3].ptr    = dst->data;
     bufs[3].offset = (uint8_t *) dst->data - dst_buf->base;
     bufs[3].size   = ggml_nbytes(dst);
-    bufs[3].flags  = (DSPQUEUE_BUFFER_FLAG_REF | DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
+    bufs[3].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
 
     // Primary DSP session from the src0 tensor
     ggml_hexagon_session * sess = src0_buf->sess;
@@ -2681,26 +2635,7 @@ static void ggml_hexagon_add_id(const struct ggml_tensor * op, uint32_t flags) {
     }
 
     if ((opt_opmask & HTP_OPMASK_QUEUE)) {
-        // Bump pending flag (cleared in the callback once we get the responce)
-        sess->op_pending++;  // atomic inc
-
-        int err = dspqueue_write(sess->queue,
-                                 0,                       // flags - the framework will autoset this
-                                 4,                       // number of buffers
-                                 bufs,                    // buffer references
-                                 sizeof(req),
-                                 (const uint8_t *) &req,  // Message
-                                 1000000);                // Timeout
-
-        if (0 != err) {
-            GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", sess->name.c_str(), (unsigned) err);
-        }
-    }
-
-    if (opt_opsync) {
-        while (sess->op_pending) {
-            ;
-        }
+        sess->enqueue(req, bufs, 4, opt_opsync);
     }
 
     t2 = ggml_time_us();
@@ -2798,8 +2733,7 @@ static void ggml_hexagon_unary(const struct ggml_tensor * op, uint32_t flags) {
     bufs[n_bufs].ptr    = src0->data;
     bufs[n_bufs].offset = (uint8_t *) src0->data - src0_buf->base;
     bufs[n_bufs].size   = ggml_nbytes(src0);
-    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                          DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                           DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP;
     ++n_bufs;
 
@@ -2814,8 +2748,7 @@ static void ggml_hexagon_unary(const struct ggml_tensor * op, uint32_t flags) {
         bufs[n_bufs].ptr    = src1->data;
         bufs[n_bufs].offset = (uint8_t *) src1->data - src1_buf->base;
         bufs[n_bufs].size   = ggml_nbytes(src1);
-        bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                              DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+        bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                               DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
         ++n_bufs;
     }
@@ -2830,7 +2763,7 @@ static void ggml_hexagon_unary(const struct ggml_tensor * op, uint32_t flags) {
     bufs[n_bufs].ptr    = dst->data;
     bufs[n_bufs].offset = (uint8_t *) dst->data - dst_buf->base;
     bufs[n_bufs].size   = ggml_nbytes(dst);
-    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF | DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
+    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
     ++n_bufs;
 
     // Primary DSP session from the src0 tensor
@@ -2863,26 +2796,7 @@ static void ggml_hexagon_unary(const struct ggml_tensor * op, uint32_t flags) {
     }
 
     if ((opt_opmask & HTP_OPMASK_QUEUE)) {
-        // Bump pending flag (cleared in the callback once we get the responce)
-        sess->op_pending++;  // atomic inc
-
-        int err = dspqueue_write(sess->queue,
-                                 0,                       // flags - the framework will autoset this
-                                 n_bufs,                  // number of buffers
-                                 bufs,                    // buffer references
-                                 sizeof(req),
-                                 (const uint8_t *) &req,  // Message
-                                 1000000);                // Timeout
-
-        if (0 != err) {
-            GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", sess->name.c_str(), (unsigned) err);
-        }
-    }
-
-    if (opt_opsync) {
-        while (sess->op_pending) {
-            ;
-        }
+        sess->enqueue(req, bufs, n_bufs, opt_opsync);
     }
 
     t2 = ggml_time_us();
@@ -2956,8 +2870,7 @@ static void ggml_hexagon_rope(const struct ggml_tensor * op, uint32_t flags) {
     bufs[n_bufs].ptr    = src0->data;
     bufs[n_bufs].offset = (uint8_t *) src0->data - src0_buf->base;
     bufs[n_bufs].size   = ggml_nbytes(src0);
-    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                          DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                           DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP;
     ++n_bufs;
 
@@ -2971,8 +2884,7 @@ static void ggml_hexagon_rope(const struct ggml_tensor * op, uint32_t flags) {
     bufs[n_bufs].ptr    = src1->data;
     bufs[n_bufs].offset = (uint8_t *) src1->data - src1_buf->base;
     bufs[n_bufs].size   = ggml_nbytes(src1);
-    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                          DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                           DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
     ++n_bufs;
 
@@ -2987,8 +2899,7 @@ static void ggml_hexagon_rope(const struct ggml_tensor * op, uint32_t flags) {
         bufs[n_bufs].ptr    = src2->data;
         bufs[n_bufs].offset = (uint8_t *) src2->data - src2_buf->base;
         bufs[n_bufs].size   = ggml_nbytes(src2);
-        bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF |                   // Take a reference
-                              DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush CPU
+        bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush CPU
                               DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate DSP
         ++n_bufs;
     }
@@ -3003,7 +2914,7 @@ static void ggml_hexagon_rope(const struct ggml_tensor * op, uint32_t flags) {
     bufs[n_bufs].ptr    = dst->data;
     bufs[n_bufs].offset = (uint8_t *) dst->data - dst_buf->base;
     bufs[n_bufs].size   = ggml_nbytes(dst);
-    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_REF | DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
+    bufs[n_bufs].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER);
     ++n_bufs;
 
     // Primary DSP session from the src0 tensor
@@ -3036,26 +2947,7 @@ static void ggml_hexagon_rope(const struct ggml_tensor * op, uint32_t flags) {
     }
 
     if ((opt_opmask & HTP_OPMASK_QUEUE)) {
-        // Bump pending flag (cleared in the callback once we get the responce)
-        sess->op_pending++;  // atomic inc
-
-        int err = dspqueue_write(sess->queue,
-                                 0,                       // flags - the framework will autoset this
-                                 n_bufs,                  // number of buffers
-                                 bufs,                    // buffer references
-                                 sizeof(req),
-                                 (const uint8_t *) &req,  // Message
-                                 1000000);                // Timeout
-
-        if (0 != err) {
-            GGML_ABORT("ggml-hex: %s dspqueue_write failed: 0x%08x\n", sess->name.c_str(), (unsigned) err);
-        }
-    }
-
-    if (opt_opsync) {
-        while (sess->op_pending) {
-            ;
-        }
+        sess->enqueue(req, bufs, n_bufs, opt_opsync);
     }
 
     t2 = ggml_time_us();
@@ -3200,9 +3092,7 @@ static ggml_status ggml_backend_hexagon_graph_compute(ggml_backend_t backend, gg
     }
 
     // Wait until all pending ops complete
-    while (sess->op_pending) {
-        ;
-    }
+    sess->flush();
 
     return GGML_STATUS_SUCCESS;
 }
@@ -3213,9 +3103,7 @@ static void ggml_backend_hexagon_synchronize(ggml_backend_t backend) {
     HEX_VERBOSE("ggml-hex: %s synchronize\n", sess->name.c_str());
 
     // Wait until all pending ops complete
-    while (sess->op_pending) {
-        ;
-    }
+    sess->flush();
 }
 
 struct node_info {
index e35ea3b0211c8cd2d728a8ecb78ce8e9dceb465c..10e2733324354fb1c6d6081667e71e33fcd12a9f 100644 (file)
@@ -395,28 +395,14 @@ static void proc_matmul_req(struct htp_context *     ctx,
                             struct htp_general_req * req,
                             struct dspqueue_buffer * bufs,
                             size_t                   n_bufs) {
-    // Prep response buffer structs (needed for error responses, etc)
-    struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    rsp_bufs[1].fd     = bufs[1].fd;
-    rsp_bufs[1].ptr    = bufs[1].ptr;
-    rsp_bufs[1].size   = bufs[1].size;
-    rsp_bufs[1].offset = bufs[1].offset;
-    rsp_bufs[1].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
+    struct dspqueue_buffer rsp_bufs[1];
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[2].fd     = bufs[2].fd;
-    rsp_bufs[2].ptr    = bufs[2].ptr;
-    rsp_bufs[2].size   = bufs[2].size;
-    rsp_bufs[2].offset = bufs[2].offset;
-    rsp_bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                         DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
+    rsp_bufs[0].fd     = bufs[2].fd;
+    rsp_bufs[0].ptr    = bufs[2].ptr;
+    rsp_bufs[0].size   = bufs[2].size;
+    rsp_bufs[0].offset = bufs[2].offset;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
 
     // Setup Op context
@@ -444,41 +430,21 @@ static void proc_matmul_req(struct htp_context *     ctx,
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 3, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void proc_matmul_id_req(struct htp_context *     ctx,
                                struct htp_general_req * req,
                                struct dspqueue_buffer * bufs,
                                size_t                   n_bufs) {
-    // Prep response buffer structs (needed for error responses, etc)
-    struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    rsp_bufs[1].fd     = bufs[1].fd;
-    rsp_bufs[1].ptr    = bufs[1].ptr;
-    rsp_bufs[1].size   = bufs[1].size;
-    rsp_bufs[1].offset = bufs[1].offset;
-    rsp_bufs[1].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    rsp_bufs[2].fd     = bufs[2].fd;
-    rsp_bufs[2].ptr    = bufs[2].ptr;
-    rsp_bufs[2].size   = bufs[2].size;
-    rsp_bufs[2].offset = bufs[2].offset;
-    rsp_bufs[2].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
+    struct dspqueue_buffer rsp_bufs[1];
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[3].fd     = bufs[3].fd;
-    rsp_bufs[3].ptr    = bufs[3].ptr;
-    rsp_bufs[3].size   = bufs[3].size;
-    rsp_bufs[3].offset = bufs[3].offset;
-    rsp_bufs[3].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                         DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
+    rsp_bufs[0].fd     = bufs[3].fd;
+    rsp_bufs[0].ptr    = bufs[3].ptr;
+    rsp_bufs[0].size   = bufs[3].size;
+    rsp_bufs[0].offset = bufs[3].offset;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
 
     // Setup Op context
@@ -508,32 +474,18 @@ static void proc_matmul_id_req(struct htp_context *     ctx,
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 4, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void proc_binary_req(struct htp_context * ctx, struct htp_general_req * req, struct dspqueue_buffer * bufs) {
-    struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    rsp_bufs[1].fd     = bufs[1].fd;
-    rsp_bufs[1].ptr    = bufs[1].ptr;
-    rsp_bufs[1].offset = bufs[1].offset;
-    rsp_bufs[1].size   = bufs[1].size;
-    rsp_bufs[1].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
+    struct dspqueue_buffer rsp_bufs[1];
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[2].fd     = bufs[2].fd;
-    rsp_bufs[2].ptr    = bufs[2].ptr;
-    rsp_bufs[2].offset = bufs[2].offset;
-    rsp_bufs[2].size   = bufs[2].size;
-    rsp_bufs[2].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                         DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
+    rsp_bufs[0].fd     = bufs[2].fd;
+    rsp_bufs[0].ptr    = bufs[2].ptr;
+    rsp_bufs[0].offset = bufs[2].offset;
+    rsp_bufs[0].size   = bufs[2].size;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
 
     // Setup Op context
@@ -561,38 +513,18 @@ static void proc_binary_req(struct htp_context * ctx, struct htp_general_req * r
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 3, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void proc_add_id_req(struct htp_context * ctx, struct htp_general_req * req, struct dspqueue_buffer * bufs) {
-    struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    rsp_bufs[1].fd     = bufs[1].fd;
-    rsp_bufs[1].ptr    = bufs[1].ptr;
-    rsp_bufs[1].offset = bufs[1].offset;
-    rsp_bufs[1].size   = bufs[1].size;
-    rsp_bufs[1].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    rsp_bufs[2].fd     = bufs[2].fd;
-    rsp_bufs[2].ptr    = bufs[2].ptr;
-    rsp_bufs[2].offset = bufs[2].offset;
-    rsp_bufs[2].size   = bufs[2].size;
-    rsp_bufs[2].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
+    struct dspqueue_buffer rsp_bufs[1];
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[3].fd     = bufs[3].fd;
-    rsp_bufs[3].ptr    = bufs[3].ptr;
-    rsp_bufs[3].offset = bufs[3].offset;
-    rsp_bufs[3].size   = bufs[3].size;
-    rsp_bufs[3].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                         DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
+    rsp_bufs[0].fd     = bufs[3].fd;
+    rsp_bufs[0].ptr    = bufs[3].ptr;
+    rsp_bufs[0].offset = bufs[3].offset;
+    rsp_bufs[0].size   = bufs[3].size;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
 
     // Setup Op context
@@ -622,26 +554,18 @@ static void proc_add_id_req(struct htp_context * ctx, struct htp_general_req * r
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 4, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void proc_unary_req(struct htp_context * ctx, struct htp_general_req * req, struct dspqueue_buffer * bufs) {
     struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[1].fd     = bufs[1].fd;
-    rsp_bufs[1].ptr    = bufs[1].ptr;
-    rsp_bufs[1].offset = bufs[1].offset;
-    rsp_bufs[1].size   = bufs[1].size;
-    rsp_bufs[1].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                         DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
+    rsp_bufs[0].fd     = bufs[1].fd;
+    rsp_bufs[0].ptr    = bufs[1].ptr;
+    rsp_bufs[0].offset = bufs[1].offset;
+    rsp_bufs[0].size   = bufs[1].size;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
 
     // Setup Op context
@@ -669,7 +593,7 @@ static void proc_unary_req(struct htp_context * ctx, struct htp_general_req * re
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 2, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void proc_activations_req(struct htp_context *     ctx,
@@ -677,33 +601,16 @@ static void proc_activations_req(struct htp_context *     ctx,
                                  struct dspqueue_buffer * bufs,
                                  uint32_t                 n_bufs) {
     struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
 
-    int write_idx = 1;
-    if (3 == n_bufs) {
-        rsp_bufs[1].fd     = bufs[1].fd;
-        rsp_bufs[1].ptr    = bufs[1].ptr;
-        rsp_bufs[1].offset = bufs[1].offset;
-        rsp_bufs[1].size   = bufs[1].size;
-        rsp_bufs[1].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-        write_idx = 2;
-    }
+    int write_idx = (n_bufs == 3) ? 2 : 1;
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[write_idx].fd     = bufs[write_idx].fd;
-    rsp_bufs[write_idx].ptr    = bufs[write_idx].ptr;
-    rsp_bufs[write_idx].offset = bufs[write_idx].offset;
-    rsp_bufs[write_idx].size   = bufs[write_idx].size;
-    rsp_bufs[write_idx].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                                 DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
-                                 DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
+    rsp_bufs[0].fd     = bufs[write_idx].fd;
+    rsp_bufs[0].ptr    = bufs[write_idx].ptr;
+    rsp_bufs[0].offset = bufs[write_idx].offset;
+    rsp_bufs[0].size   = bufs[write_idx].size;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
+                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT); // Invalidate CPU
 
     // Setup Op context
     struct htp_ops_context octx = { 0 };
@@ -742,7 +649,7 @@ static void proc_activations_req(struct htp_context *     ctx,
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, n_bufs, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void proc_rope_req(struct htp_context *     ctx,
@@ -750,39 +657,16 @@ static void proc_rope_req(struct htp_context *     ctx,
                           struct dspqueue_buffer * bufs,
                           uint32_t                 n_bufs) {
     struct dspqueue_buffer rsp_bufs[HTP_MAX_PACKET_BUFFERS];
-    memset(rsp_bufs, 0, sizeof(rsp_bufs));
-
-    rsp_bufs[0].fd     = bufs[0].fd;
-    rsp_bufs[0].ptr    = bufs[0].ptr;
-    rsp_bufs[0].offset = bufs[0].offset;
-    rsp_bufs[0].size   = bufs[0].size;
-    rsp_bufs[0].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
 
-    rsp_bufs[1].fd     = bufs[1].fd;
-    rsp_bufs[1].ptr    = bufs[1].ptr;
-    rsp_bufs[1].offset = bufs[1].offset;
-    rsp_bufs[1].size   = bufs[1].size;
-    rsp_bufs[1].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-    int write_idx = 2;
-    if (4 == n_bufs) {
-        rsp_bufs[write_idx].fd     = bufs[write_idx].fd;
-        rsp_bufs[write_idx].ptr    = bufs[write_idx].ptr;
-        rsp_bufs[write_idx].offset = bufs[write_idx].offset;
-        rsp_bufs[write_idx].size   = bufs[write_idx].size;
-        rsp_bufs[write_idx].flags  = DSPQUEUE_BUFFER_FLAG_DEREF;  // Release reference
-
-        write_idx++;
-    }
+    int write_idx = (n_bufs == 4) ? 3 : 2;
 
     // We had written to the output buffer, we'd also need to flush it
-    rsp_bufs[write_idx].fd     = bufs[write_idx].fd;
-    rsp_bufs[write_idx].ptr    = bufs[write_idx].ptr;
-    rsp_bufs[write_idx].offset = bufs[write_idx].offset;
-    rsp_bufs[write_idx].size   = bufs[write_idx].size;
-    rsp_bufs[write_idx].flags  = (DSPQUEUE_BUFFER_FLAG_DEREF |                 // Release reference
-                                 DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |          // Flush NSP
-                                 DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT);  // Invalidate CPU
+    rsp_bufs[0].fd     = bufs[write_idx].fd;
+    rsp_bufs[0].ptr    = bufs[write_idx].ptr;
+    rsp_bufs[0].offset = bufs[write_idx].offset;
+    rsp_bufs[0].size   = bufs[write_idx].size;
+    rsp_bufs[0].flags  = (DSPQUEUE_BUFFER_FLAG_FLUSH_SENDER |         // Flush HTP
+                          DSPQUEUE_BUFFER_FLAG_INVALIDATE_RECIPIENT); // Invalidate CPU
 
     // Setup Op context
     struct htp_ops_context octx = { 0 };
@@ -819,7 +703,7 @@ static void proc_rope_req(struct htp_context *     ctx,
     }
 
     profile_stop(&prof);
-    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, n_bufs, &prof);
+    send_htp_rsp(ctx, req->op, rsp_status, rsp_bufs, 1, &prof);
 }
 
 static void htp_packet_callback(dspqueue_t queue, int error, void * context) {