update
This commit is contained in:
151
utils/file.py
Normal file
151
utils/file.py
Normal file
@@ -0,0 +1,151 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tarfile
|
||||
import tempfile
|
||||
import zipfile
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def load_json(path: str, raise_for_invalid: bool = False) -> Any:
|
||||
"""读取path json文件转为对象"""
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
if raise_for_invalid:
|
||||
|
||||
def parse_constant(s: str):
|
||||
raise ValueError("非法json字符: %s" % s)
|
||||
|
||||
return json.load(f, parse_constant=parse_constant)
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def dump_json(path: str, obj: Any):
|
||||
"""将obj对象以json形式写入path文件"""
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
json.dump(obj, f, ensure_ascii=False, indent=4)
|
||||
|
||||
|
||||
def load_yaml(path: str) -> Any:
|
||||
"""读取path yaml文件转为对象"""
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
return yaml.full_load(f)
|
||||
|
||||
|
||||
def dump_yaml(path: str, obj: Any):
|
||||
"""将obj对象以yaml形式写入path文件"""
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
yaml.dump(obj, f, indent=2, allow_unicode=True, sort_keys=False, line_break="\n")
|
||||
|
||||
|
||||
def dumps_yaml(obj: Any) -> str:
|
||||
"""将obj对象以yaml形式导出为字符串"""
|
||||
return yaml.dump(obj, indent=2, allow_unicode=True, sort_keys=False, line_break="\n")
|
||||
|
||||
|
||||
def read_file(path: str) -> str:
|
||||
"""读取文件为str"""
|
||||
with open(path, "r") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def write_bfile(path: str, data: bytes):
|
||||
"""将bytes data写入path文件"""
|
||||
with open(path, "wb") as f:
|
||||
f.write(data)
|
||||
|
||||
|
||||
def write_file(path: str, data: str):
|
||||
"""将str data写入path文件"""
|
||||
with open(path, "w") as f:
|
||||
f.write(data)
|
||||
|
||||
|
||||
def tail_file(path: str, tail: int) -> str:
|
||||
"""倍增获取文件path最后tail行"""
|
||||
block = 1024
|
||||
with open(path, "rb") as f:
|
||||
f.seek(0, 2)
|
||||
filesize = f.tell()
|
||||
while True:
|
||||
if filesize < block:
|
||||
block = filesize
|
||||
f.seek(filesize - block, 0)
|
||||
lines = f.readlines()
|
||||
if len(lines) > tail or filesize <= block:
|
||||
return "".join(line.decode() for line in lines[-tail:])
|
||||
block *= 2
|
||||
|
||||
|
||||
def zip_dir(zip_path: str, dirname: str):
|
||||
"""将dirname制作为zip_path压缩包"""
|
||||
with zipfile.ZipFile(zip_path, "w") as ziper:
|
||||
for path, _, files in os.walk(dirname):
|
||||
for file in files:
|
||||
ziper.write(
|
||||
os.path.join(path, file), os.path.join(path.removeprefix(dirname), file), zipfile.ZIP_DEFLATED
|
||||
)
|
||||
|
||||
|
||||
def zip_files(name: str, zipfile_paths: list):
|
||||
"""将zipfiles_paths=list[文件名, 文件路径]制作为name压缩包"""
|
||||
with zipfile.ZipFile(name, "w") as ziper:
|
||||
for arcname, zipfile_path in zipfile_paths:
|
||||
ziper.write(zipfile_path, arcname, zipfile.ZIP_DEFLATED)
|
||||
|
||||
|
||||
def zip_strs(name: str, zipfile_strs: list):
|
||||
"""将zipfile_strs=list[文件名, 内容]制作为name压缩包"""
|
||||
with zipfile.ZipFile(name, "w") as ziper:
|
||||
for filename, content in zipfile_strs:
|
||||
ziper.writestr(filename, content)
|
||||
|
||||
|
||||
def zip_zipers(name: str, ziper_paths: list):
|
||||
"""将ziper_paths=list[压缩后名称, 压缩包/文件位置]制作为name压缩包"""
|
||||
temp_dirname = tempfile.mkdtemp(prefix=name, dir=os.path.dirname(name))
|
||||
os.makedirs(temp_dirname, exist_ok=True)
|
||||
for subname, ziper_path in ziper_paths:
|
||||
sub_dirname = os.path.join(temp_dirname, subname)
|
||||
if not os.path.exists(ziper_path):
|
||||
continue
|
||||
if zipfile.is_zipfile(ziper_path):
|
||||
# 压缩包解压
|
||||
os.makedirs(sub_dirname, exist_ok=True)
|
||||
unzip_dir(ziper_path, sub_dirname)
|
||||
elif os.path.isfile(ziper_path):
|
||||
# 文件
|
||||
shutil.copyfile(ziper_path, sub_dirname)
|
||||
else:
|
||||
# 文件夹
|
||||
shutil.copytree(ziper_path, sub_dirname)
|
||||
zip_dir(name, temp_dirname)
|
||||
shutil.rmtree(temp_dirname)
|
||||
|
||||
|
||||
def unzip_dir(zip_path: str, dirname: str, catch_exc: bool = True):
|
||||
"""将zip_path解压到dirname"""
|
||||
with zipfile.ZipFile(zip_path, "r") as ziper:
|
||||
try:
|
||||
ziper.extractall(dirname)
|
||||
except Exception as e:
|
||||
if catch_exc:
|
||||
write_file(os.path.join(dirname, "unzip_error.log"), "%r" % e)
|
||||
shutil.copyfile(zip_path, os.path.join(dirname, os.path.basename(zip_path)))
|
||||
else:
|
||||
raise e
|
||||
|
||||
|
||||
def tar_dir(zip_path: str, dirname: str):
|
||||
"""将dirname压缩到zip_path"""
|
||||
with tarfile.open(zip_path, "w:gz") as ziper:
|
||||
for path, _, files in os.walk(dirname):
|
||||
for file in files:
|
||||
ziper.add(os.path.join(path, file), os.path.join(path.removeprefix(dirname), file))
|
||||
|
||||
|
||||
def untar_dir(zip_path: str, dirname: str):
|
||||
"""将zip_path解压到dirname"""
|
||||
with tarfile.open(zip_path) as ziper:
|
||||
ziper.extractall(dirname)
|
||||
Reference in New Issue
Block a user