Files
zhousha 55a67e817e update
2025-08-06 15:38:55 +08:00

152 lines
4.8 KiB
Python

import json
import os
import shutil
import tarfile
import tempfile
import zipfile
from typing import Any
import yaml
def load_json(path: str, raise_for_invalid: bool = False) -> Any:
    """Read the JSON file at ``path`` and return the decoded object.

    Args:
        path: Location of the JSON file; it is read as UTF-8.
        raise_for_invalid: When true, the non-standard constants that the
            ``json`` module accepts by default (``NaN``, ``Infinity``,
            ``-Infinity``) are rejected instead of being decoded.

    Returns:
        Whatever Python object the JSON document decodes to.

    Raises:
        ValueError: If ``raise_for_invalid`` is set and one of the constants
            above appears in the document.
    """

    def _reject_constant(token: str):
        # json calls this hook only for NaN / Infinity / -Infinity.
        raise ValueError("非法json字符: %s" % token)

    decoder_options = {"parse_constant": _reject_constant} if raise_for_invalid else {}
    with open(path, "r", encoding="utf-8") as stream:
        return json.load(stream, **decoder_options)
def dump_json(path: str, obj: Any):
    """Serialize ``obj`` as JSON into the file at ``path``.

    The file is (re)created as UTF-8 text. Output is indented by four spaces
    and non-ASCII characters are written as-is rather than ``\\u`` escaped.

    Args:
        path: Destination file; existing content is overwritten.
        obj: Any object the ``json`` module can serialize.
    """
    with open(path, mode="w", encoding="utf-8") as stream:
        json.dump(obj, stream, indent=4, ensure_ascii=False)
def load_yaml(path: str) -> Any:
    """Read the YAML file at ``path`` and return the decoded object.

    Args:
        path: Location of the YAML file; it is read as UTF-8.

    Returns:
        Whatever Python object the YAML document decodes to.
    """
    # NOTE(review): FullLoader is more permissive than SafeLoader. If this is
    # ever pointed at files from an untrusted source, consider
    # ``yaml.safe_load`` instead -- confirm against callers before changing.
    with open(path, mode="r", encoding="utf-8") as stream:
        document = yaml.load(stream, Loader=yaml.FullLoader)
    return document
def dump_yaml(path: str, obj: Any):
    """Serialize ``obj`` as YAML into the file at ``path``.

    The file is (re)created as UTF-8 text. Output uses a two-space indent,
    keeps non-ASCII characters unescaped, preserves mapping key order and
    uses ``\\n`` line breaks.

    Args:
        path: Destination file; existing content is overwritten.
        obj: Any object PyYAML can serialize.
    """
    yaml_options = {
        "indent": 2,
        "allow_unicode": True,
        "sort_keys": False,
        "line_break": "\n",
    }
    with open(path, mode="w", encoding="utf-8") as stream:
        yaml.dump(obj, stream, **yaml_options)
def dumps_yaml(obj: Any) -> str:
    """Serialize ``obj`` to a YAML string.

    Uses a two-space indent, keeps non-ASCII characters unescaped, preserves
    mapping key order and uses ``\\n`` line breaks.

    Args:
        obj: Any object PyYAML can serialize.

    Returns:
        The YAML text.
    """
    rendered = yaml.dump(
        obj,
        sort_keys=False,
        allow_unicode=True,
        indent=2,
        line_break="\n",
    )
    return rendered
def read_file(path: str, encoding: str = "utf-8") -> str:
    """Read the whole file at ``path`` and return it as text.

    Args:
        path: File to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8
            so the result does not depend on the machine's locale, matching
            the JSON/YAML helpers in this module.

    Returns:
        The complete decoded content of the file.
    """
    with open(path, "r", encoding=encoding) as stream:
        return stream.read()
def write_bfile(path: str, data: bytes):
    """Write the raw bytes ``data`` to the file at ``path``.

    Args:
        path: Destination file; existing content is overwritten.
        data: Bytes to store unchanged.
    """
    with open(path, mode="wb") as sink:
        sink.write(data)
def write_file(path: str, data: str, encoding: str = "utf-8"):
    """Write the text ``data`` to the file at ``path``.

    Args:
        path: Destination file; existing content is overwritten.
        data: Text to store.
        encoding: Text encoding used to encode ``data``. Defaults to UTF-8
            so that non-ASCII text can always be written regardless of the
            machine's locale, matching the JSON/YAML helpers in this module.
    """
    with open(path, "w", encoding=encoding) as sink:
        sink.write(data)
def tail_file(path: str, tail: int) -> str:
    """Return the last ``tail`` lines of the file at ``path``.

    The file is read from the end through a window that starts at 1 KiB and
    doubles until it holds enough lines or covers the whole file, so large
    files are not read in full when only a short tail is wanted.

    Args:
        path: File to read (opened in binary mode).
        tail: Number of trailing lines wanted. Zero or a negative number
            yields an empty string.

    Returns:
        The requested lines, decoded with the default codec (UTF-8) and with
        their line endings kept. Fewer lines are returned when the file is
        shorter than ``tail`` lines.
    """
    if tail <= 0:
        # ``lines[-0:]`` is the whole list, so without this guard asking for
        # zero lines would return everything that had been read.
        return ""
    window = 1024
    with open(path, "rb") as stream:
        file_size = stream.seek(0, os.SEEK_END)
        while True:
            window = min(window, file_size)
            stream.seek(file_size - window)
            lines = stream.readlines()
            # Requiring strictly more than ``tail`` lines guarantees that the
            # first line of the window, which may be cut mid-line, is never
            # part of the result. Once the window spans the whole file there
            # is nothing more to read.
            if len(lines) > tail or window >= file_size:
                return b"".join(lines[-tail:]).decode()
            window *= 2
def zip_dir(zip_path: str, dirname: str):
    """Pack every file found under ``dirname`` into the zip at ``zip_path``.

    Entries are stored with paths relative to ``dirname`` and compressed
    with DEFLATE. Only files are added; directories that contain no files
    produce no entry.

    Args:
        zip_path: Archive to create; an existing file is overwritten.
        dirname: Directory tree to walk.
    """
    with zipfile.ZipFile(zip_path, mode="w") as archive:
        for folder, _, filenames in os.walk(dirname):
            # Folder path as seen from ``dirname``; a leading separator left
            # over here is dropped by zipfile when it stores the name.
            inner_folder = folder.removeprefix(dirname)
            for filename in filenames:
                archive.write(
                    os.path.join(folder, filename),
                    arcname=os.path.join(inner_folder, filename),
                    compress_type=zipfile.ZIP_DEFLATED,
                )
def zip_files(name: str, zipfile_paths: list):
    """Create the zip ``name`` from a list of individual files.

    Args:
        name: Archive to create; an existing file is overwritten.
        zipfile_paths: Pairs of ``(name inside the archive, path of the file
            on disk)``. Each file is stored DEFLATE-compressed.
    """
    with zipfile.ZipFile(name, mode="w") as archive:
        for entry in zipfile_paths:
            arcname, source_path = entry
            archive.write(source_path, arcname=arcname, compress_type=zipfile.ZIP_DEFLATED)
def zip_strs(name: str, zipfile_strs: list):
    """Create the zip ``name`` from in-memory content.

    Args:
        name: Archive to create; an existing file is overwritten.
        zipfile_strs: Pairs of ``(name inside the archive, content)``. Each
            content value is handed to ``ZipFile.writestr`` unchanged.
    """
    with zipfile.ZipFile(name, mode="w") as archive:
        for entry in zipfile_strs:
            member_name, payload = entry
            archive.writestr(member_name, payload)
def zip_zipers(name: str, ziper_paths: list):
    """Bundle several archives, files and directories into the zip ``name``.

    Every input is staged under its own sub-path inside a temporary
    directory created next to ``name``; the staging directory is then zipped
    and removed again, even when staging or zipping fails.

    Args:
        name: Archive to create.
        ziper_paths: Pairs of ``(sub-path inside the result, source path)``.
            A source that is a zip archive is unpacked into the sub-path, a
            regular file is copied to it, and anything else is treated as a
            directory and copied recursively. Sources that do not exist are
            skipped.
    """
    target_dir, target_base = os.path.split(name)
    # The temp-dir helpers join ``dir`` and ``prefix``, so the prefix must be
    # the bare file name: passing the whole path only worked for absolute
    # names and broke for relative ones such as ``out/bundle.zip``.
    with tempfile.TemporaryDirectory(prefix=target_base, dir=target_dir or os.curdir) as staging_dir:
        for subname, ziper_path in ziper_paths:
            if not os.path.exists(ziper_path):
                continue
            staged_path = os.path.join(staging_dir, subname)
            if zipfile.is_zipfile(ziper_path):
                # Archive: unpack its content under the sub-path.
                os.makedirs(staged_path, exist_ok=True)
                unzip_dir(ziper_path, staged_path)
            elif os.path.isfile(ziper_path):
                # Plain file: ``subname`` may contain directories that do not
                # exist yet inside the staging area.
                os.makedirs(os.path.dirname(staged_path), exist_ok=True)
                shutil.copyfile(ziper_path, staged_path)
            else:
                # Directory: copytree creates the destination itself.
                shutil.copytree(ziper_path, staged_path)
        zip_dir(name, staging_dir)
def unzip_dir(zip_path: str, dirname: str, catch_exc: bool = True):
    """Extract the zip at ``zip_path`` into the directory ``dirname``.

    Args:
        zip_path: Archive to unpack.
        dirname: Destination directory.
        catch_exc: When true (the default) a failure during extraction is
            not raised; instead its ``repr`` is written to
            ``unzip_error.log`` inside ``dirname`` and the archive itself is
            copied there. When false the extraction error is re-raised.

    Errors raised while opening the archive happen before extraction starts
    and are therefore not affected by ``catch_exc``.
    """
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        try:
            archive.extractall(dirname)
        except Exception as exc:
            if not catch_exc:
                raise exc
            # Best effort: leave a trace of what went wrong plus the
            # untouched archive so the content is not silently lost.
            error_log = os.path.join(dirname, "unzip_error.log")
            write_file(error_log, "%r" % exc)
            archive_copy = os.path.join(dirname, os.path.basename(zip_path))
            shutil.copyfile(zip_path, archive_copy)
def tar_dir(zip_path: str, dirname: str):
    """Pack every file found under ``dirname`` into a gzip-compressed tar.

    Entries are stored with paths relative to ``dirname``. Only files are
    added; directories that contain no files produce no entry.

    Args:
        zip_path: Archive to create (tar + gzip); an existing file is
            overwritten.
        dirname: Directory tree to walk.
    """
    with tarfile.open(zip_path, mode="w:gz") as archive:
        for folder, _, filenames in os.walk(dirname):
            # Folder path as seen from ``dirname``; tarfile strips a leading
            # "/" from the stored name.
            inner_folder = folder.removeprefix(dirname)
            for filename in filenames:
                source_path = os.path.join(folder, filename)
                stored_name = os.path.join(inner_folder, filename)
                archive.add(source_path, stored_name)
def untar_dir(zip_path: str, dirname: str):
"""将zip_path解压到dirname"""
with tarfile.open(zip_path) as ziper:
ziper.extractall(dirname)