#ifdef __has_include
#if __has_include(<unistd.h>)
#include <unistd.h>
+ #include <fcntl.h>
+ #include <sys/stat.h>
#if defined(_POSIX_MAPPED_FILES)
#include <sys/mman.h>
- #include <fcntl.h>
#endif
#if defined(_POSIX_MEMLOCK_RANGE)
#include <sys/resource.h>
return ret;
}
- impl(const char * fname, const char * mode) {
+ impl(const char * fname, const char * mode, [[maybe_unused]] const bool use_direct_io = false) {
fp = ggml_fopen(fname, mode);
if (fp == NULL) {
throw std::runtime_error(format("failed to open %s: %s", fname, strerror(errno)));
write_raw(&val, sizeof(val));
}
+ void read_aligned_chunk(size_t /* offset */, void * /* dest */, size_t /* size */) const {
+ throw std::runtime_error("direct I/O is not implemented on Windows");
+ }
+
~impl() {
if (fp) {
std::fclose(fp);
}
}
#else
- impl(const char * fname, const char * mode) {
+ impl(const char * fname, const char * mode, [[maybe_unused]] const bool use_direct_io = false) {
+#ifdef __linux__
+ // Try unbuffered (O_DIRECT) I/O for read-only access
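+ // O_DIRECT requires file offsets, transfer sizes and user buffers to be block-aligned;
+ // st_blksize is used as the alignment granularity and read_raw_at() pads arbitrary requests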
+ if (use_direct_io && std::strcmp(mode, "rb") == 0) {
+ fd = open(fname, O_RDONLY | O_DIRECT);
+
+ if (fd != -1) {
+ struct stat file_stats{};
+ if (fstat(fd, &file_stats) == -1) {
+ throw std::runtime_error(format("fstat error: %s", strerror(errno)));
+ }
+
+ size = file_stats.st_size;
+ alignment = file_stats.st_blksize;
+
+ off_t ret = lseek(fd, 0, SEEK_SET);
+ if (ret == -1) {
+ throw std::runtime_error(format("seek error: %s", strerror(errno)));
+ }
+ return;
+ }
+
+ LLAMA_LOG_WARN("failed to open %s with O_DIRECT: %s, falling back to buffered I/O\n",
+ fname, strerror(errno));
+ }
+#endif
fp = ggml_fopen(fname, mode);
if (fp == NULL) {
throw std::runtime_error(format("failed to open %s: %s", fname, strerror(errno)));
}
size_t tell() const {
-// TODO: this ifdef is never true?
-#ifdef _WIN32
- __int64 ret = _ftelli64(fp);
-#else
- long ret = std::ftell(fp);
-#endif
- if (ret == -1) {
- throw std::runtime_error(format("ftell error: %s", strerror(errno)));
+ if (fd == -1) {
+ long ret = std::ftell(fp);
+ if (ret == -1) {
+ throw std::runtime_error(format("ftell error: %s", strerror(errno)));
+ }
+
+ return (size_t) ret;
}
- return (size_t) ret;
+ off_t pos = lseek(fd, 0, SEEK_CUR);
+ if (pos == -1) {
+ throw std::runtime_error(format("lseek error: %s", strerror(errno)));
+ }
+ return (size_t) pos;
}
void seek(size_t offset, int whence) const {
-// TODO: this ifdef is never true?
-#ifdef _WIN32
- int ret = _fseeki64(fp, (__int64) offset, whence);
-#else
- int ret = std::fseek(fp, (long) offset, whence);
-#endif
- if (ret != 0) {
+ if (fd == -1) {
+ // fseek() reports failure with any nonzero return value
+ if (std::fseek(fp, (long) offset, whence) != 0) {
+ throw std::runtime_error(format("seek error: %s", strerror(errno)));
+ }
+ return;
+ }
+ if (lseek(fd, (off_t) offset, whence) == -1) {
throw std::runtime_error(format("seek error: %s", strerror(errno)));
}
}
return;
}
errno = 0;
- std::size_t ret = std::fread(ptr, len, 1, fp);
- if (ferror(fp)) {
- throw std::runtime_error(format("read error: %s", strerror(errno)));
+ if (fd == -1) {
+ std::size_t ret = std::fread(ptr, len, 1, fp);
+ if (ferror(fp)) {
+ throw std::runtime_error(format("read error: %s", strerror(errno)));
+ }
+ if (ret != 1) {
+ throw std::runtime_error("unexpectedly reached end of file");
+ }
+ } else {
+ // read() on an O_DIRECT descriptor may return fewer bytes than requested,
+ // so keep reading until the whole range has been filled
+ size_t n_read = 0;
+ while (n_read < len) {
+ ssize_t ret = read(fd, (char *) ptr + n_read, len - n_read);
+
+ if (ret == -1) {
+ if (errno == EINTR) {
+ continue; // Interrupted by signal, retry
+ }
+ throw std::runtime_error(format("read error: %s", strerror(errno)));
+ }
+ if (ret == 0) {
+ // EOF is only acceptable after some data has been read: aligned requests may
+ // extend past the end of the file, but an empty first read means the file is truncated
+ if (n_read == 0) {
+ throw std::runtime_error("unexpectedly reached end of file");
+ }
+ break;
+ }
+
+ n_read += ret;
+ }
}
- if (ret != 1) {
- throw std::runtime_error("unexpectedly reached end of file");
+ }
+
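+ // Read an arbitrary (offset, size) range through a block-aligned bounce buffer:
+ // the enclosing aligned window is read from the O_DIRECT descriptor and the
+ // requested bytes are then copied into dest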
+ void read_aligned_chunk(size_t offset, void * dest, size_t size) const {
+ off_t aligned_offset = offset & ~(alignment - 1);
+ off_t offset_from_alignment = offset - aligned_offset;
+ size_t bytes_to_read = (offset_from_alignment + size + alignment - 1) & ~(alignment - 1);
+
+ void * raw_buffer = nullptr;
+ int ret = posix_memalign(&raw_buffer, alignment, bytes_to_read);
+ if (ret != 0) {
+ throw std::runtime_error(format("posix_memalign failed with error %d", ret));
}
+
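+ // make sure the aligned buffer is released even if seek()/read_raw() throws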
+ struct aligned_buffer_deleter {
+ void operator()(void * p) const { free(p); }
+ };
+ std::unique_ptr<void, aligned_buffer_deleter> buffer(raw_buffer);
+
+ seek(aligned_offset, SEEK_SET);
+ read_raw(buffer.get(), bytes_to_read);
+
+ uintptr_t actual_data = reinterpret_cast<uintptr_t>(buffer.get()) + offset_from_alignment;
+ memcpy(dest, reinterpret_cast<void *>(actual_data), size);
}
uint32_t read_u32() const {
}
~impl() {
- if (fp) {
+ if (fd != -1) {
+ close(fd);
+ } else if (fp) {
std::fclose(fp);
}
}
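+ // -1 while buffered stdio is used; a valid descriptor when the file was opened with O_DIRECT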
+ int fd = -1;
#endif
- FILE * fp;
- size_t size;
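+ // read len bytes starting at offset, using the aligned path when O_DIRECT is active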
+ void read_raw_at(void * ptr, size_t len, size_t offset) const {
+ if (alignment != 1) {
+ read_aligned_chunk(offset, ptr, len);
+ } else {
+ seek(offset, SEEK_SET);
+ read_raw(ptr, len);
+ }
+ }
+
+ size_t read_alignment() const {
+ return alignment;
+ }
+
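+ // read alignment granularity: 1 for buffered I/O, st_blksize when O_DIRECT is in use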
+ size_t alignment = 1;
+
+ FILE * fp{};
+ size_t size{};
};
-llama_file::llama_file(const char * fname, const char * mode) : pimpl(std::make_unique<impl>(fname, mode)) {}
+llama_file::llama_file(const char * fname, const char * mode, const bool use_direct_io) :
+ pimpl(std::make_unique<impl>(fname, mode, use_direct_io)) {}
llama_file::~llama_file() = default;
size_t llama_file::tell() const { return pimpl->tell(); }
size_t llama_file::size() const { return pimpl->size; }
+size_t llama_file::read_alignment() const { return pimpl->read_alignment(); }
+
int llama_file::file_id() const {
#ifdef _WIN32
return _fileno(pimpl->fp);
void llama_file::seek(size_t offset, int whence) const { pimpl->seek(offset, whence); }
void llama_file::read_raw(void * ptr, size_t len) const { pimpl->read_raw(ptr, len); }
+void llama_file::read_raw_at(void * ptr, size_t len, size_t offset) const { pimpl->read_raw_at(ptr, len, offset); }
uint32_t llama_file::read_u32() const { return pimpl->read_u32(); }
get_key(llm_kv(LLM_KV_GENERAL_ARCHITECTURE), arch_name, false);
llm_kv = LLM_KV(llm_arch_from_string(arch_name));
- files.emplace_back(new llama_file(fname.c_str(), "rb"));
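+ // when mmap is disabled, try to open the file with O_DIRECT (falls back to buffered I/O on failure)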
+ files.emplace_back(new llama_file(fname.c_str(), "rb", !use_mmap));
contexts.emplace_back(ctx);
// Save tensors data offset of the main file.
}
}
- files.emplace_back(new llama_file(fname_split, "rb"));
+ files.emplace_back(new llama_file(fname_split, "rb", !use_mmap));
contexts.emplace_back(ctx);
// Save tensors data offset info of the shard.
// 4 staging buffers for async uploads, each sized 1MB seems to be a good default for single NVMe drives.
// NVMe raid configurations might require more / larger buffers.
constexpr size_t n_buffers = 4;
- constexpr size_t buffer_size = 1 * 1024 * 1024; // 1MB
+
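+ // use the strictest read alignment across all files to size the staging buffers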
+ size_t alignment = 1;
+ for (const auto & file : files) {
+ alignment = std::max(file->read_alignment(), alignment);
+ }
+
+ // Buffer size: balance between memory usage and I/O efficiency.
+ // 64 MiB chunks work well for NVMe drives; the extra 2 * alignment leaves headroom
+ // for aligning the destination pointer and the read window when O_DIRECT is in use.
+ const size_t buffer_size = alignment != 1 ? 64 * 1024 * 1024 + 2 * alignment : 1 * 1024 * 1024;
std::vector<ggml_backend_buffer_t> host_buffers;
std::vector<ggml_backend_event_t> events;
// If the backend is supported, create pinned memory buffers and events for synchronisation.
for (size_t idx = 0; idx < n_buffers; ++idx) {
auto * buf = ggml_backend_buft_alloc_buffer(host_buft, buffer_size);
+
if (!buf) {
LLAMA_LOG_DEBUG("%s: failed to allocate host buffer for async uploads for device %s\n", func,
ggml_backend_dev_name(dev));
}
} else {
const auto & file = files.at(weight->idx);
+
if (ggml_backend_buffer_is_host(cur->buffer)) {
- file->seek(weight->offs, SEEK_SET);
- file->read_raw(cur->data, n_size);
+ file->read_raw_at(cur->data, n_size, weight->offs);
if (check_tensors) {
validation_result.emplace_back(std::async(std::launch::async, [cur, n_size] {
return std::make_pair(cur, ggml_validate_row_data(cur->type, cur->data, n_size));
} else {
// If upload_backend is valid load the tensor in chunks to pinned memory and upload the buffers asynchronously to the GPU.
if (upload_backend) {
- file->seek(weight->offs, SEEK_SET);
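+ // O_DIRECT path: expand the tensor's byte range to block-aligned boundaries, then
+ // strip the leading/trailing padding from each chunk before uploading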
+ auto offset = (off_t) weight->offs;
+ alignment = file->read_alignment();
+ off_t aligned_offset = offset & ~(alignment - 1);
+ off_t offset_from_alignment = offset - aligned_offset;
+ file->seek(aligned_offset, SEEK_SET);
+
+ // Calculate aligned read boundaries
+ size_t read_start = aligned_offset;
+ size_t read_end = (offset + n_size + alignment - 1) & ~(alignment - 1);
size_t bytes_read = 0;
+ size_t data_read = 0; // Actual tensor data copied (excluding padding)
+
+ while (bytes_read < read_end - read_start) {
+ // cap chunks below the buffer size so that aligning the destination pointer cannot overflow the pinned buffer
+ size_t read_size = std::min<size_t>(buffer_size - alignment, read_end - read_start - bytes_read);
- while (bytes_read < n_size) {
- size_t read_iteration = std::min<size_t>(buffer_size, n_size - bytes_read);
+ // Align the destination pointer within the pinned buffer
+ uintptr_t ptr_dest_aligned = (reinterpret_cast<uintptr_t>(host_ptrs[buffer_idx]) + alignment - 1) & ~(alignment - 1);
+ // Wait for previous upload to complete before reusing buffer
ggml_backend_event_synchronize(events[buffer_idx]);
- file->read_raw(host_ptrs[buffer_idx], read_iteration);
- ggml_backend_tensor_set_async(upload_backend, cur, host_ptrs[buffer_idx], bytes_read, read_iteration);
+
+ // Read aligned chunk from file
+ file->read_raw(reinterpret_cast<void *>(ptr_dest_aligned), read_size);
+
+ // Calculate actual data portion (excluding alignment padding)
+ uintptr_t ptr_data = ptr_dest_aligned;
+ size_t data_to_copy = read_size;
+
+ // Skip alignment padding at start of first chunk
+ if (bytes_read == 0) {
+ ptr_data += offset_from_alignment;
+ data_to_copy -= offset_from_alignment;
+ }
+
+ // Trim alignment padding at end of last chunk
+ if (aligned_offset + bytes_read + read_size > offset + n_size) {
+ data_to_copy -= (read_end - (offset + n_size));
+ }
+
+ // Async upload actual data to GPU
+ ggml_backend_tensor_set_async(upload_backend, cur,
+ reinterpret_cast<void *>(ptr_data), data_read, data_to_copy);
ggml_backend_event_record(events[buffer_idx], upload_backend);
- bytes_read += read_iteration;
+ data_read += data_to_copy;
+ bytes_read += read_size;
+
++buffer_idx;
buffer_idx %= n_buffers;
}
} else {
read_buf.resize(n_size);
- file->seek(weight->offs, SEEK_SET);
- file->read_raw(read_buf.data(), n_size);
+ file->read_raw_at(read_buf.data(), n_size, weight->offs);
ggml_backend_tensor_set(cur, read_buf.data(), 0, n_size);
if (check_tensors && !ggml_validate_row_data(cur->type, read_buf.data(), n_size)) {
throw std::runtime_error(format("tensor '%s' has invalid data", ggml_get_name(cur)));