dllama/converter/convert-hf.py
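# Converts a Hugging Face safetensors checkpoint into a single dllama model file
# (dllama_model_<name>_<floatType>.m): the header is written first, followed by
# every tensor in a fixed order, with weights stored in the requested float type.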

import gc
import json
import sys
import os
from writer import parseFloatType, writeTensor, writeHeader, FloatType
from safetensors import safe_open

class ArchType:
    LLAMA = 0xABCD00
    QWEN3 = 0xABCD01
    QWEN3_MOE = 0xABCD02
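
# Row permutation applied to the q/k projection weights of LLAMA-family
# checkpoints (see Processor.__transformQ / __transformK below).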
def permute(tensor, nHeads: int, nKvHeads: int):
    if nHeads != nKvHeads:
        nHeads = nKvHeads
    return (tensor.reshape(nHeads, 2, tensor.shape[0] // nHeads // 2, *tensor.shape[1:]).swapaxes(1, 2).reshape(tensor.shape))
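
# Walks the conversion plan and copies tensors from the safetensors shards into
# the output file, keeping at most one shard open at a time.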
class Processor:
    def __init__(self, config):
        self.config = config
        self.archType = config['arch_type']
        self.currentModelIndex = None
        self.currentModel = None
        self.currentModelKeys = None
        self.layerMap = {}
        self.plan = []

    def __unloadModel(self):
        if self.currentModel:
            del self.currentModel
            self.currentModel = None
            gc.collect()
        self.currentModelIndex = None

    def __loadModel(self, index: int):
        if (self.currentModelIndex == index):
            return
        self.__unloadModel()
        filePath = self.config['files'][index]
        fileName = os.path.basename(filePath)
        print(f'💿 Loading file {fileName}...')
        self.currentModel = safe_open(filePath, framework='pt', device='cpu')
        self.currentModelKeys = list(self.currentModel.keys())
        for key in self.currentModelKeys:
            self.layerMap[key] = index
        print(f'Found {len(self.currentModelKeys)} layers')
        self.currentModelIndex = index

    def __transformQ(self, tensor):
        if self.archType == ArchType.LLAMA:
            return permute(tensor, self.config['n_heads'], self.config['n_heads'])
        return tensor

    def __transformK(self, tensor):
        if self.archType == ArchType.LLAMA:
            return permute(tensor, self.config['n_heads'], self.config['n_kv_heads'])
        return tensor
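
    # A plan entry is [floatType, optional transform, candidate tensor name(s)].
    # Later names act as fallbacks, e.g. 'lm_head.weight' falls back to
    # 'model.embed_tokens.weight' when the checkpoint has no separate lm_head.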
    def __preparePlan(self):
        wt = self.config['weights_float_type']
        p = self.plan
        p.append([FloatType.F32,
            'model.embed_tokens.weight'])
        for l in range(0, self.config['n_layers']):
            p.append([wt, self.__transformQ,
                f'model.layers.{l}.self_attn.q_proj.weight'])
            p.append([wt, self.__transformK,
                f'model.layers.{l}.self_attn.k_proj.weight'])
            p.append([wt,
                f'model.layers.{l}.self_attn.v_proj.weight'])
            p.append([wt,
                f'model.layers.{l}.self_attn.o_proj.weight'])

            if (self.config['n_experts'] > 0):
                p.append([FloatType.F32, f'model.layers.{l}.mlp.gate.weight'])
                for e in range(self.config['n_experts']):
                    p.append([wt,
                        f'model.layers.{l}.mlp.experts.{e}.gate_proj.weight'])
                    p.append([wt,
                        f'model.layers.{l}.mlp.experts.{e}.down_proj.weight'])
                    p.append([wt,
                        f'model.layers.{l}.mlp.experts.{e}.up_proj.weight'])
            else:
                p.append([wt,
                    f'model.layers.{l}.mlp.gate_proj.weight'])
                p.append([wt,
                    f'model.layers.{l}.mlp.down_proj.weight'])
                p.append([wt,
                    f'model.layers.{l}.mlp.up_proj.weight'])

            if (self.archType == ArchType.QWEN3 or self.archType == ArchType.QWEN3_MOE):
                p.append([FloatType.F32,
                    f'model.layers.{l}.self_attn.q_norm.weight'])
                p.append([FloatType.F32,
                    f'model.layers.{l}.self_attn.k_norm.weight'])

            p.append([FloatType.F32,
                f'model.layers.{l}.input_layernorm.weight'])
            p.append([FloatType.F32,
                f'model.layers.{l}.post_attention_layernorm.weight'])
        p.append([FloatType.F32,
            'model.norm.weight'])
        p.append([wt,
            'lm_head.weight', 'model.embed_tokens.weight'])
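
    # Shards are scanned sequentially: if a tensor is not in any shard seen so
    # far, the next shard in file order is loaded and searched.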
    def write(self, outputFile):
        self.__preparePlan()

        # Load the last model file so its layer names are registered in the layer map
        self.__loadModel(len(self.config['files']) - 1)
        self.__unloadModel()

        for planItem in self.plan:
            lookup = planItem[1:]
            transform = None
            if (callable(lookup[0])):
                transform = lookup[0]
                lookup = lookup[1:]

            if (self.currentModelIndex is None):
                modelIndex = 0
            else:
                modelIndex = None
                for layerName in lookup:
                    if (layerName in self.layerMap):
                        modelIndex = self.layerMap[layerName]
                        break
                if (modelIndex is None):
                    modelIndex = self.currentModelIndex + 1
            self.__loadModel(modelIndex)

            tensor = None
            for layerName in lookup:
                if (layerName in self.currentModelKeys):
                    tensor = self.currentModel.get_tensor(layerName)
                    break
            if tensor is None:
                raise Exception(f'Layer {lookup[0]} not found')
            print(f'🔶 Writing tensor {layerName} {tensor.shape}...')

            floatType = planItem[0]
            if (transform):
                tensor = transform(tensor)
            writeTensor(outputFile, tensor, floatType)
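
# The parse* helpers map Hugging Face config values to the numeric codes that
# writeHeader stores in the output header.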
def parseArchType(type: str):
    archType = {
        'llama': ArchType.LLAMA,
        'mistral': ArchType.LLAMA,
        'qwen3': ArchType.QWEN3,
        'qwen3_moe': ArchType.QWEN3_MOE,
    }.get(type)
    if (archType is None):
        raise Exception(f'Unsupported arch type: {type}')
    return archType

def parseHiddenAct(act: str):
    hiddenAct = {
        'gelu': 0,
        'silu': 1
    }.get(act)
    if (hiddenAct is None):
        raise Exception(f'Unsupported hidden act: {act}')
    return hiddenAct

def parseRopeType(rt: str):
    ropeType = {
        'llama3': 2, # LLAMA3_1
    }.get(rt)
    if (ropeType is None):
        raise Exception(f'Unsupported rope type: {rt}')
    return ropeType

def parseRmsNormEpsilon(epsilon: float):
    if (epsilon == 1e-05):
        return 5
    elif (epsilon == 1e-06):
        return 6
    raise Exception(f'Unsupported epsilon: {epsilon}')
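
# Builds the header dictionary for writeHeader from config.json plus the sorted
# list of *.safetensors shard paths found in the model folder.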
def loadConfig(folderPath: str, weightsFloatType: int):
    allFiles = os.listdir(folderPath)
    allFiles.sort()
    with open(os.path.join(folderPath, 'config.json')) as fc:
        config = json.load(fc)
    files = []
    for fileName in allFiles:
        if fileName.endswith('.safetensors') and not fileName.startswith('.'):
            files.append(os.path.join(folderPath, fileName))
    if (len(files) == 0):
        raise Exception('No model files found')
    result = {
        'version': 0,
        'arch_type': parseArchType(config['model_type']),
        'hidden_act': parseHiddenAct(config['hidden_act']),
        'dim': config['hidden_size'],
        'hidden_dim': config['intermediate_size'],
        'n_layers': config['num_hidden_layers'],
        'n_heads': config['num_attention_heads'],
        'n_kv_heads': config['num_key_value_heads'],
        'weights_float_type': weightsFloatType,
        'max_seq_len': config['max_position_embeddings'],
        'vocab_size': config['vocab_size'],
        'files': files,
    }

    nExperts = config.get('num_experts')
    nActiveExperts = config.get('num_experts_per_tok')
    result['n_experts'] = int(nExperts) if nExperts is not None else 0
    result['n_active_experts'] = int(nActiveExperts) if nActiveExperts is not None else 0

    ropeTheta = config.get('rope_theta')
    if (ropeTheta is not None):
        result['rope_theta'] = int(ropeTheta)

    ropeScaling = config.get('rope_scaling')
    if (ropeScaling is not None):
        result['rope_scaling_factor'] = int(ropeScaling['factor'])
        result['rope_scaling_low_freq_factor'] = int(ropeScaling['low_freq_factor'])
        result['rope_scaling_high_freq_factory'] = int(ropeScaling['high_freq_factor'])
        result['rope_scaling_orig_max_seq_len'] = int(ropeScaling['original_max_position_embeddings'])
        result['rope_type'] = parseRopeType(ropeScaling['rope_type'])

    headDim = config.get('head_dim')
    if (headDim is not None):
        result['head_dim'] = headDim

    rmsNormEps = config.get('rms_norm_eps')
    if (rmsNormEps is not None):
        result['norm_epsilon'] = parseRmsNormEpsilon(rmsNormEps)

    moeHiddenDim = config.get('moe_intermediate_size')
    if (moeHiddenDim is not None):
        result['moe_hidden_dim'] = int(moeHiddenDim)
    return result

def printUsage():
    print('Usage: python convert-hf.py <sourceFolderPath> <weightsFloatType> <name>')
    print()
    print('Options:')
    print('  <sourceFolderPath> The path to the folder containing the model files')
    print('  <weightsFloatType> The float type of the weights (e.g. "q40")')
    print('  <name>             The name of the model (e.g. "llama3")')
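
# Example invocation (the checkpoint path is illustrative):
#   python convert-hf.py ./Llama-3.1-8B-Instruct q40 llama3
# produces dllama_model_llama3_q40.m in the current directory.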
if __name__ == '__main__':
    if (len(sys.argv) < 4):
        printUsage()
        exit(1)

    sourceFolderPath = sys.argv[1]
    weightsFloatType = parseFloatType(sys.argv[2])
    name = sys.argv[3]
    outputFileName = f'dllama_model_{name}_{sys.argv[2]}.m'

    print(f'Output file: {outputFileName}')

    config = loadConfig(sourceFolderPath, weightsFloatType)

    with open(outputFileName, 'wb') as outputFile:
        writeHeader(outputFile, config)
        processor = Processor(config)
        processor.write(outputFile)

    print(f'{outputFileName} created successfully')