import fnmatch
import gzip
import hashlib
import os
import shutil
import tarfile
import zipfile
from pathlib import Path
from typing import List, Optional, Union


class FileUtils:
    """Static helpers for common file-system operations.

    Features:
        1. Copy files / directories
        2. Delete files / directories
        3. Compress files / directories (zip, tar, gz)
        4. Decompress archives
        5. Find files
        6. Verify files (size, content comparison, MD5)

    The mutating helpers (``copy``, ``delete``, ``compress``, ``decompress``)
    report failure by printing a message and returning ``False`` instead of
    propagating the exception.
    """

    @staticmethod
    def copy(src: Union[str, Path],
             dst: Union[str, Path],
             overwrite: bool = False,
             ignore_patterns: Optional[List[str]] = None) -> bool:
        """Copy a file or a directory tree.

        :param src: source path (file or directory)
        :param dst: destination path; when ``src`` is a file and ``dst`` is an
            existing directory, the file is copied into that directory
        :param overwrite: whether an existing destination may be replaced
            (for directories: merged into)
        :param ignore_patterns: glob patterns of names to skip when copying a
            directory (e.g. ``['*.tmp', '*.log']``)
        :return: ``True`` on success; ``False`` if the destination exists and
            ``overwrite`` is false, if ``src`` does not exist, or on any error
        """
        src, dst = Path(src), Path(dst)

        try:
            if src.is_file():
                # Resolve the real target first so the overwrite check is made
                # against the file that would actually be written.
                if dst.is_dir():
                    dst = dst / src.name
                if dst.exists() and not overwrite:
                    return False
                shutil.copy2(src, dst)
            elif src.is_dir():
                if dst.exists() and not overwrite:
                    return False
                ignore = (shutil.ignore_patterns(*ignore_patterns)
                          if ignore_patterns else None)
                shutil.copytree(src, dst, ignore=ignore,
                                dirs_exist_ok=overwrite)
            else:
                # Neither a file nor a directory: nothing was copied, so this
                # must not be reported as success.
                raise FileNotFoundError(f"源路径不存在: {src}")
            return True
        except Exception as e:
            print(f"拷贝失败: {e}")
            return False

    @staticmethod
    def delete(path: Union[str, Path], recursive: bool = False) -> bool:
        """Delete a file or a directory.

        :param path: path to delete
        :param recursive: remove a directory together with its contents; when
            false, only an empty directory can be removed
        :return: ``True`` if the path was removed or did not exist to begin
            with; ``False`` if removal failed
        """
        path = Path(path)
        try:
            if path.is_file():
                path.unlink()
            elif path.is_dir():
                if recursive:
                    shutil.rmtree(path)
                else:
                    path.rmdir()
            elif path.is_symlink():
                # Dangling symlink: is_file()/is_dir() follow the link and are
                # both false, but the link itself still has to be removed.
                path.unlink()
            return True
        except Exception as e:
            print(f"删除失败: {e}")
            return False

    @staticmethod
    def compress(
        src: Union[str, Path, List[Union[str, Path]]],
        dst: Union[str, Path],
        fmt: str = 'zip',
        compression_level: int = 6
    ) -> bool:
        """Compress one or more files / directories into an archive.

        :param src: a single source path or a list of source paths
        :param dst: path of the archive to create
        :param fmt: archive format: ``'zip'``, ``'tar'`` (gzip-compressed
            tarball) or ``'gz'`` (single file only)
        :param compression_level: compression level (1-9)
        :return: ``True`` on success; ``False`` on an unsupported format, a
            missing source path, or any other error
        """
        sources = [Path(s) for s in (src if isinstance(src, list) else [src])]
        dst = Path(dst)

        try:
            # Validate up front so a missing source neither yields a silently
            # incomplete archive nor leaves a half-written file behind.
            for item in sources:
                if not item.exists():
                    raise FileNotFoundError(f"源路径不存在: {item}")

            if fmt == 'zip':
                with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED,
                                     compresslevel=compression_level) as zf:
                    for item in sources:
                        if item.is_file():
                            zf.write(item, item.name)
                        elif item.is_dir():
                            for root, _, files in os.walk(item):
                                for file in files:
                                    file_path = Path(root) / file
                                    # Keep the top-level directory name inside
                                    # the archive.
                                    arcname = file_path.relative_to(item.parent)
                                    zf.write(file_path, arcname)

            elif fmt == 'tar':
                with tarfile.open(dst, 'w:gz',
                                  compresslevel=compression_level) as tf:
                    for item in sources:
                        # TarFile.add handles both files and directories
                        # (directories are added recursively).
                        if item.is_file() or item.is_dir():
                            tf.add(item, arcname=item.name)

            elif fmt == 'gz':
                if len(sources) != 1:
                    raise ValueError("gz格式只支持压缩单个文件")
                with open(sources[0], 'rb') as f_in:
                    with gzip.open(dst, 'wb',
                                   compresslevel=compression_level) as f_out:
                        shutil.copyfileobj(f_in, f_out)
            else:
                raise ValueError(f"不支持的压缩格式: {fmt}")

            return True
        except Exception as e:
            print(f"压缩失败: {e}")
            return False

    @staticmethod
    def _detect_format(src: Path) -> str:
        """Infer the archive format of ``src`` from its file-name suffixes."""
        suffixes = [s.lower() for s in src.suffixes]
        last = suffixes[-1] if suffixes else ''
        if last == '.zip':
            return 'zip'
        if last in ('.tar', '.tgz', '.tbz2', '.txz'):
            return 'tar'
        if suffixes[-2:-1] == ['.tar'] and last in ('.gz', '.bz2', '.xz'):
            return 'tar'
        if last == '.gz':
            return 'gz'
        raise ValueError("无法自动识别压缩格式,请指定fmt参数")

    @staticmethod
    def decompress(
        src: Union[str, Path],
        dst: Optional[Union[str, Path]] = None,
        fmt: Optional[str] = None
    ) -> bool:
        """Extract an archive.

        :param src: path of the archive
        :param dst: directory to extract into (defaults to the current working
            directory); created if it does not exist
        :param fmt: archive format (``'zip'``, ``'tar'`` or ``'gz'``); detected
            from the file name when ``None``
        :return: ``True`` on success, ``False`` on failure
        :raises ValueError: if ``fmt`` is ``None`` and the format cannot be
            detected from the file name
        """
        src = Path(src)
        dst = Path(dst) if dst else Path.cwd()

        # Auto-detect the format from the file name.
        if fmt is None:
            fmt = FileUtils._detect_format(src)

        try:
            dst.mkdir(parents=True, exist_ok=True)

            if fmt == 'zip':
                with zipfile.ZipFile(src, 'r') as zf:
                    zf.extractall(dst)
            elif fmt == 'tar':
                with tarfile.open(src, 'r:*') as tf:
                    if hasattr(tarfile, 'data_filter'):
                        # Reject members that would escape dst (absolute
                        # paths, "..", links pointing outside the target).
                        tf.extractall(dst, filter='data')
                    else:
                        # Older Python without extraction filters.
                        tf.extractall(dst)
            elif fmt == 'gz':
                with gzip.open(src, 'rb') as f_in:
                    # "name.ext.gz" is written out as "name.ext".
                    output_path = dst / src.stem
                    with open(output_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
            else:
                raise ValueError(f"不支持的压缩格式: {fmt}")

            return True
        except Exception as e:
            print(f"解压失败: {e}")
            return False

    @staticmethod
    def find_files(
        root: Union[str, Path],
        pattern: str = '*',
        recursive: bool = True
    ) -> List[Path]:
        """Find regular files whose name matches a glob pattern.

        :param root: directory to search
        :param pattern: file-name glob pattern (e.g. ``'*.txt'``)
        :param recursive: whether to descend into sub-directories
        :return: list of matching file paths (directories are excluded)
        """
        root = Path(root)
        candidates = root.rglob(pattern) if recursive else root.glob(pattern)
        return [path for path in candidates if path.is_file()]

    @staticmethod
    def calculate_size(path: Union[str, Path]) -> int:
        """Return the size in bytes of a file or of all files under a directory.

        :param path: file or directory path
        :return: size in bytes; ``0`` if the path is neither a file nor a
            directory
        """
        path = Path(path)
        if path.is_file():
            return path.stat().st_size
        if path.is_dir():
            return sum(f.stat().st_size for f in path.rglob('*') if f.is_file())
        return 0

    @staticmethod
    def compare_files(
        file1: Union[str, Path],
        file2: Union[str, Path],
        chunk_size: int = 8192
    ) -> bool:
        """Check whether two files have identical content.

        :param file1: path of the first file
        :param file2: path of the second file
        :param chunk_size: number of bytes read per step
        :return: ``True`` if both files contain the same bytes
        :raises OSError: if either file cannot be stat'ed or opened
        """
        file1, file2 = Path(file1), Path(file2)

        # Cheap early exit: files of different size cannot be equal.
        if file1.stat().st_size != file2.stat().st_size:
            return False

        with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
            while True:
                b1 = f1.read(chunk_size)
                b2 = f2.read(chunk_size)
                if b1 != b2:
                    return False
                if not b1:
                    # Both streams reached EOF without a mismatch.
                    return True

    @staticmethod
    def get_md5(file_path: Union[str, Path], chunk_size: int = 8192) -> str:
        """Compute the MD5 digest of a file.

        :param file_path: path of the file
        :param chunk_size: number of bytes read per step
        :return: hexadecimal MD5 digest
        """
        file_path = Path(file_path)
        md5 = hashlib.md5()

        with open(file_path, 'rb') as f:
            while chunk := f.read(chunk_size):
                md5.update(chunk)

        return md5.hexdigest()


# Usage examples
if __name__ == "__main__":
    # 1. Copy
    FileUtils.copy('source.txt', 'backup.txt')
    FileUtils.copy('mydir', 'mydir_backup', ignore_patterns=['*.tmp'])

    # 2. Delete
    FileUtils.delete('backup.txt')
    FileUtils.delete('mydir_backup', recursive=True)

    # 3. Compress
    FileUtils.compress('mydir', 'mydir.zip')
    FileUtils.compress(['file1.txt', 'file2.txt'], 'files.tar', fmt='tar')

    # 4. Decompress
    FileUtils.decompress('mydir.zip', 'extracted')

    # 5. Find files
    txt_files = FileUtils.find_files('.', '*.txt')
    print(f"找到的文本文件: {txt_files}")

    # 6. Compute size
    size = FileUtils.calculate_size('mydir')
    print(f"文件夹大小: {size} 字节")

    # 7. Compare files
    same = FileUtils.compare_files('file1.txt', 'file2.txt')
    print(f"文件是否相同: {same}")

    # 8. Compute MD5
    md5 = FileUtils.get_md5('file1.txt')
    print(f"文件MD5: {md5}")