from collections import defaultdict, OrderedDict
import torch
from pathlib import Path
from typing import Optional
import re
import os
import shutil
import logging
import json
from transformers import AutoTokenizer, T5Tokenizer
import gc
from datetime import datetime
from vllm.platforms import current_platform
from model_special import (smooth_model_config, get_layer_weight_bias_name, get_qkv_distribution,
modify_layer_weight_bias_name)
logger = logging.getLogger(__name__)
_str_to_torch_dtype_dict = dict(
bfloat16=torch.bfloat16,
float16=torch.float16,
float32=torch.float32,
int64=torch.int64,
int32=torch.int32,
int8=torch.int8,
bool=torch.bool,
fp8=torch.float8_e4m3fn,
)
def str_dtype_to_torch(dtype):
    '''
    convert str dtype to torch dtype
    '''
ret = _str_to_torch_dtype_dict.get(dtype)
dtype = ret if ret is not None else torch.float16
return dtype
_torch_dtype_to_str_dict = {
torch.bfloat16:"bfloat16",
torch.float16:"float16",
torch.float32:"float32",
torch.int64:"int64",
torch.int32:"int32",
torch.int8:"int8",
torch.bool:"bool",
torch.float8_e4m3fn:"fp8",
}
def torch_dtype_to_str(dtype):
    '''
    convert torch dtype to str dtype
    '''
ret = _torch_dtype_to_str_dict.get(dtype)
dtype = ret if ret is not None else "float16"
return dtype
def extract_model_path(name_or_path):
    '''
    extract model_version and model_family from the name_or_path field of config.json
    '''
patterns = [
r"/(.*)(-[0-9]+[mMbB]{1})(-*.*)",
r"/(.*-[0-9]+)(-*.*)",
r"(.*)(-[0-9]+[mMbB]{1})(-*.*)",
r"(.*-[0-9]+)(-*.*)",
r"([^-]+)(-*.*)",
]
model_version = None
for pattern in patterns:
match = re.search(pattern, name_or_path)
if match:
model_version = match.group(1)
break
if model_version is None:
model_version = name_or_path
model_version = model_version.lower()
match = re.search(r"([a-zA-z]+)(.*)", model_version)
if match:
model_family = match.group(1)
else:
model_family = model_version
return model_version, model_family
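# Illustrative example (hypothetical path, not taken from any config in this repo):
#   extract_model_path("Qwen/Qwen2-7B-Instruct") matches the first pattern and
#   returns ("qwen2", "qwen").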
def read_model_name(model_dir: str, model_version: Optional[str] = None, model_type: Optional[str] = None):
    '''
    get model_arch, model_version, model_family and model_type from config.json and the
    passed model_version / model_type
    args:
        model_dir: model directory
        model_version: passed from main, default None
        model_type: passed from main, default None
    '''
with open(Path(model_dir) / "config.json", 'r') as f:
config = json.load(f)
model_arch = config.get('architectures', None)
name_or_path = config.get('_name_or_path', None)
if model_type is None:
model_type = config.get('model_type', None)
if model_type:
model_type = model_type.lower()
model_family = None
if model_version is None and name_or_path:
model_version, model_family = extract_model_path(name_or_path)
if model_version is None:
model_version = model_type
if model_version:
model_version = model_version.lower()
if model_version and model_family is None:
match = re.search(r"([a-zA-z]+)(.*)", model_version)
if match:
model_family = match.group(1)
else:
model_family = model_version
if isinstance(model_arch, (list, tuple)) and len(model_arch) > 0:
model_arch = model_arch[0]
assert model_arch, "read model architectures failed"
assert model_version, "read model version failed, please set args.version manually"
assert model_family, "read model family failed, please set args.version manually"
return model_arch, model_version, model_family, model_type
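# Illustrative example (hypothetical config.json contents): for a config with
#   "architectures": ["LlamaForCausalLM"], "_name_or_path": "meta-llama/Llama-2-7b-hf",
#   "model_type": "llama",
# read_model_name(model_dir) would return ("LlamaForCausalLM", "llama-2", "llama", "llama").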
def load_tokenizer(tokenizer_dir: Optional[str] = None,
vocab_file: Optional[str] = None,
model_name: str = 'GPTForCausalLM',
model_version: Optional[str] = None,
tokenizer_type: Optional[str] = None):
'''
load tokenizer of model
args:
tokenizer_dir: tokenizer directory
vocab_file: vocabulary file, default None
model_name: model name
model_version: model version
tokenizer_type: Tokenizer type to be loaded.
'''
if vocab_file is None:
use_fast = True
if tokenizer_type == "llama":
use_fast = False
        # padding_side should be 'left' for decoder-only generation
tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir,
legacy=False,
padding_side='left',
truncation_side='right',
trust_remote_code=True,
tokenizer_type=tokenizer_type,
use_fast=use_fast)
elif model_name == 'GemmaForCausalLM':
from transformers import GemmaTokenizer
# Initialize tokenizer from vocab file.
tokenizer = GemmaTokenizer(vocab_file=vocab_file, padding_side='left', truncation_side='left', legacy=False)
else:
# For gpt-next, directly load from tokenizer.model
tokenizer = T5Tokenizer(vocab_file=vocab_file, padding_side='left', truncation_side='left', legacy=False)
if model_name == 'QWenForCausalLM':
with open(Path(tokenizer_dir) / "generation_config.json") as f:
gen_config = json.load(f)
chat_format = gen_config['chat_format']
assert chat_format in ('raw','chatml'), f"unknown chat format: {chat_format}"
pad_id = gen_config['pad_token_id']
end_id = gen_config['eos_token_id']
elif model_name in ('ChatGLMForCausalLM', 'glm'):
pad_id = tokenizer.pad_token_id
end_id = tokenizer.eop_token_id
else:
if tokenizer.pad_token_id is None:
tokenizer.pad_token_id = tokenizer.eos_token_id
pad_id = tokenizer.pad_token_id
end_id = tokenizer.eos_token_id
try:
tokenizer.pad_token = tokenizer.eos_token
except Exception as e:
logger.warn(f"set pad_token with exception:{e}")
return tokenizer, pad_id, end_id
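# Illustrative usage (paths and model names are hypothetical):
#   tokenizer, pad_id, end_id = load_tokenizer(tokenizer_dir="/path/to/model",
#                                              model_name="LlamaForCausalLM",
#                                              tokenizer_type="llama")
# With tokenizer_type == "llama" the slow tokenizer is used (use_fast=False); otherwise the
# fast tokenizer is loaded via AutoTokenizer.from_pretrained with left padding.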
def merge_qkv_weight(named_parameters, weight_name, tp_size, q_proj_size, num_kv_head_replicas):
    '''
    merge tensor parallel qkv weights into non-parallel q_weight, k_weight, v_weight.
    merging the qkv weight and bias follows the same logic
    args:
        named_parameters: parallel named parameters
        weight_name: qkv layer weight name
        tp_size: tensor parallel size
        q_proj_size: query projection size
        num_kv_head_replicas: number of kv head replicas
    '''
qkv_proj_size = named_parameters[0][weight_name].shape[0]
kv_proj_size = (qkv_proj_size - q_proj_size) // 2
    split_size = [q_proj_size, kv_proj_size, kv_proj_size]
q_weight_list = []
k_weight_list = []
v_weight_list = []
for rank in range(0, tp_size):
weight = named_parameters[rank][weight_name]
        split_weight = torch.split(weight, split_size, dim=0)
q_weight_list.append(split_weight[0])
if rank % num_kv_head_replicas == 0:
k_weight_list.append(split_weight[1])
v_weight_list.append(split_weight[2])
q_weight = torch.cat(q_weight_list, dim=0)
k_weight = torch.cat(k_weight_list, dim=0)
v_weight = torch.cat(v_weight_list, dim=0)
return q_weight, k_weight, v_weight
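# Illustrative example (hypothetical shapes, for reference only): with tp_size=2,
# q_proj_size=8 and num_kv_head_replicas=1, a per-rank qkv weight of shape [12, hidden]
# is split into [8, 2, 2] rows; concatenating across the two ranks yields
# q_weight [16, hidden], k_weight [4, hidden] and v_weight [4, hidden].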
def merge_merged_weight(named_parameters, weight_name, tp_size, dim=0):
    '''
    merge a merged (gate_up) linear layer weight into gate_weight and up_weight.
    merging the merged weight and bias follows the same logic.
    args:
        named_parameters: parallel named parameters
        weight_name: merged layer weight name
        tp_size: tensor parallel size
        dim: dimension to chunk and concatenate on
    '''
    gate_weight_list = []
    up_weight_list = []
    for rank in range(0, tp_size):
        weight = named_parameters[rank][weight_name]
        chunk_weights = torch.chunk(weight, 2, dim=dim)
        gate_weight_list.append(chunk_weights[0])
        up_weight_list.append(chunk_weights[1])
    gate_weight = torch.cat(gate_weight_list, dim=dim)
    up_weight = torch.cat(up_weight_list, dim=dim)
return gate_weight, up_weight
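# Illustrative example (hypothetical shapes, assuming the vLLM-style layout where the first
# half of the merged weight is the gate projection): with tp_size=2 and a per-rank gate_up
# weight of shape [8, 16], each rank is chunked into two [4, 16] halves and the halves are
# concatenated across ranks, giving gate_weight [8, 16] and up_weight [8, 16].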
def convert_packed_qkv(q_weight, k_weight, v_weight, dim, args):
    '''
    convert packed qkv weight or bias
    args:
        q_weight: q weight or bias
        k_weight: k weight or bias
        v_weight: v weight or bias
        dim: dimension to pack on
        args: parsed arguments
    '''
packed_qkv = torch.cat([q_weight, k_weight, v_weight], dim=dim)
is_n3sh, head_num, kv_head_num = get_qkv_distribution(args.model_type, args.model_version, args.hf_config)
if is_n3sh is True:
packed_qkv_shape = packed_qkv.shape
num_query_heads_per_kv_head = head_num // kv_head_num
q_shape = q_weight.shape
k_shape = k_weight.shape
v_shape = v_weight.shape
q = q_weight.view(q_shape[:dim] + (kv_head_num, num_query_heads_per_kv_head, -1) + q_shape[dim + 1:])
k = k_weight.view(k_shape[:dim] + (kv_head_num, 1, -1) + k_shape[dim + 1:])
v = v_weight.view(v_shape[:dim] + (kv_head_num, 1, -1) + v_shape[dim + 1:])
tensor_n3sh = torch.cat([q, k, v], dim=dim+1)
packed_qkv = tensor_n3sh.reshape(packed_qkv_shape)
return packed_qkv
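# Note on the packed layout above: when get_qkv_distribution reports is_n3sh=True, the packed
# qkv tensor is re-ordered from [all q heads | all k heads | all v heads] into a
# head-interleaved layout that groups each kv head with its query heads, i.e.
# [q heads of kv head 0, k0, v0, q heads of kv head 1, k1, v1, ...].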
def convert_to_merged_qkv_weight(layer_name, weight_name, bias_name, named_parameters, merged_named_parameters,
layer_range, merged_act_range, tp_size, args):
    '''
    convert parallel qkv named parameters to non-parallel qkv named parameters
    args:
        layer_name: layer name
        weight_name: weight name
        bias_name: bias name
        named_parameters: parallel hugging face named parameters
        merged_named_parameters: non-parallel hugging face named parameters
        layer_range: parallel layer range info
        merged_act_range: non-parallel act range
        tp_size: tensor parallel size
        args: parsed arguments
    '''
layer_name_parts = layer_name.split(".")
self_attn_layer_name = ".".join(layer_name_parts[:-1])
qkv_name = layer_name_parts[-1]
q_weight, k_weight, v_weight = merge_qkv_weight(named_parameters, weight_name, tp_size, layer_range["q_proj_size"],
layer_range["num_kv_head_replicas"])
qkv_list = smooth_model_config[args.model_type]["qkv_list"]
qkv_list_len = len(qkv_list)
if qkv_list_len == 3:
q_layer_name = f"{self_attn_layer_name}.{qkv_list[0]}"
k_layer_name = f"{self_attn_layer_name}.{qkv_list[1]}"
v_layer_name = f"{self_attn_layer_name}.{qkv_list[2]}"
elif qkv_list_len == 1:
qkv_layer_name = f"{self_attn_layer_name}.{qkv_list[0]}"
if qkv_list_len == 3:
merged_act_range[q_layer_name]["x"] = layer_range["x"]
merged_act_range[k_layer_name]["x"] = layer_range["x"]
merged_act_range[v_layer_name]["x"] = layer_range["x"]
merged_act_range[q_layer_name]["is_qkv"] = True
merged_act_range[k_layer_name]["is_qkv"] = True
merged_act_range[v_layer_name]["is_qkv"] = True
merged_named_parameters[f"{q_layer_name}.weight"] = q_weight
merged_named_parameters[f"{k_layer_name}.weight"] = k_weight
merged_named_parameters[f"{v_layer_name}.weight"] = v_weight
elif qkv_list_len == 1:
merged_act_range[qkv_layer_name]["x"] = layer_range["x"]
qkv_weight = convert_packed_qkv(q_weight, k_weight, v_weight, 0, args)
merged_named_parameters[f"{qkv_layer_name}.weight"] = qkv_weight
if bias_name in named_parameters[0]:
q_bias, k_bias, v_bias = merge_qkv_weight(named_parameters, bias_name, tp_size, layer_range["q_proj_size"],
layer_range["num_kv_head_replicas"])
if qkv_list_len == 3:
merged_named_parameters[f"{q_layer_name}.bias"] = q_bias
merged_named_parameters[f"{k_layer_name}.bias"] = k_bias
merged_named_parameters[f"{v_layer_name}.bias"] = v_bias
elif qkv_list_len == 1:
qkv_bias = convert_packed_qkv(q_bias, k_bias, v_bias, 0, args)
merged_named_parameters[f"{qkv_layer_name}.bias"] = qkv_bias
return qkv_name
def convert_to_merged_merged_weight(layer_name, weight_name, bias_name, named_parameters, merged_named_parameters,
layer_range, merged_act_range, tp_size, model_type):
    '''
    convert parallel merged named parameters to non-parallel merged named parameters
    args:
        layer_name: layer name
        weight_name: weight name
        bias_name: bias name
        named_parameters: parallel hugging face named parameters
        merged_named_parameters: non-parallel hugging face named parameters
        layer_range: parallel layer range info
        merged_act_range: non-parallel act range
        tp_size: tensor parallel size
        model_type: model type
    '''
layer_name_parts = layer_name.split(".")
mlp_layer_name = ".".join(layer_name_parts[:-1])
gate_weight, up_weight = merge_merged_weight(named_parameters, weight_name, tp_size)
gate_up_name = layer_name_parts[-1]
gate_up_list = smooth_model_config[model_type]["gate_up_list"]
gate_up_list_len = len(gate_up_list)
is_gate_up = smooth_model_config[model_type]["is_gate_up"]
if gate_up_list_len == 2:
gate_layer_name = f"{mlp_layer_name}.{gate_up_list[0]}"
up_layer_name = f"{mlp_layer_name}.{gate_up_list[1]}"
elif gate_up_list_len == 1:
gate_up_layer_name = f"{mlp_layer_name}.{gate_up_list[0]}"
if gate_up_list_len == 2:
merged_act_range[gate_layer_name]["x"] = layer_range["x"]
merged_act_range[up_layer_name]["x"] = layer_range["x"]
merged_act_range[gate_layer_name]["is_merge"] = True
merged_act_range[up_layer_name]["is_merge"] = True
merged_named_parameters[f"{gate_layer_name}.weight"] = gate_weight
merged_named_parameters[f"{up_layer_name}.weight"] = up_weight
elif gate_up_list_len == 1:
merged_act_range[gate_up_layer_name]["x"] = layer_range["x"]
merged_gate_up_weight_list = [gate_weight, up_weight] if is_gate_up is True else [up_weight, gate_weight]
merged_named_parameters[f"{gate_up_layer_name}.weight"] = torch.cat(merged_gate_up_weight_list, dim=0)
if bias_name in named_parameters[0]:
gate_bias, up_bias = merge_merged_weight(named_parameters, bias_name, tp_size)
if gate_up_list_len == 2:
merged_named_parameters[f"{gate_layer_name}.bias"] = gate_bias
merged_named_parameters[f"{up_layer_name}.bias"] = up_bias
elif gate_up_list_len == 1:
merged_gate_up_bias_list = [gate_bias, up_bias] if is_gate_up is True else [up_bias, gate_bias]
merged_named_parameters[f"{gate_up_layer_name}.bias"] = torch.cat(merged_gate_up_bias_list, dim=0)
return gate_up_name
def convert_to_col_weight_except_qkv_merged(layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range, tp_size):
    '''
    convert column-parallel named parameters to non-parallel named parameters
    args:
        layer_name: layer name
        weight_name: weight name
        bias_name: bias name
        named_parameters: parallel hugging face named parameters
        merged_named_parameters: non-parallel hugging face named parameters
        layer_range: parallel layer range info
        merged_act_range: non-parallel act range
        tp_size: tensor parallel size
    '''
if layer_range['is_linear']:
merged_act_range[layer_name]["x"] = layer_range["x"]
merged_named_parameters[weight_name] = torch.cat(
[named_parameters[tp_id][weight_name] for tp_id in range(0, tp_size)], dim=0)
if bias_name in named_parameters[0]:
merged_named_parameters[bias_name] = torch.cat(
[named_parameters[tp_id][bias_name] for tp_id in range(0, tp_size)], dim=0)
def convert_to_row_weight(act_layer_name, act_range, layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range, tp_size):
    '''
    convert row-parallel named parameters to non-parallel named parameters
    args:
        act_layer_name: act layer name
        act_range: parallel act_range
        layer_name: layer name
        weight_name: weight name
        bias_name: bias name
        named_parameters: parallel hugging face named parameters
        merged_named_parameters: non-parallel hugging face named parameters
        layer_range: parallel layer range info
        merged_act_range: non-parallel act range
        tp_size: tensor parallel size
    '''
if layer_range['is_linear']:
if isinstance(layer_range['x'], torch.Tensor):
merged_act_range[layer_name]['x'] = torch.cat(
[act_range[tp_id][act_layer_name]['x'] for tp_id in range(0, tp_size)], dim=0)
else:
merged_act_range[layer_name]['x'] = None
merged_named_parameters[weight_name] = torch.cat(
[named_parameters[tp_id][weight_name] for tp_id in range(0, tp_size)], dim=1)
if bias_name in named_parameters[0]:
merged_named_parameters[bias_name] = named_parameters[0][bias_name]
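# Note on the two merge directions above: column-parallel weights are sharded along the
# output dimension, so shards are concatenated on dim=0, while row-parallel weights are
# sharded along the input dimension, so shards are concatenated on dim=1 and the unsharded
# bias is taken from rank 0 only.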
def convert_to_layer_merged(act_layer_name, act_range, layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range, tp_size, args):
    '''
    convert parallel layer named parameters to non-parallel layer named parameters
    args:
        act_layer_name: act layer name
        act_range: parallel act_range
        layer_name: layer name
        weight_name: weight name
        bias_name: bias name
        named_parameters: parallel hugging face named parameters
        merged_named_parameters: non-parallel hugging face named parameters
        layer_range: parallel layer range info
        merged_act_range: non-parallel act range
        tp_size: tensor parallel size
        args: parsed arguments
    '''
qkv_name = "qkv_proj"
gate_up_name = "gate_up_proj"
if layer_range['split'] == 'col': # col
# merge weight
if layer_range["is_qkv"]:
qkv_name = convert_to_merged_qkv_weight(layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range, tp_size,
args)
elif layer_range["is_merge"]:
gate_up_name = convert_to_merged_merged_weight(layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range,
tp_size, args.model_type)
else:
convert_to_col_weight_except_qkv_merged(layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range, tp_size)
else: # row
convert_to_row_weight(act_layer_name, act_range, layer_name, weight_name, bias_name, named_parameters,
merged_named_parameters, layer_range, merged_act_range, tp_size)
return qkv_name, gate_up_name
def collect_moe_experts_act_range_of_layer(merged_act_range, mlp_part_name, moe_list):
'''
collect moe experts act range in the same layer
'''
experts_of_gate_up_layer = {}
experts_of_down_layer = {}
gate_up_list = moe_list["gate_up_list"]
gate_up_list_len = len(gate_up_list)
down_list = moe_list["down_list"]
gate_up_layer_pattern = rf"{mlp_part_name}.experts\.\d+\.{gate_up_list[1]}"
gate_layer_pattern = rf"{mlp_part_name}.experts\.\d+\.{gate_up_list[2]}" if gate_up_list_len > 2 else None
down_layer_pattern = rf"{mlp_part_name}.experts\.\d+\.{down_list[1]}"
for key, value in merged_act_range.items():
if re.search(gate_up_layer_pattern, key) or (gate_layer_pattern is not None
and re.search(gate_layer_pattern, key)):
experts_of_gate_up_layer[key] = value
if re.search(down_layer_pattern, key):
experts_of_down_layer[key] = value
return experts_of_gate_up_layer, experts_of_down_layer
def convert_moe_expert_activation_fused(experts_of_layer, merged_act_range):
'''
    fuse the moe expert act ranges in the same layer, and assign the fused range to these experts
'''
unfused_activation = []
for key, value in experts_of_layer.items():
if isinstance(value["x"], torch.Tensor):
unfused_activation.append(value['x'])
    assert len(unfused_activation) > 0, "unfused_activation is empty, which is unsupported"
activation = torch.stack(unfused_activation, dim=0)
fused_activation = torch.max(activation, dim=0)[0]
for key, value in experts_of_layer.items():
if value["x"] is None or isinstance(value["x"], torch.Tensor):
value['x'] = fused_activation
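# Illustrative example (hypothetical values): if three experts of one layer recorded
# per-channel act ranges [1.0, 2.0], [3.0, 0.5] and [2.0, 4.0], the fused range is the
# element-wise max [3.0, 4.0], which is then assigned back to every expert of that layer so
# the fused experts share one activation range.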
def convert_moe_layer_activation_fused(merged_act_range, model_type):
'''
    loop over each layer, fuse the moe expert act ranges within it, and assign the fused range to these experts
'''
moe_list = smooth_model_config[model_type]["moe_list"]
if moe_list is None:
return
mlp_name = moe_list["gate_up_list"][0].split(".")[0]
layer = 0
while True:
mlp_part_name = rf"\.{layer}\.{mlp_name}"
experts_of_gate_up_layer, experts_of_down_layer = collect_moe_experts_act_range_of_layer(
merged_act_range, mlp_part_name, moe_list)
        # if experts_of_layer is empty, all moe layers have been processed and the loop is finished
        if len(experts_of_gate_up_layer) < 1 or len(experts_of_down_layer) < 1:
            logger.info(f"the number of moe layers is {layer}")
break
convert_moe_expert_activation_fused(experts_of_gate_up_layer, merged_act_range)
convert_moe_expert_activation_fused(experts_of_down_layer, merged_act_range)
layer += 1
def should_include(key, parameters, exclude_names):
    '''
    return True if key is not already in parameters and does not match any excluded name
    args:
        parameters: named parameters
        exclude_names: list of excluded names
    '''
return key not in parameters and not any(exclude_name in key for exclude_name in exclude_names)
def valid_act_range(act_layer_name, layer_range):
'''
    validate act_range, mainly replacing inf, nan or zero values in the x field
args:
act_layer_name: act layer name
layer_range: act layer value
'''
act_range_x = layer_range["x"]
if act_range_x is not None and isinstance(act_range_x, torch.Tensor):
mask = torch.isinf(act_range_x) | torch.isnan(act_range_x) | (act_range_x == 0)
if torch.any(mask).item():
act_range_x[mask] = 1e-6
logger.warning(f"act_range_x in layer:{act_layer_name} has nan, inf or zero values, force to 1e-6")
def convert_to_merged(act_range, named_parameters, tp_size, args):
    '''
    convert parallel act_range and named parameters to the non-parallel format.
    args:
        act_range: parallel act_range
        named_parameters: parallel named parameters
        tp_size: tensor parallel size
        args: parsed arguments
    '''
model_type = args.model_type
merged_act_range = defaultdict(lambda: {"x": None, "is_qkv": False, "is_merge": False,})
merged_named_parameters = {}
input_id_list = []
exclude_names = set()
for act_layer_name, layer_range in act_range[0].items():
valid_act_range(act_layer_name, layer_range)
layer_name, weight_name, bias_name = get_layer_weight_bias_name(model_type, act_layer_name)
        # when tie_word_embeddings is True, lm_head reuses the embedding weight
if args.tie_word_embeddings is True and "lm_head" in layer_name:
continue
qkv_name, gate_up_name = convert_to_layer_merged(act_layer_name, act_range, layer_name, weight_name, bias_name,
named_parameters, merged_named_parameters, layer_range,
merged_act_range, tp_size, args)
exclude_names.update({qkv_name, gate_up_name})
if layer_range['split'] == 'col' and layer_range["is_qkv"] and len(layer_range["input_id"]) > 0:
input_id_list = layer_range["input_id"]
if args.use_smoothquant and args.disable_fused_quantize_expert is False:
convert_moe_layer_activation_fused(merged_act_range, model_type)
merged_named_parameters.update({
key: value
for key, value in named_parameters[0].items()
if should_include(key, merged_named_parameters, exclude_names)
})
modify_layer_weight_bias_name(model_type, merged_named_parameters)
sorted_named_parameters = OrderedDict(sorted(merged_named_parameters.items(), key=lambda item: item[0]))
sorted_merged_act_range = OrderedDict(sorted(merged_act_range.items(), key=lambda item: item[0]))
return sorted_merged_act_range, sorted_named_parameters, input_id_list
def copy_files_except_extensions(input_dir, output_dir, extensions):
    '''
    copy files from input_dir to output_dir, skipping files whose extension is in extensions,
    and keep the sub-directory structure the same
    args:
        input_dir: input directory
        output_dir: output directory
        extensions: file extensions to exclude from copying
    '''
    # walk the input directory and its subdirectories
for root, dirs, files in os.walk(input_dir):
        # compute the path relative to input_dir
rel_path = os.path.relpath(root, input_dir)
if len(rel_path) > 1 and rel_path.startswith('.'):
continue
        # build the destination directory path
dst_dir = os.path.join(output_dir, rel_path)
        # make sure the destination directory exists
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
for file in files:
if not any(file.endswith(ext) for ext in extensions) and not file.startswith('.'):
                # build the full source and destination file paths
src_file = os.path.join(root, file)
dst_file = os.path.join(dst_dir, file)
                # copy the file
shutil.copy2(src_file, dst_file)
logger.info(f'Copied {src_file} to {dst_file}')
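# Illustrative usage (hypothetical paths and extensions):
#   copy_files_except_extensions("/in", "/out", extensions=(".safetensors", ".bin"))
# mirrors the directory tree of /in under /out, copying every file that does not start with
# '.' and whose name does not end in .safetensors or .bin.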
def cleanup():
'''
cleanup memory resource
'''
gc.collect()
if not current_platform.is_cpu():
torch.cuda.empty_cache()
def vllm_cleanup(llm):
"""Release occupied resources and reset parallel_state"""
del llm
from vllm.distributed.parallel_state import destroy_model_parallel, destroy_distributed_environment
destroy_model_parallel()
destroy_distributed_environment()
import contextlib
with contextlib.suppress(AssertionError):
torch.distributed.destroy_process_group()
import ray
if ray.is_initialized():
ray.shutdown()
    logger.info('llm and distributed env are cleaned up')
def generate_datetime():
'''
generate current datetime
'''
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
return formatted_datetime
def get_hf_config_sliding_window(hf_text_config) -> Optional[int]:
"""Get the sliding window size, or None if disabled."""
# Some models, like Qwen2 and Qwen1.5, use `use_sliding_window` in
# addition to sliding window size. We check if that field is present
# and if it's False, return None.
if (hasattr(hf_text_config, "use_sliding_window")
and not hf_text_config.use_sliding_window):
return None
return getattr(hf_text_config, "sliding_window", None)
def get_skip_patterns(model_type):
"""Get the skip patterns from model config."""
config = smooth_model_config[model_type]
return config["skip_patterns"] if "skip_patterns" in config else []
def should_skip(model_type, weight_name):
"""judge if the weight should be skipped."""
skip_patterns = get_skip_patterns(model_type)
    for pattern in skip_patterns:
        if re.match(pattern, weight_name):
            return True
return False
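# Illustrative example (hypothetical pattern): if smooth_model_config[model_type] defines
#   "skip_patterns": [r".*rotary_emb.*"],
# then should_skip(model_type, "model.layers.0.self_attn.rotary_emb.inv_freq") returns True,
# so that weight is skipped.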