]> git.djapps.eu Git - pkg/ggml/sources/ggml/commitdiff
CANN: fix multi-thread set_tensor race conditions (llama/20151)
authorhipudding <redacted>
Tue, 31 Mar 2026 14:00:51 +0000 (22:00 +0800)
committerGeorgi Gerganov <redacted>
Wed, 1 Apr 2026 13:00:26 +0000 (16:00 +0300)
* CANN: fix multi-thread set_tensor race conditions

When ollama calls ggml_backend_tensor_set from multiple threads (each
writing a different chunk of the same tensor), the CANN backend had
three concurrency issues:

1. Quantized tensors (Q4_0/Q8_0) require a full-tensor format transform
   before uploading to device. Per-chunk transforms produced corrupt data.

2. ND-to-NZ weight conversion requires complete tensor data on device.
   Per-chunk conversion operated on incomplete data.

3. The global g_nz_workspaces array had unprotected concurrent access.

Fix by introducing a TensorSetTracker that accumulates write progress
per tensor. For quantized tensors, raw data is staged in a host buffer
and the transform + upload is deferred until all chunks arrive. For NZ
weights, chunks are uploaded directly but conversion is deferred. The
tracker and its staging buffer are released immediately after
post-processing completes.

Add per-device mutex to g_nz_workspaces to prevent data races.

* CANN: fix L2_NORM ignoring eps parameter

The L2_NORM implementation was not using the eps parameter from
op_params, causing incorrect results when eps is large (e.g. 10.0).
The CPU reference computes scale = 1/fmaxf(norm, eps), so add a
Clamp step to clamp the norm to at least eps before dividing.

* ggml/cann: compare op_params for POOL_2D in ACL graph cache matching

When ACL graph mode is enabled, the graph LRU cache checks whether a
cached graph matches the current computation graph. Previously,
GGML_OP_POOL_2D was not included in the op_params comparison, so two
POOL_2D nodes with different pooling parameters (kernel size, stride,
padding) but identical tensor shapes and addresses could incorrectly
reuse a cached graph, leading to wrong results or aclnn errors.

Add GGML_OP_POOL_2D to the list of ops that require op_params matching
in ggml_graph_node_properties::has_matching_properties().

* cann: fix ACL graph cache matching by adding tensor type and unconditional op_params comparison

The ACL graph LRU cache was incorrectly reusing cached graphs for
operations with different tensor types or op_params, causing test
failures for CPY (f16 vs bf16), POOL_2D, L2_NORM, NORM_MUL_ADD,
RMS_NORM_MUL_ADD, and ADD_RMS_NORM.

Changes:
- Add node_type and src_type[] fields to ggml_graph_node_properties
  so the cache can distinguish tensors with different types but
  identical ne/nb (e.g. f16 and bf16 both have 2-byte elements)
- Compare op_params unconditionally for all ops instead of only for
  SCALE/UNARY/GLU/ROPE/POOL_2D

src/ggml-cann/aclnn_ops.cpp
src/ggml-cann/common.h
src/ggml-cann/ggml-cann.cpp

index adb4d68e8687e7fe1b3a82e16221f003e25ce98e..a950475fc3b04dbdbf126201f1926806a2800a5f 100644 (file)
@@ -434,6 +434,9 @@ void ggml_cann_norm(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
 void ggml_cann_l2_norm(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
     ggml_tensor * src = dst->src[0];
 
+    float eps;
+    memcpy(&eps, dst->op_params, sizeof(float));
+
     acl_tensor_ptr acl_src = ggml_cann_create_tensor(src);
     acl_tensor_ptr acl_dst = ggml_cann_create_tensor(dst);
 
@@ -456,6 +459,13 @@ void ggml_cann_l2_norm(ggml_backend_cann_context & ctx, ggml_tensor * dst) {
     float          p_value  = 2.0f;
     acl_scalar_ptr p_scalar = ggml_cann_create_scalar(&p_value, aclDataType::ACL_FLOAT);
     GGML_CANN_CALL_ACLNN_OP(ctx, Norm, acl_src.get(), p_scalar.get(), dims_array.get(), true, acl_div.get());
+
+    // Clamp norm to at least eps: scale = 1/fmaxf(norm, eps)
+    acl_scalar_ptr acl_min = ggml_cann_create_scalar(&eps, aclDataType::ACL_FLOAT);
+    float          flt_max = FLT_MAX;
+    acl_scalar_ptr acl_max = ggml_cann_create_scalar(&flt_max, aclDataType::ACL_FLOAT);
+    GGML_CANN_CALL_ACLNN_OP(ctx, Clamp, acl_div.get(), acl_min.get(), acl_max.get(), acl_div.get());
+
     GGML_CANN_CALL_ACLNN_OP(ctx, Div, acl_src.get(), acl_div.get(), acl_dst.get());
 }
 
index 5f960548cd2e2f5b47bb5548b5bf816e2a93c296..1c6e685c38cac5de85bf70914a95ce3afc90488b 100644 (file)
@@ -216,14 +216,16 @@ struct ggml_cann_pool_alloc {
 #ifdef USE_ACL_GRAPH
 struct ggml_graph_node_properties {
     // dst tensor
-    void *  node_address;
-    int64_t ne[GGML_MAX_DIMS];
-    size_t  nb[GGML_MAX_DIMS];
+    void *    node_address;
+    ggml_type node_type;
+    int64_t   ne[GGML_MAX_DIMS];
+    size_t    nb[GGML_MAX_DIMS];
 
     // src tensor
-    void *  src_address[GGML_MAX_SRC];
-    int64_t src_ne[GGML_MAX_SRC][GGML_MAX_DIMS];
-    size_t  src_nb[GGML_MAX_SRC][GGML_MAX_DIMS];
+    void *    src_address[GGML_MAX_SRC];
+    ggml_type src_type[GGML_MAX_SRC];
+    int64_t   src_ne[GGML_MAX_SRC][GGML_MAX_DIMS];
+    size_t    src_nb[GGML_MAX_SRC][GGML_MAX_DIMS];
 
     // op
     ggml_op node_op;
@@ -247,6 +249,10 @@ struct ggml_graph_node_properties {
             return false;
         }
 
+        if (node->type != this->node_type) {
+            return false;
+        }
+
         for (int i = 0; i < GGML_MAX_DIMS; i++) {
             if (node->ne[i] != this->ne[i]) {
                 return false;
@@ -262,6 +268,10 @@ struct ggml_graph_node_properties {
                     return false;
                 }
 
+                if (node->src[i]->type != this->src_type[i]) {
+                    return false;
+                }
+
                 for (int d = 0; d < GGML_MAX_DIMS; d++) {
                     if (node->src[i]->ne[d] != this->src_ne[i][d]) {
                         return false;
@@ -277,10 +287,7 @@ struct ggml_graph_node_properties {
             }
         }
 
-        if (node->op == GGML_OP_SCALE || node->op == GGML_OP_UNARY || node->op == GGML_OP_GLU || node->op == GGML_OP_ROPE){
-            return memcmp(this->op_params, node->op_params, GGML_MAX_OP_PARAMS) == 0;
-        }
-        return true;
+        return memcmp(this->op_params, node->op_params, GGML_MAX_OP_PARAMS) == 0;
     }
 };
 
@@ -322,6 +329,7 @@ struct ggml_cann_graph {
 
             prop.node_address = node->data;
             prop.node_op      = node->op;
+            prop.node_type    = node->type;
 
             std::copy_n(node->ne, GGML_MAX_DIMS, prop.ne);
             std::copy_n(node->nb, GGML_MAX_DIMS, prop.nb);
@@ -329,10 +337,12 @@ struct ggml_cann_graph {
             for (int src = 0; src < GGML_MAX_SRC; ++src) {
                 if (node->src[src]) {
                     prop.src_address[src] = node->src[src]->data;
+                    prop.src_type[src]    = node->src[src]->type;
                     std::copy_n(node->src[src]->ne, GGML_MAX_DIMS, prop.src_ne[src]);
                     std::copy_n(node->src[src]->nb, GGML_MAX_DIMS, prop.src_nb[src]);
                 } else {
                     prop.src_address[src] = nullptr;
+                    prop.src_type[src]    = GGML_TYPE_COUNT;
                     std::fill_n(prop.src_ne[src], GGML_MAX_DIMS, 0);
                     std::fill_n(prop.src_nb[src], GGML_MAX_DIMS, 0);
                 }
index 6f26e91e046525cf303f7bd30bf0e2a920fce85f..40fe3d82ecce30a5911741ca6678c49ff9fea4f3 100644 (file)
 #include <cmath>
 #include <cstdio>
 #include <cstring>
+#include <memory>
 #include <mutex>
 #include <optional>
 #include <queue>
+#include <unordered_map>
 #include <unordered_set>
+#include <vector>
 
 #define GGML_COMMON_DECL_C
 
@@ -770,6 +773,21 @@ std::unique_ptr<ggml_cann_pool> ggml_backend_cann_context::new_pool_for_device(i
 }
 
 // cann buffer
+
+/**
+ * @brief Tracks multi-threaded write progress for a single tensor.
+ *
+ * When multiple threads call set_tensor on different chunks of the same tensor,
+ * this tracker accumulates progress and defers post-processing (quantized format
+ * transform or ND-to-NZ conversion) until all data has been written.
+ */
+struct TensorSetTracker {
+    std::mutex mtx;                   ///< Protects concurrent access to this tracker
+    size_t bytes_written = 0;         ///< Accumulated bytes written so far
+    size_t total_bytes = 0;           ///< Target size (full tensor)
+    std::vector<uint8_t> host_buffer; ///< Host staging buffer for quantized tensors
+};
+
 /**
  * @brief Context for managing a CANN buffer associated with a specific device.
  *
@@ -780,6 +798,9 @@ struct ggml_backend_cann_buffer_context {
     int32_t device;             ///< The device ID associated with this buffer context.
     void *  dev_ptr = nullptr;  ///< Pointer to the device memory allocated for the buffer.
 
+    std::mutex tracker_mutex;   ///< Protects the trackers map
+    std::unordered_map<void *, std::unique_ptr<TensorSetTracker>> trackers;
+
     /**
      * @brief Constructor to initialize the CANN buffer context.
      *
@@ -792,6 +813,31 @@ struct ggml_backend_cann_buffer_context {
      * @brief Destructor to free the device memory allocated for the buffer.
      */
     ~ggml_backend_cann_buffer_context() { ACL_CHECK(aclrtFree(dev_ptr)); }
+
+    /**
+     * @brief Get or create a tracker for the given tensor.
+     */
+    TensorSetTracker * get_or_create_tracker(ggml_tensor * tensor) {
+        std::lock_guard<std::mutex> lock(tracker_mutex);
+        auto key = tensor->data;
+        auto it = trackers.find(key);
+        if (it == trackers.end()) {
+            auto tracker = std::make_unique<TensorSetTracker>();
+            tracker->total_bytes = ggml_nbytes(tensor);
+            auto * ptr = tracker.get();
+            trackers[key] = std::move(tracker);
+            return ptr;
+        }
+        return it->second.get();
+    }
+
+    /**
+     * @brief Remove the tracker for the given tensor.
+     */
+    void remove_tracker(ggml_tensor * tensor) {
+        std::lock_guard<std::mutex> lock(tracker_mutex);
+        trackers.erase(tensor->data);
+    }
 };
 
 // cann buffer type
@@ -1124,6 +1170,7 @@ static enum ggml_status ggml_backend_cann_buffer_init_tensor(ggml_backend_buffer
  * designed to be used with a global array, one per device.
  */
 struct ggml_cann_nz_workspace {
+    std::mutex mtx;    // Protects ptr/allocated from concurrent access
     void * ptr;        // Pointer to allocated device buffer
     size_t allocated;  // Size of currently allocated buffer in bytes
 
@@ -1190,13 +1237,15 @@ static ggml_cann_nz_workspace g_nz_workspaces[GGML_CANN_MAX_DEVICES];
  * @note The workspace buffer used in this function is managed globally and reused
  *       across calls. This reduces overhead from repeated memory allocation and deallocation.
  */
-static void weight_format_to_nz(ggml_tensor * tensor, size_t offset, int device) {
-    acl_tensor_ptr weightTransposed = ggml_cann_create_tensor(tensor, tensor->ne, tensor->nb, 2, ACL_FORMAT_ND, offset);
+static void weight_format_to_nz(ggml_tensor * tensor, int device) {
+    acl_tensor_ptr weightTransposed = ggml_cann_create_tensor(tensor, tensor->ne, tensor->nb, 2, ACL_FORMAT_ND, 0);
     uint64_t       workspaceSize    = 0;
     aclOpExecutor * executor;
 
     // TransMatmulWeight
     ACL_CHECK(aclnnTransMatmulWeightGetWorkspaceSize(weightTransposed.get(), &workspaceSize, &executor));
+
+    std::lock_guard<std::mutex> lock(g_nz_workspaces[device].mtx);
     // Avoid frequent malloc/free of the workspace.
     g_nz_workspaces[device].realloc(workspaceSize);
 
@@ -1210,7 +1259,13 @@ static void weight_format_to_nz(ggml_tensor * tensor, size_t offset, int device)
  * @brief Set tensor data in a CANN buffer.
  *
  * This function sets tensor data in a CANN buffer, handling transformations
- * if needed based on the tensor's type.
+ * if needed based on the tensor's type. It supports multi-threaded calls
+ * where different threads write different chunks of the same tensor.
+ *
+ * For quantized tensors (Q4_0/Q8_0), data is staged in a host buffer and
+ * the format transform is deferred until all chunks are written.
+ * For NZ weight tensors, chunks are uploaded directly but the ND-to-NZ
+ * conversion is deferred until all chunks are written.
  *
  * @param buffer The CANN buffer where the tensor data will be set.
  * @param tensor Pointer to the tensor whose data will be set.
@@ -1226,26 +1281,72 @@ static void ggml_backend_cann_buffer_set_tensor(ggml_backend_buffer_t buffer,
     ggml_backend_cann_buffer_context * ctx = (ggml_backend_cann_buffer_context *) buffer->context;
 
     ggml_cann_set_device(ctx->device);
-    // TODO: refer to cann(#6017), it use thread's default stream.
-    // For acl, synchronous functions use this default stream.
-    // Why aclrtSynchronizeDevice?
 
     // Only check env once.
     static bool weight_to_nz = parse_bool(get_env_as_lowercase("GGML_CANN_WEIGHT_NZ").value_or("on"));
-    if (!need_transform(tensor->type)) {
+
+    bool is_quantized = need_transform(tensor->type);
+    bool is_nz        = !is_quantized && tensor->type != GGML_TYPE_BF16 && weight_to_nz &&
+                 is_matmul_weight((const ggml_tensor *) tensor);
+
+    // Plain tensor (not quantized, not NZ): direct copy, no tracking needed
+    if (!is_quantized && !is_nz) {
         ACL_CHECK(aclrtMemcpy((char *) tensor->data + offset, size, data, size, ACL_MEMCPY_HOST_TO_DEVICE));
-        if (weight_to_nz && tensor->type != GGML_TYPE_BF16
-            && is_matmul_weight((const ggml_tensor *) tensor)) {
+        return;
+    }
+
+    // Single-shot write (full tensor at once): handle directly without tracking overhead
+    if (offset == 0 && size == ggml_nbytes(tensor)) {
+        if (is_quantized) {
+            void * transform_buffer = malloc(size);
+            ggml_backend_cann_transform(tensor, data, transform_buffer);
+            ACL_CHECK(aclrtMemcpy(tensor->data, size, transform_buffer, size, ACL_MEMCPY_HOST_TO_DEVICE));
+            free(transform_buffer);
+        } else {
+            // NZ weight
             GGML_ASSERT(tensor->ne[2] == 1);
             GGML_ASSERT(tensor->ne[3] == 1);
-            weight_format_to_nz(tensor, offset, ctx->device);
+            ACL_CHECK(aclrtMemcpy(tensor->data, size, data, size, ACL_MEMCPY_HOST_TO_DEVICE));
+            weight_format_to_nz(tensor, ctx->device);
         }
+        return;
+    }
+
+    // Chunked write: use tracker to accumulate progress and defer transform/conversion
+    TensorSetTracker * tracker = ctx->get_or_create_tracker(tensor);
+    std::unique_lock<std::mutex> lock(tracker->mtx);
+
+    if (is_quantized) {
+        // Stage data in host buffer; transform requires full tensor data
+        if (tracker->host_buffer.empty()) {
+            tracker->host_buffer.resize(tracker->total_bytes);
+        }
+        memcpy(tracker->host_buffer.data() + offset, data, size);
     } else {
-        void * transform_buffer = malloc(size);
-        ggml_backend_cann_transform(tensor, data, transform_buffer);
+        // NZ weight: upload chunk to device immediately, defer conversion
+        ACL_CHECK(aclrtMemcpy((char *) tensor->data + offset, size, data, size, ACL_MEMCPY_HOST_TO_DEVICE));
+    }
 
-        ACL_CHECK(aclrtMemcpy((char *) tensor->data + offset, size, transform_buffer, size, ACL_MEMCPY_HOST_TO_DEVICE));
-        free(transform_buffer);
+    tracker->bytes_written += size;
+
+    // All chunks received: perform deferred transform/conversion
+    if (tracker->bytes_written >= tracker->total_bytes) {
+        if (is_quantized) {
+            void * transform_buffer = malloc(tracker->total_bytes);
+            ggml_backend_cann_transform(tensor, tracker->host_buffer.data(), transform_buffer);
+            ACL_CHECK(aclrtMemcpy(tensor->data, tracker->total_bytes, transform_buffer, tracker->total_bytes, ACL_MEMCPY_HOST_TO_DEVICE));
+            free(transform_buffer);
+        }
+
+        if (is_nz) {
+            GGML_ASSERT(tensor->ne[2] == 1);
+            GGML_ASSERT(tensor->ne[3] == 1);
+            weight_format_to_nz(tensor, ctx->device);
+        }
+
+        // Unlock before removing tracker, as remove_tracker destroys the mutex
+        lock.unlock();
+        ctx->remove_tracker(tensor);
     }
 }