auto_build_launcher/utils/file_utils.py

363 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import shutil
import zipfile
import tarfile
import gzip
import fnmatch
from typing import Union, List, Optional
from pathlib import Path
class FileUtils:
    """
    Collection of static file-system helpers.

    Features:
    1. Copy files / directories
    2. Delete files / directories
    3. Compress files / directories (zip, tar, gz)
    4. Decompress archives
    5. Find files
    6. Verify files (size, byte-wise comparison, MD5)
    7. Move files / directories

    ``copy``, ``delete``, ``compress``, ``decompress`` and ``move`` report
    runtime failures by printing a message and returning ``False``.
    ``calculate_size``, ``compare_files`` and ``get_md5`` let I/O errors
    propagate to the caller.
    """

    @staticmethod
    def copy(src: Union[str, Path], dst: Union[str, Path],
             overwrite: bool = False, ignore_patterns: Optional[List[str]] = None) -> bool:
        """
        Copy a file or a directory tree.

        :param src: source path
        :param dst: destination path; when ``src`` is a file and ``dst`` is an
            existing directory (with ``overwrite=True``) the file is copied
            into that directory
        :param overwrite: whether an existing destination may be overwritten;
            for directories this merges into the existing tree
        :param ignore_patterns: glob patterns of names to skip when copying a
            directory (e.g. ['*.tmp', '*.log']); not applied to a single file
        :return: True on success, False if the source does not exist, the
            destination exists and ``overwrite`` is False, or the copy failed
        """
        src, dst = Path(src), Path(dst)
        try:
            if src.is_file():
                if dst.exists():
                    if not overwrite:
                        return False
                    if dst.is_dir():
                        dst = dst / src.name
                shutil.copy2(src, dst)
            elif src.is_dir():
                if dst.exists() and not overwrite:
                    return False
                ignore = shutil.ignore_patterns(*ignore_patterns) if ignore_patterns else None
                shutil.copytree(src, dst, ignore=ignore, dirs_exist_ok=overwrite)
            else:
                # A missing source must not be reported as a successful copy.
                raise FileNotFoundError(f"源路径不存在: {src}")
            return True
        except Exception as e:
            # The API contract is a boolean result, so every failure is reported here.
            print(f"拷贝失败: {e}")
            return False

    @staticmethod
    def delete(path: Union[str, Path], recursive: bool = False) -> bool:
        """
        Delete a file, a symbolic link or a directory.

        :param path: path to delete
        :param recursive: remove a directory together with its contents;
            when False only an empty directory can be removed
        :return: True if the path was removed or did not exist, False if the
            removal failed
        """
        path = Path(path)
        try:
            # Check the link first: is_file()/is_dir() follow symlinks, so a
            # broken link would be skipped and a link to a directory would be
            # handed to rmtree()/rmdir(), which both refuse symlinks.
            if path.is_symlink() or path.is_file():
                path.unlink()
            elif path.is_dir():
                if recursive:
                    shutil.rmtree(path)
                else:
                    path.rmdir()
            return True
        except Exception as e:
            print(f"删除失败: {e}")
            return False

    @staticmethod
    def _zip_directory(zf: zipfile.ZipFile, directory: Path) -> None:
        """Add ``directory`` to ``zf``, with entry names relative to its parent."""
        base = directory.parent
        for root, dirs, files in os.walk(directory):
            root_path = Path(root)
            if not dirs and not files:
                # Keep empty directories, as the tar branch of compress() does.
                zf.write(root_path, root_path.relative_to(base))
            for name in files:
                file_path = root_path / name
                zf.write(file_path, file_path.relative_to(base))

    @staticmethod
    def compress(
            src: Union[str, Path, List[Union[str, Path]]],
            dst: Union[str, Path],
            fmt: str = 'zip',
            compression_level: int = 6
    ) -> bool:
        """
        Compress one or more files / directories into an archive.

        :param src: a single source path or a list (or tuple) of source paths
        :param dst: path of the archive to create
        :param fmt: archive format: 'zip', 'tar' (always written as a
            gzip-compressed tar, whatever the name of ``dst``) or 'gz'
            (exactly one regular file)
        :param compression_level: compression level (1-9), applied to all
            three formats
        :return: True on success, False on an unsupported format, a missing
            source or any I/O failure
        """
        sources = [Path(s) for s in (src if isinstance(src, (list, tuple)) else [src])]
        dst = Path(dst)
        try:
            if fmt not in ('zip', 'tar', 'gz'):
                raise ValueError(f"不支持的压缩格式: {fmt}")
            if fmt == 'gz' and len(sources) != 1:
                raise ValueError("gz格式只支持压缩单个文件")
            # Validate before the archive is opened so that a missing source
            # neither yields a "successful" empty archive nor leaves a
            # half-written file behind.
            for item in sources:
                if not item.exists():
                    raise FileNotFoundError(f"源路径不存在: {item}")

            if fmt == 'zip':
                with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED,
                                     compresslevel=compression_level) as zf:
                    for item in sources:
                        if item.is_file():
                            zf.write(item, item.name)
                        elif item.is_dir():
                            FileUtils._zip_directory(zf, item)
            elif fmt == 'tar':
                with tarfile.open(dst, 'w:gz', compresslevel=compression_level) as tf:
                    for item in sources:
                        # TarFile.add() recurses into directories by itself.
                        tf.add(item, arcname=item.name)
            else:  # 'gz'
                # Open the source first: if it is unreadable, no output file is created.
                with open(sources[0], 'rb') as f_in, \
                        gzip.open(dst, 'wb', compresslevel=compression_level) as f_out:
                    shutil.copyfileobj(f_in, f_out)
            return True
        except Exception as e:
            print(f"压缩失败: {e}")
            return False

    @staticmethod
    def _detect_format(src: Path) -> str:
        """Map the (case-insensitive) suffix of ``src`` to 'zip', 'tar' or 'gz'."""
        suffixes = [s.lower() for s in src.suffixes]
        last = suffixes[-1] if suffixes else ''
        if last == '.zip':
            return 'zip'
        if last in ('.tar', '.tgz', '.tbz2', '.txz'):
            return 'tar'
        if last in ('.gz', '.bz2', '.xz') and suffixes[-2:-1] == ['.tar']:
            return 'tar'
        if last == '.gz':
            return 'gz'
        raise ValueError("无法自动识别压缩格式请指定fmt参数")

    @staticmethod
    def _extract_tar(tf: tarfile.TarFile, target: Path) -> None:
        """Extract ``tf`` into ``target`` while rejecting members that escape it."""
        if hasattr(tarfile, 'data_filter'):
            # Extraction filters exist on 3.12+ and patched older releases.
            tf.extractall(target, filter='data')
            return
        # Fallback for interpreters without extraction filters.
        base = target.resolve()
        for member in tf.getmembers():
            resolved = (base / member.name).resolve()
            if resolved != base and base not in resolved.parents:
                raise ValueError(f"压缩包包含不安全的路径: {member.name}")
        tf.extractall(target)

    @staticmethod
    def decompress(
            src: Union[str, Path],
            dst: Optional[Union[str, Path]] = None,
            fmt: Optional[str] = None
    ) -> bool:
        """
        Extract an archive.

        :param src: path of the archive
        :param dst: directory to extract into (defaults to the current working
            directory); it is created if missing
        :param fmt: archive format ('zip', 'tar', 'gz'); detected from the
            file suffix when None. 'tar' reads any compression that
            ``tarfile`` detects transparently.
        :return: True on success, False on an unsupported ``fmt`` or any
            extraction failure
        :raises ValueError: if ``fmt`` is None and the suffix of ``src`` is
            not recognised
        """
        src = Path(src)
        dst = Path(dst) if dst else Path.cwd()
        # Auto-detection failure is raised to the caller (not swallowed below).
        if fmt is None:
            fmt = FileUtils._detect_format(src)
        try:
            dst.mkdir(parents=True, exist_ok=True)
            if fmt == 'zip':
                with zipfile.ZipFile(src, 'r') as zf:
                    zf.extractall(dst)
            elif fmt == 'tar':
                with tarfile.open(src, 'r:*') as tf:
                    FileUtils._extract_tar(tf, dst)
            elif fmt == 'gz':
                # The output file is named after the archive minus its last suffix.
                with gzip.open(src, 'rb') as f_in, open(dst / src.stem, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            else:
                raise ValueError(f"不支持的压缩格式: {fmt}")
            return True
        except Exception as e:
            print(f"解压失败: {e}")
            return False

    @staticmethod
    def find_files(
            root: Union[str, Path],
            pattern: str = '*',
            recursive: bool = True
    ) -> List[Path]:
        """
        Find regular files whose name matches a glob pattern.

        :param root: directory to search
        :param pattern: glob pattern (e.g. '*.txt')
        :param recursive: also search sub-directories
        :return: list of matching file paths (directories are excluded)
        """
        root = Path(root)
        candidates = root.rglob(pattern) if recursive else root.glob(pattern)
        return [path for path in candidates if path.is_file()]

    @staticmethod
    def calculate_size(path: Union[str, Path]) -> int:
        """
        Compute the size of a file or of all files below a directory.

        :param path: file or directory path
        :return: size in bytes; 0 if the path is neither a file nor a directory
        """
        path = Path(path)
        if path.is_file():
            return path.stat().st_size
        if path.is_dir():
            return sum(f.stat().st_size for f in path.rglob('*') if f.is_file())
        return 0

    @staticmethod
    def compare_files(
            file1: Union[str, Path],
            file2: Union[str, Path],
            chunk_size: int = 8192
    ) -> bool:
        """
        Check whether two files have identical content.

        :param file1: path of the first file
        :param file2: path of the second file
        :param chunk_size: number of bytes read per iteration
        :return: True if both files are byte-for-byte identical
        :raises OSError: if either file cannot be stat'ed or opened
        """
        file1, file2 = Path(file1), Path(file2)
        # Different sizes can never match; this avoids reading the content.
        if file1.stat().st_size != file2.stat().st_size:
            return False
        with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
            while True:
                b1 = f1.read(chunk_size)
                b2 = f2.read(chunk_size)
                if b1 != b2:
                    return False
                if not b1:
                    # Both streams hit EOF together without a mismatch.
                    return True

    @staticmethod
    def get_md5(file_path: Union[str, Path], chunk_size: int = 8192) -> str:
        """
        Compute the MD5 digest of a file, reading it in chunks.

        :param file_path: path of the file
        :param chunk_size: number of bytes read per iteration
        :return: hexadecimal MD5 digest
        :raises OSError: if the file cannot be opened
        """
        import hashlib
        file_path = Path(file_path)
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            while chunk := f.read(chunk_size):
                md5.update(chunk)
        return md5.hexdigest()

    @staticmethod
    def move(
            src: Union[str, Path],
            dst: Union[str, Path],
            overwrite: bool = False,
            ignore_patterns: Optional[List[str]] = None,
            create_parents: bool = True
    ) -> Optional[bool]:
        """
        Move a file or a directory.

        :param src: source path
        :param dst: destination path
        :param overwrite: whether an existing destination may be replaced.
            A file moved onto an existing directory is placed inside it (as
            ``copy`` does); a directory moved onto an existing directory is
            merged into it; an existing destination file is removed first.
        :param ignore_patterns: glob patterns (e.g. ['*.tmp', '*.log']) of
            names that are NOT transferred when moving a directory; the whole
            source directory, ignored entries included, is removed afterwards
        :param create_parents: create missing parent directories of ``dst``
        :return: True on success; False if the source does not exist, the
            destination exists and ``overwrite`` is False, or any step failed

        Example::

            FileUtils.move('a.txt', 'dir/b.txt')  # move and rename a file
            FileUtils.move('dir1', 'dir2')        # move a directory
        """
        src, dst = Path(src), Path(dst)
        try:
            # Validate the source BEFORE touching the destination, otherwise a
            # typo in ``src`` would destroy an existing ``dst``.
            src_is_file = src.is_file()
            if not src_is_file and not src.is_dir():
                return False

            if dst.exists() or dst.is_symlink():
                if not overwrite:
                    return False
                if dst.exists() and src.samefile(dst):
                    # Moving a path onto itself: deleting "the target" would
                    # delete the source.
                    return True
                if src_is_file and dst.is_dir():
                    dst = dst / src.name
                # An existing directory is kept when a directory is moved onto
                # it (merge); everything else in the way is removed.
                if src_is_file or not dst.is_dir():
                    if not FileUtils.delete(dst):
                        return False

            if create_parents:
                # Only the parent: creating ``dst`` itself would make the
                # directory copy below see an "existing" destination.
                dst.parent.mkdir(parents=True, exist_ok=True)

            if src_is_file:
                shutil.move(str(src), str(dst))
                return True

            # Directories are copied (so ignore patterns can apply) and the
            # source is removed only after the copy succeeded.
            if FileUtils.copy(src, dst, overwrite, ignore_patterns):
                return FileUtils.delete(src, recursive=True)
            return False
        except Exception as e:
            print(f"移动失败: {e}")
            return False
# Usage examples (run only when the module is executed as a script)
if __name__ == "__main__":
    # 1. Copy a single file, then a directory while skipping temporary files
    FileUtils.copy('source.txt', 'backup.txt')
    FileUtils.copy('mydir', 'mydir_backup', ignore_patterns=['*.tmp'])

    # 2. Delete the copies again
    FileUtils.delete('backup.txt')
    FileUtils.delete('mydir_backup', recursive=True)

    # 3. Compress a directory to zip and two files to a tar archive
    FileUtils.compress('mydir', 'mydir.zip')
    FileUtils.compress(['file1.txt', 'file2.txt'], 'files.tar', fmt='tar')

    # 4. Extract the zip archive
    FileUtils.decompress('mydir.zip', 'extracted')

    # 5. Find text files below the current directory
    found = FileUtils.find_files('.', '*.txt')
    print(f"找到的文本文件: {found}")

    # 6. Report the directory size
    total_bytes = FileUtils.calculate_size('mydir')
    print(f"文件夹大小: {total_bytes} 字节")

    # 7. Compare two files byte by byte
    identical = FileUtils.compare_files('file1.txt', 'file2.txt')
    print(f"文件是否相同: {identical}")

    # 8. Print the MD5 digest of a file
    digest = FileUtils.get_md5('file1.txt')
    print(f"文件MD5: {digest}")