#ifdef USE_ACL_GRAPH
struct ggml_graph_node_properties {
     // dst tensor
-    void * node_address;
-    int64_t ne[GGML_MAX_DIMS];
-    size_t nb[GGML_MAX_DIMS];
+    void *    node_address;
+    ggml_type node_type;
+    int64_t   ne[GGML_MAX_DIMS];
+    size_t    nb[GGML_MAX_DIMS];
     // src tensor
-    void * src_address[GGML_MAX_SRC];
-    int64_t src_ne[GGML_MAX_SRC][GGML_MAX_DIMS];
-    size_t src_nb[GGML_MAX_SRC][GGML_MAX_DIMS];
+    void *    src_address[GGML_MAX_SRC];
+    ggml_type src_type[GGML_MAX_SRC];
+    int64_t   src_ne[GGML_MAX_SRC][GGML_MAX_DIMS];
+    size_t    src_nb[GGML_MAX_SRC][GGML_MAX_DIMS];
     // op
ggml_op node_op;
return false;
}
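+    // The dst tensor's data type must match as well; a dtype change invalidates the captured node.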
+ if (node->type != this->node_type) {
+ return false;
+ }
+
for (int i = 0; i < GGML_MAX_DIMS; i++) {
if (node->ne[i] != this->ne[i]) {
return false;
return false;
}
+ if (node->src[i]->type != this->src_type[i]) {
+ return false;
+ }
+
for (int d = 0; d < GGML_MAX_DIMS; d++) {
if (node->src[i]->ne[d] != this->src_ne[i][d]) {
return false;
}
}
- if (node->op == GGML_OP_SCALE || node->op == GGML_OP_UNARY || node->op == GGML_OP_GLU || node->op == GGML_OP_ROPE){
- return memcmp(this->op_params, node->op_params, GGML_MAX_OP_PARAMS) == 0;
- }
- return true;
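+    // op_params are now compared for every op, not just SCALE/UNARY/GLU/ROPE.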
+ return memcmp(this->op_params, node->op_params, GGML_MAX_OP_PARAMS) == 0;
}
};
prop.node_address = node->data;
prop.node_op = node->op;
+ prop.node_type = node->type;
std::copy_n(node->ne, GGML_MAX_DIMS, prop.ne);
std::copy_n(node->nb, GGML_MAX_DIMS, prop.nb);
for (int src = 0; src < GGML_MAX_SRC; ++src) {
if (node->src[src]) {
prop.src_address[src] = node->src[src]->data;
+ prop.src_type[src] = node->src[src]->type;
std::copy_n(node->src[src]->ne, GGML_MAX_DIMS, prop.src_ne[src]);
std::copy_n(node->src[src]->nb, GGML_MAX_DIMS, prop.src_nb[src]);
} else {
prop.src_address[src] = nullptr;
+            prop.src_type[src] = GGML_TYPE_COUNT;  // sentinel: no src tensor in this slot
std::fill_n(prop.src_ne[src], GGML_MAX_DIMS, 0);
std::fill_n(prop.src_nb[src], GGML_MAX_DIMS, 0);
}
#include <cmath>
#include <cstdio>
#include <cstring>
+#include <memory>
#include <mutex>
#include <optional>
#include <queue>
+#include <unordered_map>
#include <unordered_set>
+#include <vector>
#define GGML_COMMON_DECL_C
}
// cann buffer
+
+/**
+ * @brief Tracks multi-threaded write progress for a single tensor.
+ *
+ * When multiple threads call set_tensor on different chunks of the same tensor,
+ * this tracker accumulates progress and defers post-processing (quantized format
+ * transform or ND-to-NZ conversion) until all data has been written.
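+ *
+ * In short: each set_tensor call locks the tensor's tracker, stages or uploads its
+ * chunk, and adds the chunk size to bytes_written; the call that completes the
+ * tensor runs the deferred post-processing and removes the tracker.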
+ */
+struct TensorSetTracker {
+ std::mutex mtx; ///< Protects concurrent access to this tracker
+ size_t bytes_written = 0; ///< Accumulated bytes written so far
+ size_t total_bytes = 0; ///< Target size (full tensor)
+ std::vector<uint8_t> host_buffer; ///< Host staging buffer for quantized tensors
+};
+
/**
* @brief Context for managing a CANN buffer associated with a specific device.
*
int32_t device; ///< The device ID associated with this buffer context.
void * dev_ptr = nullptr; ///< Pointer to the device memory allocated for the buffer.
+ std::mutex tracker_mutex; ///< Protects the trackers map
+    std::unordered_map<void *, std::unique_ptr<TensorSetTracker>> trackers;  ///< In-flight per-tensor trackers, keyed by tensor->data
+
/**
* @brief Constructor to initialize the CANN buffer context.
*
* @brief Destructor to free the device memory allocated for the buffer.
*/
~ggml_backend_cann_buffer_context() { ACL_CHECK(aclrtFree(dev_ptr)); }
+
+ /**
+ * @brief Get or create a tracker for the given tensor.
+ */
+ TensorSetTracker * get_or_create_tracker(ggml_tensor * tensor) {
+ std::lock_guard<std::mutex> lock(tracker_mutex);
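+        // Trackers are keyed by the tensor's device data pointer, which is stable across chunked writes.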
+ auto key = tensor->data;
+ auto it = trackers.find(key);
+ if (it == trackers.end()) {
+ auto tracker = std::make_unique<TensorSetTracker>();
+ tracker->total_bytes = ggml_nbytes(tensor);
+ auto * ptr = tracker.get();
+ trackers[key] = std::move(tracker);
+ return ptr;
+ }
+ return it->second.get();
+ }
+
+ /**
+ * @brief Remove the tracker for the given tensor.
+ */
+ void remove_tracker(ggml_tensor * tensor) {
+ std::lock_guard<std::mutex> lock(tracker_mutex);
+ trackers.erase(tensor->data);
+ }
};
// cann buffer type
* designed to be used with a global array, one per device.
*/
struct ggml_cann_nz_workspace {
+ std::mutex mtx; // Protects ptr/allocated from concurrent access
void * ptr; // Pointer to allocated device buffer
size_t allocated; // Size of currently allocated buffer in bytes
* @note The workspace buffer used in this function is managed globally and reused
* across calls. This reduces overhead from repeated memory allocation and deallocation.
*/
-static void weight_format_to_nz(ggml_tensor * tensor, size_t offset, int device) {
- acl_tensor_ptr weightTransposed = ggml_cann_create_tensor(tensor, tensor->ne, tensor->nb, 2, ACL_FORMAT_ND, offset);
+static void weight_format_to_nz(ggml_tensor * tensor, int device) {
+ acl_tensor_ptr weightTransposed = ggml_cann_create_tensor(tensor, tensor->ne, tensor->nb, 2, ACL_FORMAT_ND, 0);
uint64_t workspaceSize = 0;
aclOpExecutor * executor;
// TransMatmulWeight
ACL_CHECK(aclnnTransMatmulWeightGetWorkspaceSize(weightTransposed.get(), &workspaceSize, &executor));
+
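+    // The per-device workspace is shared across threads; serialize realloc and the transform that uses it.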
+ std::lock_guard<std::mutex> lock(g_nz_workspaces[device].mtx);
// Avoid frequent malloc/free of the workspace.
g_nz_workspaces[device].realloc(workspaceSize);
* @brief Set tensor data in a CANN buffer.
*
* This function sets tensor data in a CANN buffer, handling transformations
- * if needed based on the tensor's type.
+ * if needed based on the tensor's type. It supports multi-threaded calls
+ * where different threads write different chunks of the same tensor.
+ *
+ * For quantized tensors (Q4_0/Q8_0), data is staged in a host buffer and
+ * the format transform is deferred until all chunks are written.
+ * For NZ weight tensors, chunks are uploaded directly but the ND-to-NZ
+ * conversion is deferred until all chunks are written.
*
* @param buffer The CANN buffer where the tensor data will be set.
* @param tensor Pointer to the tensor whose data will be set.
ggml_backend_cann_buffer_context * ctx = (ggml_backend_cann_buffer_context *) buffer->context;
ggml_cann_set_device(ctx->device);
- // TODO: refer to cann(#6017), it use thread's default stream.
- // For acl, synchronous functions use this default stream.
- // Why aclrtSynchronizeDevice?
// Only check env once.
static bool weight_to_nz = parse_bool(get_env_as_lowercase("GGML_CANN_WEIGHT_NZ").value_or("on"));
- if (!need_transform(tensor->type)) {
+
+ bool is_quantized = need_transform(tensor->type);
+ bool is_nz = !is_quantized && tensor->type != GGML_TYPE_BF16 && weight_to_nz &&
+ is_matmul_weight((const ggml_tensor *) tensor);
+
+ // Plain tensor (not quantized, not NZ): direct copy, no tracking needed
+ if (!is_quantized && !is_nz) {
ACL_CHECK(aclrtMemcpy((char *) tensor->data + offset, size, data, size, ACL_MEMCPY_HOST_TO_DEVICE));
- if (weight_to_nz && tensor->type != GGML_TYPE_BF16
- && is_matmul_weight((const ggml_tensor *) tensor)) {
+ return;
+ }
+
+ // Single-shot write (full tensor at once): handle directly without tracking overhead
+ if (offset == 0 && size == ggml_nbytes(tensor)) {
+ if (is_quantized) {
+ void * transform_buffer = malloc(size);
+ ggml_backend_cann_transform(tensor, data, transform_buffer);
+ ACL_CHECK(aclrtMemcpy(tensor->data, size, transform_buffer, size, ACL_MEMCPY_HOST_TO_DEVICE));
+ free(transform_buffer);
+ } else {
+ // NZ weight
GGML_ASSERT(tensor->ne[2] == 1);
GGML_ASSERT(tensor->ne[3] == 1);
- weight_format_to_nz(tensor, offset, ctx->device);
+ ACL_CHECK(aclrtMemcpy(tensor->data, size, data, size, ACL_MEMCPY_HOST_TO_DEVICE));
+ weight_format_to_nz(tensor, ctx->device);
}
+ return;
+ }
+
+ // Chunked write: use tracker to accumulate progress and defer transform/conversion
+ TensorSetTracker * tracker = ctx->get_or_create_tracker(tensor);
+ std::unique_lock<std::mutex> lock(tracker->mtx);
+
+ if (is_quantized) {
+ // Stage data in host buffer; transform requires full tensor data
+ if (tracker->host_buffer.empty()) {
+ tracker->host_buffer.resize(tracker->total_bytes);
+ }
+ memcpy(tracker->host_buffer.data() + offset, data, size);
} else {
- void * transform_buffer = malloc(size);
- ggml_backend_cann_transform(tensor, data, transform_buffer);
+ // NZ weight: upload chunk to device immediately, defer conversion
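+        // (per-chunk offsets remain valid because the data stays in ND layout until the final conversion)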
+ ACL_CHECK(aclrtMemcpy((char *) tensor->data + offset, size, data, size, ACL_MEMCPY_HOST_TO_DEVICE));
+ }
- ACL_CHECK(aclrtMemcpy((char *) tensor->data + offset, size, transform_buffer, size, ACL_MEMCPY_HOST_TO_DEVICE));
- free(transform_buffer);
+ tracker->bytes_written += size;
+
+ // All chunks received: perform deferred transform/conversion
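+    // (assumes callers write disjoint chunks that together cover the tensor exactly once)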
+ if (tracker->bytes_written >= tracker->total_bytes) {
+ if (is_quantized) {
+ void * transform_buffer = malloc(tracker->total_bytes);
+ ggml_backend_cann_transform(tensor, tracker->host_buffer.data(), transform_buffer);
+ ACL_CHECK(aclrtMemcpy(tensor->data, tracker->total_bytes, transform_buffer, tracker->total_bytes, ACL_MEMCPY_HOST_TO_DEVICE));
+ free(transform_buffer);
+ }
+
+ if (is_nz) {
+ GGML_ASSERT(tensor->ne[2] == 1);
+ GGML_ASSERT(tensor->ne[3] == 1);
+ weight_format_to_nz(tensor, ctx->device);
+ }
+
+ // Unlock before removing tracker, as remove_tracker destroys the mutex
+ lock.unlock();
+ ctx->remove_tracker(tensor);
}
}