from pathlib import Path
from hashlib import sha256
from typing import TYPE_CHECKING, Any, Callable, ContextManager, Iterable, Iterator, Literal, Sequence, TypeVar, cast
+from itertools import chain
import math
import numpy as np
model_name: str | None
metadata_override: Path | None
dir_model_card: Path
- is_lora: bool
# subclasses should define this!
model_arch: gguf.MODEL_ARCH
def __init__(self, dir_model: Path, ftype: gguf.LlamaFileType, fname_out: Path, is_big_endian: bool = False,
use_temp_file: bool = False, eager: bool = False,
metadata_override: Path | None = None, model_name: str | None = None,
- split_max_tensors: int = 0, split_max_size: int = 0, dry_run: bool = False, small_first_shard: bool = False, is_lora: bool = False):
+ split_max_tensors: int = 0, split_max_size: int = 0, dry_run: bool = False, small_first_shard: bool = False):
if type(self) is Model:
raise TypeError(f"{type(self).__name__!r} should not be directly instantiated")
self.metadata_override = metadata_override
self.model_name = model_name
self.dir_model_card = dir_model # overridden in convert_lora_to_gguf.py
- self.is_lora = is_lora # true if model is used inside convert_lora_to_gguf.py
# Apply heuristics to figure out typical tensor encoding based on first layer tensor encoding type
if self.ftype == gguf.LlamaFileType.GUESSED:
return False
+ # some models need extra generated tensors (like rope_freqs)
+ def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
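+        # subclasses may yield (name, tensor) pairs here; prepare_tensors()
+        # maps, converts and writes them exactly like the tensors returned by
+        # get_tensors(), which is what removes the need for the old is_lora flag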
+ return ()
+
def prepare_tensors(self):
max_name_len = max(len(s) for _, s in self.tensor_map.mapping.values()) + len(".weight,")
- for name, data_torch in self.get_tensors():
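+        # chain the generated tensors in first so they get the same name
+        # mapping and dtype handling as the checkpoint tensors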
+ for name, data_torch in chain(self.generate_extra_tensors(), self.get_tensors()):
# we don't need these
if name.endswith((".attention.masked_bias", ".attention.bias", ".rotary_emb.inv_freq")):
continue
return [(self.map_tensor_name(name), data_torch)]
- def prepare_tensors(self):
+ def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
if rope_scaling := self.find_hparam(["rope_scaling"], optional=True):
if rope_scaling.get("rope_type", '').lower() == "llama3":
base = self.hparams.get("rope_theta", 10000.0)
smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
rope_factors.append(1 / ((1 - smooth) / factor + smooth))
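+                # note: smooth == 0 applies the full scaling factor and
+                # smooth == 1 leaves the frequency unscaled; wavelengths between
+                # the two cutoffs are interpolated between those extremes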
- if not self.is_lora:
- self.gguf_writer.add_tensor(self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), np.array(rope_factors, dtype=np.float32))
+ yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))
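+                # yielded instead of written with gguf_writer.add_tensor() so the
+                # rope_freqs tensor flows through the normal prepare_tensors() path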
+ def prepare_tensors(self):
super().prepare_tensors()
if self._experts is not None:
def set_gguf_parameters(self):
hparams = self.hparams
- rope_dims = hparams["qk_rope_head_dim"]
-
self.gguf_writer.add_file_type(self.ftype)
self.gguf_writer.add_context_length(hparams["max_position_embeddings"])
self.gguf_writer.add_embedding_length(hparams["hidden_size"])
self.gguf_writer.add_key_length(hparams["qk_nope_head_dim"] + hparams["qk_rope_head_dim"])
self.gguf_writer.add_rope_dimension_count(hparams["qk_rope_head_dim"])
+ def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
rope_scaling = self.find_hparam(['rope_scaling'], True)
- if rope_scaling is None:
- return
+ if rope_scaling is not None:
+ rope_dims = self.hparams["qk_rope_head_dim"]
- long_factors = rope_scaling.get('long_factor', None)
- short_factors = rope_scaling.get('short_factor', None)
+ long_factors = rope_scaling.get('long_factor', None)
+ short_factors = rope_scaling.get('short_factor', None)
- if long_factors is None or short_factors is None:
- raise KeyError('Missing the required key rope_scaling.long_factor or rope_scaling_short_factor')
+ if long_factors is None or short_factors is None:
+                raise KeyError('Missing the required key rope_scaling.long_factor or rope_scaling.short_factor')
- if len(long_factors) != len(short_factors) or len(long_factors) != rope_dims / 2:
- raise ValueError(f'The length of rope long and short factors must be {rope_dims / 2}')
+            if len(long_factors) != len(short_factors) or len(long_factors) != rope_dims // 2:
+                raise ValueError(f'The length of rope long and short factors must be {rope_dims // 2}')
- self.gguf_writer.add_tensor(gguf.TENSOR_NAMES[gguf.MODEL_TENSOR.ROPE_FACTORS_LONG] + ".weight", np.array(long_factors, dtype=np.float32))
- self.gguf_writer.add_tensor(gguf.TENSOR_NAMES[gguf.MODEL_TENSOR.ROPE_FACTORS_SHORT] + ".weight", np.array(short_factors, dtype=np.float32))
+ yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_LONG), torch.tensor(long_factors, dtype=torch.float32))
+ yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_SHORT), torch.tensor(short_factors, dtype=torch.float32))
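+            # note: format_tensor_name() appends ".weight" by default, so these
+            # names match what the removed add_tensor() calls built by hand from
+            # gguf.TENSOR_NAMES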
def set_vocab(self):
- self._set_vocab_llama_hf()
+ self._set_vocab_sentencepiece()
def _reverse_hf_permute(self, weights: Tensor, n_head: int, n_kv_head: int | None = None) -> Tensor:
if n_kv_head is not None and n_head != n_kv_head:
self.gguf_writer.add_file_type(self.ftype)
self.gguf_writer.add_sliding_window(self.find_hparam(["sliding_window"]))
+ def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
+ n_embd = self.find_hparam(["hidden_size", "n_embd"])
+ n_head = self.find_hparam(["num_attention_heads", "n_head"])
+ max_pos_embds = self.find_hparam(["n_positions", "max_position_embeddings"])
+ orig_max_pos_embds = self.find_hparam(["original_max_position_embeddings"])
+ rope_dims = n_embd // n_head
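+        # the long/short rope factor tensors are sized from the per-head
+        # dimension: one entry per pair of rotated dimensions (rope_dims / 2)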
+
# write rope scaling for long context (128k) model
rope_scaling = self.find_hparam(['rope_scaling'], True)
if rope_scaling is None:
if len(long_factors) != len(short_factors) or len(long_factors) != rope_dims / 2:
raise ValueError(f'The length of rope long and short factors must be {rope_dims / 2}')
- if not self.is_lora:
- self.gguf_writer.add_tensor(gguf.TENSOR_NAMES[gguf.MODEL_TENSOR.ROPE_FACTORS_LONG] + ".weight", np.array(long_factors, dtype=np.float32))
- self.gguf_writer.add_tensor(gguf.TENSOR_NAMES[gguf.MODEL_TENSOR.ROPE_FACTORS_SHORT] + ".weight", np.array(short_factors, dtype=np.float32))
+ yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_LONG), torch.tensor(long_factors, dtype=torch.float32))
+ yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FACTORS_SHORT), torch.tensor(short_factors, dtype=torch.float32))
@Model.register("PlamoForCausalLM")
self.gguf_writer.add_rope_scaling_type(gguf.RopeScalingType.LINEAR)
self.gguf_writer.add_rope_scaling_factor(hparams["rope_scaling"]["factor"])
- def prepare_tensors(self):
+ def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
if rope_scaling := self.find_hparam(["rope_scaling"], optional=True):
if rope_scaling.get("rope_type", '').lower() == "llama3":
base = self.hparams.get("rope_theta", 10000.0)
smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
rope_factors.append(1 / ((1 - smooth) / factor + smooth))
- if not self.is_lora:
- self.gguf_writer.add_tensor(self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), np.array(rope_factors, dtype=np.float32))
-
- super().prepare_tensors()
+ yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))
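+                # no prepare_tensors() override is needed here anymore: the base
+                # class chains generate_extra_tensors() into its own tensor loop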
@Model.register("GraniteForCausalLM")