import json
import os
import shutil
import tarfile
import tempfile
import zipfile
from typing import Any, Optional

import yaml

# Shared formatting options so dump_yaml() and dumps_yaml() cannot drift apart.
_YAML_DUMP_OPTIONS = {
    "indent": 2,
    "allow_unicode": True,
    "sort_keys": False,
    "line_break": "\n",
}


def load_json(path: str, raise_for_invalid: bool = False) -> Any:
    """Read the JSON file at *path* (UTF-8) and return the decoded object.

    Args:
        path: File to read.
        raise_for_invalid: When true, the non-standard constants that the
            ``json`` module normally accepts (``NaN``, ``Infinity``,
            ``-Infinity``) are rejected instead of being decoded.

    Raises:
        ValueError: If ``raise_for_invalid`` is set and such a constant is
            encountered, or if the document is not valid JSON.
    """

    def parse_constant(s: str):
        raise ValueError("非法json字符: %s" % s)

    with open(path, "r", encoding="utf-8") as f:
        if raise_for_invalid:
            return json.load(f, parse_constant=parse_constant)
        return json.load(f)


def dump_json(path: str, obj: Any) -> None:
    """Serialize *obj* as indented JSON (UTF-8, non-ASCII kept as-is) to *path*."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=4)


def load_yaml(path: str) -> Any:
    """Read the YAML file at *path* (UTF-8) and return the decoded object."""
    with open(path, "r", encoding="utf-8") as f:
        # NOTE(security): full_load uses PyYAML's FullLoader, which accepts more
        # than plain data tags. Only use this on trusted files; prefer
        # yaml.safe_load if the input can come from an untrusted source.
        return yaml.full_load(f)


def dump_yaml(path: str, obj: Any) -> None:
    """Serialize *obj* as YAML (UTF-8, insertion key order) to *path*."""
    with open(path, "w", encoding="utf-8") as f:
        yaml.dump(obj, f, **_YAML_DUMP_OPTIONS)


def dumps_yaml(obj: Any) -> str:
    """Serialize *obj* as YAML and return it as a string."""
    return yaml.dump(obj, **_YAML_DUMP_OPTIONS)


def read_file(path: str, encoding: Optional[str] = None) -> str:
    """Return the text content of *path*.

    Args:
        path: File to read.
        encoding: Text encoding. ``None`` (the default) keeps the historical
            behavior of using the platform default encoding, unlike the
            JSON/YAML helpers in this module which always use UTF-8.
    """
    with open(path, "r", encoding=encoding) as f:
        return f.read()


def write_bfile(path: str, data: bytes) -> None:
    """Write the bytes *data* to *path*, replacing any existing content."""
    with open(path, "wb") as f:
        f.write(data)


def write_file(path: str, data: str, encoding: Optional[str] = None) -> None:
    """Write the string *data* to *path*, replacing any existing content.

    Args:
        path: File to write.
        data: Text to write.
        encoding: Text encoding. ``None`` (the default) keeps the historical
            behavior of using the platform default encoding.
    """
    with open(path, "w", encoding=encoding) as f:
        f.write(data)


def tail_file(path: str, tail: int) -> str:
    """Return the last *tail* lines of *path* as a string.

    The file is read from the end in blocks that double in size (starting at
    1024 bytes) until enough lines are available or the whole file was read.
    The bytes are decoded with the default codec of ``bytes.decode``.

    Args:
        path: File to read.
        tail: Number of trailing lines wanted. Values ``<= 0`` yield ``""``.
    """
    if tail <= 0:
        # lines[-0:] would be the whole list, so handle this case explicitly.
        return ""
    block = 1024
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        filesize = f.tell()
        while True:
            block = min(block, filesize)
            f.seek(filesize - block, os.SEEK_SET)
            lines = f.readlines()
            # Require one line more than requested: the first line of a block
            # that starts mid-file may be truncated and must not be returned.
            if len(lines) > tail or block >= filesize:
                return b"".join(lines[-tail:]).decode()
            block *= 2


def _iter_archive_members(dirname: str, archive_path: str):
    """Yield ``(source_path, arcname)`` for every file below *dirname*.

    ``arcname`` is the file's path relative to *dirname*. The archive being
    written is skipped when it lives inside *dirname*, otherwise it would be
    read while it is still growing.
    """
    archive_real = os.path.realpath(archive_path)
    for path, _, files in os.walk(dirname):
        rel_dir = os.path.relpath(path, dirname)
        for file in files:
            source = os.path.join(path, file)
            if os.path.realpath(source) == archive_real:
                continue
            if rel_dir == os.curdir:
                yield source, file
            else:
                yield source, os.path.join(rel_dir, file)


def zip_dir(zip_path: str, dirname: str) -> None:
    """Create the deflate-compressed zip archive *zip_path* from the files in *dirname*.

    Only files are stored (empty directories are not), with names relative
    to *dirname*.
    """
    with zipfile.ZipFile(zip_path, "w") as ziper:
        for source, arcname in _iter_archive_members(dirname, zip_path):
            ziper.write(source, arcname, zipfile.ZIP_DEFLATED)


def zip_files(name: str, zipfile_paths: list) -> None:
    """Create the zip archive *name* from ``(arcname, file_path)`` pairs.

    Args:
        name: Path of the archive to create.
        zipfile_paths: Iterable of ``(name inside the archive, path on disk)``.
    """
    with zipfile.ZipFile(name, "w") as ziper:
        for arcname, zipfile_path in zipfile_paths:
            ziper.write(zipfile_path, arcname, zipfile.ZIP_DEFLATED)


def zip_strs(name: str, zipfile_strs: list) -> None:
    """Create the zip archive *name* from ``(filename, content)`` pairs.

    Args:
        name: Path of the archive to create.
        zipfile_strs: Iterable of ``(name inside the archive, str/bytes content)``.
    """
    with zipfile.ZipFile(name, "w") as ziper:
        for filename, content in zipfile_strs:
            ziper.writestr(filename, content)


def zip_zipers(name: str, ziper_paths: list) -> None:
    """Merge several archives/files/directories into the zip archive *name*.

    Each ``(subname, path)`` entry is materialized under ``subname`` in a
    scratch directory created next to *name*: zip archives are extracted,
    regular files are copied, directories are copied recursively. Entries
    whose path does not exist are skipped. The scratch directory is then
    zipped to *name* and removed, even if an error occurs.

    Args:
        name: Path of the archive to create.
        ziper_paths: Iterable of ``(name inside the archive, source path)``.
    """
    # The prefix must be a bare file name: a prefix containing directory
    # components would be joined onto ``dir`` and point at a missing folder.
    with tempfile.TemporaryDirectory(
        prefix=os.path.basename(name),
        dir=os.path.dirname(name) or os.curdir,
    ) as temp_dirname:
        for subname, ziper_path in ziper_paths:
            if not os.path.exists(ziper_path):
                continue
            sub_dirname = os.path.join(temp_dirname, subname)
            if zipfile.is_zipfile(ziper_path):
                # Archive: extract its content below subname.
                os.makedirs(sub_dirname, exist_ok=True)
                unzip_dir(ziper_path, sub_dirname)
            elif os.path.isfile(ziper_path):
                # Regular file: subname may contain directories of its own.
                os.makedirs(os.path.dirname(sub_dirname), exist_ok=True)
                shutil.copyfile(ziper_path, sub_dirname)
            else:
                # Directory.
                shutil.copytree(ziper_path, sub_dirname)
        zip_dir(name, temp_dirname)


def unzip_dir(zip_path: str, dirname: str, catch_exc: bool = True) -> None:
    """Extract the zip archive *zip_path* into *dirname*.

    Args:
        zip_path: Archive to extract.
        dirname: Destination directory.
        catch_exc: When true, an extraction failure is not raised; instead
            ``unzip_error.log`` (the ``repr`` of the error) and a copy of the
            original archive are placed in *dirname*. Errors raised while
            opening the archive are always propagated.
    """
    with zipfile.ZipFile(zip_path, "r") as ziper:
        try:
            ziper.extractall(dirname)
        except Exception as e:
            if not catch_exc:
                raise
            # Extraction may have failed before the destination was created.
            os.makedirs(dirname, exist_ok=True)
            # UTF-8 so a non-ASCII member name in the error cannot fail here.
            write_file(os.path.join(dirname, "unzip_error.log"), "%r" % e, encoding="utf-8")
            shutil.copyfile(zip_path, os.path.join(dirname, os.path.basename(zip_path)))


def tar_dir(zip_path: str, dirname: str) -> None:
    """Create the gzip-compressed tar archive *zip_path* from the files in *dirname*.

    Only files are stored (empty directories are not), with names relative
    to *dirname*.
    """
    with tarfile.open(zip_path, "w:gz") as ziper:
        for source, arcname in _iter_archive_members(dirname, zip_path):
            ziper.add(source, arcname)


def untar_dir(zip_path: str, dirname: str) -> None:
    """Extract the tar archive *zip_path* into *dirname*.

    When the running Python supports extraction filters, the ``data`` filter
    is applied so that members cannot be written outside *dirname* (absolute
    paths, ``..`` components, escaping links) and special files are refused.
    """
    with tarfile.open(zip_path) as ziper:
        if hasattr(tarfile, "data_filter"):
            ziper.extractall(dirname, filter="data")
        else:
            # Older Python without extraction filters: only use on trusted archives.
            ziper.extractall(dirname)