152 lines
4.8 KiB
Python
152 lines
4.8 KiB
Python
import json
|
|
import os
|
|
import shutil
|
|
import tarfile
|
|
import tempfile
|
|
import zipfile
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
def load_json(path: str, raise_for_invalid: bool = False) -> Any:
|
|
"""读取path json文件转为对象"""
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
if raise_for_invalid:
|
|
|
|
def parse_constant(s: str):
|
|
raise ValueError("非法json字符: %s" % s)
|
|
|
|
return json.load(f, parse_constant=parse_constant)
|
|
return json.load(f)
|
|
|
|
|
|
def dump_json(path: str, obj: Any):
|
|
"""将obj对象以json形式写入path文件"""
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(obj, f, ensure_ascii=False, indent=4)
|
|
|
|
|
|
def load_yaml(path: str) -> Any:
|
|
"""读取path yaml文件转为对象"""
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return yaml.full_load(f)
|
|
|
|
|
|
def dump_yaml(path: str, obj: Any):
|
|
"""将obj对象以yaml形式写入path文件"""
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
yaml.dump(obj, f, indent=2, allow_unicode=True, sort_keys=False, line_break="\n")
|
|
|
|
|
|
def dumps_yaml(obj: Any) -> str:
|
|
"""将obj对象以yaml形式导出为字符串"""
|
|
return yaml.dump(obj, indent=2, allow_unicode=True, sort_keys=False, line_break="\n")
|
|
|
|
|
|
def read_file(path: str) -> str:
|
|
"""读取文件为str"""
|
|
with open(path, "r") as f:
|
|
return f.read()
|
|
|
|
|
|
def write_bfile(path: str, data: bytes):
|
|
"""将bytes data写入path文件"""
|
|
with open(path, "wb") as f:
|
|
f.write(data)
|
|
|
|
|
|
def write_file(path: str, data: str):
|
|
"""将str data写入path文件"""
|
|
with open(path, "w") as f:
|
|
f.write(data)
|
|
|
|
|
|
def tail_file(path: str, tail: int) -> str:
|
|
"""倍增获取文件path最后tail行"""
|
|
block = 1024
|
|
with open(path, "rb") as f:
|
|
f.seek(0, 2)
|
|
filesize = f.tell()
|
|
while True:
|
|
if filesize < block:
|
|
block = filesize
|
|
f.seek(filesize - block, 0)
|
|
lines = f.readlines()
|
|
if len(lines) > tail or filesize <= block:
|
|
return "".join(line.decode() for line in lines[-tail:])
|
|
block *= 2
|
|
|
|
|
|
def zip_dir(zip_path: str, dirname: str):
|
|
"""将dirname制作为zip_path压缩包"""
|
|
with zipfile.ZipFile(zip_path, "w") as ziper:
|
|
for path, _, files in os.walk(dirname):
|
|
for file in files:
|
|
ziper.write(
|
|
os.path.join(path, file), os.path.join(path.removeprefix(dirname), file), zipfile.ZIP_DEFLATED
|
|
)
|
|
|
|
|
|
def zip_files(name: str, zipfile_paths: list):
|
|
"""将zipfiles_paths=list[文件名, 文件路径]制作为name压缩包"""
|
|
with zipfile.ZipFile(name, "w") as ziper:
|
|
for arcname, zipfile_path in zipfile_paths:
|
|
ziper.write(zipfile_path, arcname, zipfile.ZIP_DEFLATED)
|
|
|
|
|
|
def zip_strs(name: str, zipfile_strs: list):
|
|
"""将zipfile_strs=list[文件名, 内容]制作为name压缩包"""
|
|
with zipfile.ZipFile(name, "w") as ziper:
|
|
for filename, content in zipfile_strs:
|
|
ziper.writestr(filename, content)
|
|
|
|
|
|
def zip_zipers(name: str, ziper_paths: list):
|
|
"""将ziper_paths=list[压缩后名称, 压缩包/文件位置]制作为name压缩包"""
|
|
temp_dirname = tempfile.mkdtemp(prefix=name, dir=os.path.dirname(name))
|
|
os.makedirs(temp_dirname, exist_ok=True)
|
|
for subname, ziper_path in ziper_paths:
|
|
sub_dirname = os.path.join(temp_dirname, subname)
|
|
if not os.path.exists(ziper_path):
|
|
continue
|
|
if zipfile.is_zipfile(ziper_path):
|
|
# 压缩包解压
|
|
os.makedirs(sub_dirname, exist_ok=True)
|
|
unzip_dir(ziper_path, sub_dirname)
|
|
elif os.path.isfile(ziper_path):
|
|
# 文件
|
|
shutil.copyfile(ziper_path, sub_dirname)
|
|
else:
|
|
# 文件夹
|
|
shutil.copytree(ziper_path, sub_dirname)
|
|
zip_dir(name, temp_dirname)
|
|
shutil.rmtree(temp_dirname)
|
|
|
|
|
|
def unzip_dir(zip_path: str, dirname: str, catch_exc: bool = True):
|
|
"""将zip_path解压到dirname"""
|
|
with zipfile.ZipFile(zip_path, "r") as ziper:
|
|
try:
|
|
ziper.extractall(dirname)
|
|
except Exception as e:
|
|
if catch_exc:
|
|
write_file(os.path.join(dirname, "unzip_error.log"), "%r" % e)
|
|
shutil.copyfile(zip_path, os.path.join(dirname, os.path.basename(zip_path)))
|
|
else:
|
|
raise e
|
|
|
|
|
|
def tar_dir(zip_path: str, dirname: str):
|
|
"""将dirname压缩到zip_path"""
|
|
with tarfile.open(zip_path, "w:gz") as ziper:
|
|
for path, _, files in os.walk(dirname):
|
|
for file in files:
|
|
ziper.add(os.path.join(path, file), os.path.join(path.removeprefix(dirname), file))
|
|
|
|
|
|
def untar_dir(zip_path: str, dirname: str):
|
|
"""将zip_path解压到dirname"""
|
|
with tarfile.open(zip_path) as ziper:
|
|
ziper.extractall(dirname)
|