commit 5fcdd4ea97b07047db2e2f3dec7f13a8fcf42c1e Author: luojian Date: Mon Jul 7 11:27:26 2025 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7943cbc --- /dev/null +++ b/.gitignore @@ -0,0 +1,175 @@ +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +### Flask template +instance/* +!instance/.gitignore +.webassets-cache +.env + + +logs +out +project + diff --git a/build_config.json b/build_config.json new file mode 100644 index 0000000..ddc95b0 --- /dev/null +++ b/build_config.json @@ -0,0 +1,6 @@ +{ + "repo_url": "http://192.168.0.200:3000/Faxing/Lawnchair.git", + "repo_branch": "touka-dev", + "repo_commit": "", + "package_name": "com.shape.shift.run.launcher.free.game.xjrtg" +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..c93ffd3 --- /dev/null +++ b/main.py @@ -0,0 +1,32 @@ +from scripts.build import run +from scripts.context import Context +from utils.logger_utils import init + +if __name__ == '__main__': + context = Context.from_json(open("build_config.json", "r").read()) + + logger = init() + + run(context) + # # 系统信息示例 + # print(f"当前系统: {SystemUtils.get_platform_name()}") + # print(f"是Windows系统吗? {SystemUtils.is_windows()}") + # + # # 命令执行示例 + # if SystemUtils.is_windows(): + # cmd = "echo Hello Windows" + # else: + # cmd = "echo 'Hello Unix/Linux/macOS'" + # + # # 执行并获取输出 + # return_code, output, error = CommandUtils.execute(cmd) + # print(f"\n命令执行结果:") + # print(f"返回码: {return_code}") + # print(f"输出: {output.strip()}") + # if error: + # print(f"错误: {error}") + # + # # 实时输出示例 + # print("\n实时输出示例:") + # CommandUtils.execute_with_real_time_output( + # "ping -c 4 127.0.0.1" if not SystemUtils.is_windows() else "ping -n 4 127.0.0.1") diff --git a/scripts/build.py b/scripts/build.py new file mode 100644 index 0000000..c3b627e --- /dev/null +++ b/scripts/build.py @@ -0,0 +1,29 @@ +from .context import Context +from .project_build import ProjectBuild +from .project_copy import ProjectCopy +from .project_end import ProjectEnd +from .project_init import ProjectInit +from .project_update import ProjectUpdate +from .project_upload import ProjectUpload + +from utils.logger_utils import app_logger + + +def run(context: Context): + app_logger().info("build run.") + tasks = [ + # ProjectInit(context), + ProjectCopy(context), + ProjectUpdate(context), + ProjectBuild(context), + ProjectUpload(context), + # ProjectEnd(context), + ] + + for task in tasks: + app_logger().info(f"start[{task.__class__.__name__}]") + task.execute() + app_logger().info(f"end[{task.__class__.__name__}]") + + app_logger().info(context) + pass diff --git a/scripts/context.py b/scripts/context.py new file mode 100644 index 0000000..bde6f24 --- /dev/null +++ b/scripts/context.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass +import json + + +@dataclass +class Context: + repo_url: str = "" + repo_branch: str = "" + repo_commit: str = "" + package_name: str = "" + + project_original_path: str = "project/original" + + temp_project_path: str = "" + # 本地的版本号 + local_repo_branch: str = "" + local_repo_commit: str = "" + + @classmethod + def from_json(cls, json_str: str): + data = json.loads(json_str) + return cls(**data) diff --git a/scripts/project_build.py b/scripts/project_build.py new file mode 100644 index 0000000..b335c82 --- /dev/null +++ b/scripts/project_build.py @@ -0,0 +1,36 @@ +import os.path + +from scripts.task import Task +from utils import SystemUtils, CommandUtils +from utils.logger_utils import app_logger + + +class ProjectBuild(Task): + """ + assembleLawnWithQuickstepPlay + bundleLawnWithQuickstepPlayRelease + """ + + def gradlew(self): + gradlew = os.path.abspath(os.path.join(self.context.temp_project_path, "gradlew")) + if SystemUtils.is_windows(): + gradlew += ".bat" + return gradlew + + def build_apk(self): + cmd = f"{self.gradlew()} assembleLawnWithQuickstepPlay" + app_logger().debug(f"build apk cmd = {cmd}") + return_code, stdout, stderr = CommandUtils.execute(cmd) + app_logger().debug(f"build apk return_code = {return_code} stdout = {stdout}") + + def build_aab(self): + cmd = f"{self.gradlew()} bundleLawnWithQuickstepPlayRelease" + app_logger().debug(f"build aab cmd = {cmd}") + return_code, stdout, stderr = CommandUtils.execute(cmd) + app_logger().debug(f"build aab return_code = {return_code} stdout = {stdout}") + + def execute(self): + self.build_apk() + self.build_aab() + + pass diff --git a/scripts/project_copy.py b/scripts/project_copy.py new file mode 100644 index 0000000..f2a081b --- /dev/null +++ b/scripts/project_copy.py @@ -0,0 +1,18 @@ +from scripts.task import Task +from utils import FileUtils +from utils.logger_utils import app_logger + + +class ProjectCopy(Task): + + def execute(self): + self.init() + result = FileUtils.copy(self.context.project_original_path, self.context.temp_project_path) + app_logger().debug("Copied project '{}' to '{}'".format(self.context.project_original_path, result)) + pass + + def init(self): + self.context.temp_project_path = self.context.project_original_path.replace("original", + self.context.package_name) + + pass diff --git a/scripts/project_end.py b/scripts/project_end.py new file mode 100644 index 0000000..106eb2e --- /dev/null +++ b/scripts/project_end.py @@ -0,0 +1,8 @@ +from scripts.task import Task +from utils import FileUtils + + +class ProjectEnd(Task): + def execute(self): + FileUtils.delete(self.context.temp_project_path, True) + pass diff --git a/scripts/project_init.py b/scripts/project_init.py new file mode 100644 index 0000000..b621e62 --- /dev/null +++ b/scripts/project_init.py @@ -0,0 +1,60 @@ +import time + +from git import Repo, RemoteProgress +from .task import Task + + +def progress(op_code, cur_count, max_count=None, message=''): + if op_code == RemoteProgress.END: + print() + print(f"操作: {op_code}, 进度: {cur_count}/{max_count}, 消息: {message}") + + +class ProjectInit(Task): + def execute(self): + try: + repo = Repo(self.context.project_original_path) + except Exception: + repo = Repo.clone_from(self.context.repo_url, self.context.project_original_path, + recursive=True, + progress=progress) + + # for submodule in repo.submodules: + # print(f"子模块 '{submodule.name}' 路径: {submodule.path}") + # # print(f"Commit ID: {submodule.module().head.commit}") + + if self.context.repo_commit: + pass + elif self.context.repo_branch: + # 克隆仓库 + branch_name = self.context.repo_branch + + remote_name = "origin" # 远程仓库默认名称 + + repo.git.fetch(remote_name) + + # 2. 创建本地分支并跟踪远程分支 + remote_branch_ref = f"{remote_name}/{branch_name}" + local_branch = repo.create_head(branch_name, remote_branch_ref) # 创建本地分支指向远程 + local_branch.set_tracking_branch(repo.remotes[remote_name].refs[branch_name]) # 设置跟踪 + local_branch.checkout() # 切换到该分支 + + self.context.local_repo_branch = repo.active_branch.name + self.context.local_repo_commit = repo.head.commit.hexsha[:10] + + # 拉取最新代码 + repo.remotes.origin.pull() + pass + else: + raise Exception(f"No commit to {self.context.repo_commit}") + + repo.git.submodule('update', '--init', '--recursive') + + for submodule in repo.submodules: + print(submodule.url) + print(submodule.name) + print(submodule.hexsha) + print(submodule.path) + sub_repo = submodule.module() + sub_repo.git.reset("--hard", submodule.hexsha) + print(f"Reset {submodule.name} to {submodule.hexsha[:7]}") diff --git a/scripts/project_update.py b/scripts/project_update.py new file mode 100644 index 0000000..f98950a --- /dev/null +++ b/scripts/project_update.py @@ -0,0 +1,6 @@ +from scripts.task import Task + + +class ProjectUpdate(Task): + def execute(self): + pass diff --git a/scripts/project_upload.py b/scripts/project_upload.py new file mode 100644 index 0000000..1f789bc --- /dev/null +++ b/scripts/project_upload.py @@ -0,0 +1,6 @@ +from scripts.task import Task + + +class ProjectUpload(Task): + def execute(self): + pass diff --git a/scripts/task.py b/scripts/task.py new file mode 100644 index 0000000..68b47ff --- /dev/null +++ b/scripts/task.py @@ -0,0 +1,11 @@ +from .context import Context + + +class Task: + + def __init__(self, context: Context): + self.context = context + pass + + def execute(self): + pass diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..069d027 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,5 @@ +from .system_utils import SystemUtils +from .command_utils import CommandUtils +from .file_utils import FileUtils + +__all__ = ['SystemUtils', 'CommandUtils', "FileUtils"] diff --git a/utils/command_utils.py b/utils/command_utils.py new file mode 100644 index 0000000..673b379 --- /dev/null +++ b/utils/command_utils.py @@ -0,0 +1,71 @@ +import subprocess +from typing import Union, Tuple +from .system_utils import SystemUtils + + +class CommandUtils: + """命令执行工具类""" + + @staticmethod + def execute(command: Union[str, list], timeout: int = None) -> Tuple[int, str, str]: + """ + 执行系统命令并返回结果(兼容Windows/macOS/Linux) + + 参数: + command: 要执行的命令,可以是字符串或列表 + timeout: 超时时间(秒) + + 返回: + 元组: (return_code, stdout, stderr) + """ + # 预处理命令 + processed_cmd = CommandUtils._prepare_command(command) + + try: + # 执行命令 + result = subprocess.run( + processed_cmd, + shell=isinstance(command, str), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + timeout=timeout + ) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return -1, "", "Command timed out" + except Exception as e: + return -1, "", str(e) + + @staticmethod + def _prepare_command(command: Union[str, list]) -> Union[str, list]: + """预处理命令""" + if SystemUtils.is_windows() and isinstance(command, str): + return ['cmd', '/c'] + command.split() + return command + + @staticmethod + def execute_with_real_time_output(command: Union[str, list], timeout: int = None) -> int: + """ + 执行命令并实时输出(不捕获输出,直接打印到控制台) + + 参数: + command: 要执行的命令 + timeout: 超时时间(秒) + + 返回: + 返回状态码 + """ + processed_cmd = CommandUtils._prepare_command(command) + + try: + result = subprocess.run( + processed_cmd, + shell=isinstance(command, str), + timeout=timeout + ) + return result.returncode + except subprocess.TimeoutExpired: + return -1 + except Exception as e: + return -1 diff --git a/utils/file_utils.py b/utils/file_utils.py new file mode 100644 index 0000000..19c5eb5 --- /dev/null +++ b/utils/file_utils.py @@ -0,0 +1,308 @@ +import os +import shutil +import zipfile +import tarfile +import gzip +import fnmatch +from typing import Union, List, Optional +from pathlib import Path + + +class FileUtils: + """ + 文件操作工具类 + + 功能: + 1. 文件/文件夹拷贝 + 2. 文件/文件夹删除 + 3. 文件/文件夹压缩 (zip, tar, gz) + 4. 文件/文件夹解压 + 5. 文件查找 + 6. 文件校验 + """ + + @staticmethod + def copy(src: Union[str, Path], dst: Union[str, Path], + overwrite: bool = False, ignore_patterns: Optional[List[str]] = None) -> bool: + """ + 拷贝文件或文件夹 + + :param src: 源路径 + :param dst: 目标路径 + :param overwrite: 是否覆盖已存在文件 + :param ignore_patterns: 忽略的文件模式列表 (如 ['*.tmp', '*.log']) + :return: 是否成功 + """ + src, dst = Path(src), Path(dst) + + def _ignore(path, names): + ignored = set() + if ignore_patterns: + for pattern in ignore_patterns: + ignored.update(fnmatch.filter(names, pattern)) + return ignored + + try: + if src.is_file(): + if dst.exists(): + if not overwrite: + return False + if dst.is_dir(): + dst = dst / src.name + shutil.copy2(src, dst) + elif src.is_dir(): + if dst.exists() and not overwrite: + return False + shutil.copytree(src, dst, ignore=_ignore if ignore_patterns else None, + dirs_exist_ok=overwrite) + return True + except Exception as e: + print(f"拷贝失败: {e}") + return False + + @staticmethod + def delete(path: Union[str, Path], recursive: bool = False) -> bool: + """ + 删除文件或文件夹 + + :param path: 要删除的路径 + :param recursive: 是否递归删除文件夹 + :return: 是否成功 + """ + path = Path(path) + try: + if path.is_file(): + path.unlink() + elif path.is_dir(): + if recursive: + shutil.rmtree(path) + else: + path.rmdir() + return True + except Exception as e: + print(f"删除失败: {e}") + return False + + @staticmethod + def compress( + src: Union[str, Path, List[Union[str, Path]]], + dst: Union[str, Path], + fmt: str = 'zip', + compression_level: int = 6 + ) -> bool: + """ + 压缩文件或文件夹 + + :param src: 源路径(单个或多个) + :param dst: 目标压缩文件路径 + :param fmt: 压缩格式 (zip, tar, gz) + :param compression_level: 压缩级别 (1-9) + :return: 是否成功 + """ + src_list = [src] if not isinstance(src, list) else src + src_list = [Path(s) for s in src_list] + dst = Path(dst) + + try: + if fmt == 'zip': + with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED, compresslevel=compression_level) as zf: + for src_item in src_list: + if src_item.is_file(): + zf.write(src_item, src_item.name) + elif src_item.is_dir(): + for root, _, files in os.walk(src_item): + for file in files: + file_path = Path(root) / file + arcname = file_path.relative_to(src_item.parent) + zf.write(file_path, arcname) + elif fmt == 'tar': + with tarfile.open(dst, 'w:gz') as tf: + for src_item in src_list: + if src_item.is_file(): + tf.add(src_item, arcname=src_item.name) + elif src_item.is_dir(): + tf.add(src_item, arcname=src_item.name) + elif fmt == 'gz': + if len(src_list) > 1: + raise ValueError("gz格式只支持压缩单个文件") + with open(src_list[0], 'rb') as f_in: + with gzip.open(dst, 'wb', compresslevel=compression_level) as f_out: + shutil.copyfileobj(f_in, f_out) + else: + raise ValueError(f"不支持的压缩格式: {fmt}") + return True + except Exception as e: + print(f"压缩失败: {e}") + return False + + @staticmethod + def decompress( + src: Union[str, Path], + dst: Union[str, Path] = None, + fmt: str = None + ) -> bool: + """ + 解压文件 + + :param src: 压缩文件路径 + :param dst: 解压目标路径 (默认为当前目录) + :param fmt: 压缩格式 (自动检测如果为None) + :return: 是否成功 + """ + src = Path(src) + dst = Path(dst) if dst else Path.cwd() + + # 自动检测格式 + if fmt is None: + if src.suffix == '.zip': + fmt = 'zip' + elif src.suffix == '.tar' or src.suffixes[-2:] == ['.tar', '.gz']: + fmt = 'tar' + elif src.suffix == '.gz': + fmt = 'gz' + else: + raise ValueError("无法自动识别压缩格式,请指定fmt参数") + + try: + dst.mkdir(parents=True, exist_ok=True) + + if fmt == 'zip': + with zipfile.ZipFile(src, 'r') as zf: + zf.extractall(dst) + elif fmt == 'tar': + with tarfile.open(src, 'r:*') as tf: + tf.extractall(dst) + elif fmt == 'gz': + with gzip.open(src, 'rb') as f_in: + output_path = dst / src.stem + with open(output_path, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + else: + raise ValueError(f"不支持的压缩格式: {fmt}") + return True + except Exception as e: + print(f"解压失败: {e}") + return False + + @staticmethod + def find_files( + root: Union[str, Path], + pattern: str = '*', + recursive: bool = True + ) -> List[Path]: + """ + 查找文件 + + :param root: 搜索根目录 + :param pattern: 文件名模式 (如 '*.txt') + :param recursive: 是否递归搜索 + :return: 匹配的文件路径列表 + """ + root = Path(root) + matches = [] + + if recursive: + for path in root.rglob(pattern): + if path.is_file(): + matches.append(path) + else: + for path in root.glob(pattern): + if path.is_file(): + matches.append(path) + + return matches + + @staticmethod + def calculate_size(path: Union[str, Path]) -> int: + """ + 计算文件或文件夹大小(字节) + + :param path: 路径 + :return: 大小(字节) + """ + path = Path(path) + if path.is_file(): + return path.stat().st_size + elif path.is_dir(): + return sum(f.stat().st_size for f in path.rglob('*') if f.is_file()) + return 0 + + @staticmethod + def compare_files( + file1: Union[str, Path], + file2: Union[str, Path], + chunk_size: int = 8192 + ) -> bool: + """ + 比较两个文件内容是否相同 + + :param file1: 文件1路径 + :param file2: 文件2路径 + :param chunk_size: 读取块大小 + :return: 是否相同 + """ + file1, file2 = Path(file1), Path(file2) + + if file1.stat().st_size != file2.stat().st_size: + return False + + with open(file1, 'rb') as f1, open(file2, 'rb') as f2: + while True: + b1 = f1.read(chunk_size) + b2 = f2.read(chunk_size) + if b1 != b2: + return False + if not b1: + return True + + @staticmethod + def get_md5(file_path: Union[str, Path], chunk_size: int = 8192) -> str: + """ + 计算文件的MD5哈希值 + + :param file_path: 文件路径 + :param chunk_size: 读取块大小 + :return: MD5哈希值 + """ + import hashlib + file_path = Path(file_path) + md5 = hashlib.md5() + + with open(file_path, 'rb') as f: + while chunk := f.read(chunk_size): + md5.update(chunk) + return md5.hexdigest() + + +# 使用示例 +if __name__ == "__main__": + # 1. 拷贝示例 + FileUtils.copy('source.txt', 'backup.txt') + FileUtils.copy('mydir', 'mydir_backup', ignore_patterns=['*.tmp']) + + # 2. 删除示例 + FileUtils.delete('backup.txt') + FileUtils.delete('mydir_backup', recursive=True) + + # 3. 压缩示例 + FileUtils.compress('mydir', 'mydir.zip') + FileUtils.compress(['file1.txt', 'file2.txt'], 'files.tar', fmt='tar') + + # 4. 解压示例 + FileUtils.decompress('mydir.zip', 'extracted') + + # 5. 查找文件示例 + txt_files = FileUtils.find_files('.', '*.txt') + print(f"找到的文本文件: {txt_files}") + + # 6. 计算大小示例 + size = FileUtils.calculate_size('mydir') + print(f"文件夹大小: {size} 字节") + + # 7. 比较文件示例 + same = FileUtils.compare_files('file1.txt', 'file2.txt') + print(f"文件是否相同: {same}") + + # 8. 计算MD5示例 + md5 = FileUtils.get_md5('file1.txt') + print(f"文件MD5: {md5}") diff --git a/utils/logger_utils.py b/utils/logger_utils.py new file mode 100644 index 0000000..453c8fb --- /dev/null +++ b/utils/logger_utils.py @@ -0,0 +1,154 @@ +import logging +import os +from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler +from typing import Union + + +class Logger: + """ + 日志工具类封装 + + 功能: + 1. 支持控制台和文件两种输出方式 + 2. 支持按大小或时间轮转日志文件 + 3. 支持自定义日志格式 + 4. 支持不同日志级别 + 5. 线程安全 + """ + + def __init__( + self, + name: str = "root", + level: Union[int, str] = logging.INFO, + console: bool = True, + file: bool = False, + file_path: str = "logs/app.log", + max_bytes: int = 10 * 1024 * 1024, # 10MB + backup_count: int = 5, + when: str = "midnight", + interval: int = 1, + fmt: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt: str = "%Y-%m-%d %H:%M:%S", + mode: str = "size" # 'size' 或 'time' + ): + """ + 初始化日志工具 + + :param name: 日志名称 + :param level: 日志级别 + :param console: 是否输出到控制台 + :param file: 是否输出到文件 + :param file_path: 日志文件路径 + :param max_bytes: 每个日志文件的最大大小(字节),仅mode='size'时有效 + :param backup_count: 保留的备份日志文件数量 + :param when: 日志轮转时间单位,如'S'(秒)、'M'(分)、'H'(小时)、'D'(天)、'midnight'(午夜),仅mode='time'时有效 + :param interval: 轮转间隔,仅mode='time'时有效 + :param fmt: 日志格式 + :param datefmt: 日期格式 + :param mode: 日志轮转模式,'size'按大小轮转,'time'按时间轮转 + """ + self.logger = logging.getLogger(name) + self.logger.setLevel(level) + + # 避免重复添加handler + if self.logger.handlers: + return + + formatter = logging.Formatter(fmt=fmt, datefmt=datefmt) + + # 控制台输出 + if console: + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) + + # 文件输出 + if file: + # 创建日志目录 + log_dir = os.path.dirname(file_path) + if log_dir and not os.path.exists(log_dir): + os.makedirs(log_dir) + + if mode == "size": + # 按大小轮转 + file_handler = RotatingFileHandler( + filename=file_path, + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8" + ) + else: + # 按时间轮转 + file_handler = TimedRotatingFileHandler( + filename=file_path, + when=when, + interval=interval, + backupCount=backup_count, + encoding="utf-8" + ) + + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + + def debug(self, msg: str, *args, **kwargs): + """记录调试信息""" + self.logger.debug(msg, *args, **kwargs) + + def info(self, msg: str, *args, **kwargs): + """记录普通信息""" + self.logger.info(msg, *args, **kwargs) + + def warning(self, msg: str, *args, **kwargs): + """记录警告信息""" + self.logger.warning(msg, *args, **kwargs) + + def error(self, msg: str, *args, **kwargs): + """记录错误信息""" + self.logger.error(msg, *args, **kwargs) + + def critical(self, msg: str, *args, **kwargs): + """记录严重错误信息""" + self.logger.critical(msg, *args, **kwargs) + + def exception(self, msg: str, *args, exc_info=True, **kwargs): + """记录异常信息""" + self.logger.exception(msg, *args, exc_info=exc_info, **kwargs) + + def log(self, level: int, msg: str, *args, **kwargs): + """通用日志记录方法""" + self.logger.log(level, msg, *args, **kwargs) + + def set_level(self, level: Union[int, str]): + """设置日志级别""" + self.logger.setLevel(level) + + def add_handler(self, handler: logging.Handler): + """添加自定义handler""" + self.logger.addHandler(handler) + + def remove_handler(self, handler: logging.Handler): + """移除handler""" + self.logger.removeHandler(handler) + + +logger: Logger + + +def app_logger() -> Logger: + return logger + + +def init() -> Logger: + # 创建日志实例 + global logger + logger = Logger( + name="my_app", + level=logging.DEBUG, + console=True, + file=True, + file_path="logs/app.log", + max_bytes=1024 * 1024, # 1MB + backup_count=3, + mode="size" + ) + return logger diff --git a/utils/system_utils.py b/utils/system_utils.py new file mode 100644 index 0000000..ac65d85 --- /dev/null +++ b/utils/system_utils.py @@ -0,0 +1,33 @@ +import platform + + +class SystemUtils: + """系统相关工具类""" + + @staticmethod + def is_windows(): + """判断是否为Windows系统""" + return platform.system() == 'Windows' + + @staticmethod + def is_linux(): + """判断是否为Linux系统""" + return platform.system() == 'Linux' + + @staticmethod + def is_mac(): + """判断是否为macOS系统""" + return platform.system() == 'Darwin' + + @staticmethod + def get_platform_name(): + """获取平台名称""" + system = platform.system() + if system == 'Windows': + return 'windows' + elif system == 'Linux': + return 'linux' + elif system == 'Darwin': + return 'mac' + else: + return 'unknown'