diff options
| author | 2024-01-24 19:08:15 +0800 | |
|---|---|---|
| committer | 2024-01-24 19:08:15 +0800 | |
| commit | 6b895a231cadd7a3ac12fb089a09bd90d8b1ffb6 (patch) | |
| tree | e8a4c77393cef0b95b5cde8c1e050fd21a886703 /src | |
| parent | 3521280e1cbf81c8de89eaa4336952ba2c803693 (diff) | |
| download | ipm-6b895a231cadd7a3ac12fb089a09bd90d8b1ffb6.tar.gz ipm-6b895a231cadd7a3ac12fb089a09bd90d8b1ffb6.zip | |
:recycle: refactor(cli): use rich instead of multilogging to show output
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipm/__main__.py | 102 | ||||
| -rw-r--r-- | src/ipm/api.py | 132 | ||||
| -rw-r--r-- | src/ipm/logging.py | 43 | ||||
| -rw-r--r-- | src/ipm/utils/freeze.py | 58 |
4 files changed, 231 insertions, 104 deletions
diff --git a/src/ipm/__main__.py b/src/ipm/__main__.py index 5af2872..4575986 100644 --- a/src/ipm/__main__.py +++ b/src/ipm/__main__.py @@ -1,8 +1,9 @@ from . import api from .exceptions import IpmException -from .logging import logger +from .logging import status, error, tada import typer +status.start() main = typer.Typer( name="ipm", help="Infini 包管理器", no_args_is_help=True, add_completion=False ) @@ -12,9 +13,12 @@ main = typer.Typer( def check(): """分析 Infini 项目并创建项目锁""" try: - api.check(".") - except IpmException as error: - logger.error(error) + if api.check(".", echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() @@ -26,9 +30,12 @@ def install( ): """安装一个 Infini 规则包到此计算机""" try: - api.install(uri, index, upgrade=upgrade, force=force, echo=True) - except IpmException as error: - logger.error(error) + if api.install(uri, index, upgrade=upgrade, force=force, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() @@ -38,45 +45,60 @@ def extract( ): """解压缩 Infini 包""" try: - api.extract(package, dist, echo=True) - except IpmException as error: - logger.error(error) + if api.extract(package, dist, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() def init(force: bool = typer.Option(None, "--force", "-f", help="强制初始化")): """初始化一个 Infini 项目""" try: - api.init(".", force, echo=True) - except IpmException as error: - logger.error(error) + if api.init(".", force, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() def new(package: str = typer.Argument(help="Infini 项目路径")): """新建一个 Infini 项目""" try: - api.new(package, echo=True) - except IpmException as error: - logger.error(error) + if api.new(package, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() def build(package: str = typer.Argument(".", help="Infini 项目路径")): """打包 Infini 规则包""" try: - api.build(package, echo=True) - except IpmException as error: - logger.error(error) + if api.build(package, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() def uninstall(package: str = typer.Argument(help="Infini 项目路径")): """卸载 Infini 规则包""" try: - api.uninstall(package, echo=True) - except IpmException as error: - logger.error(error) + if api.uninstall(package, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() @@ -86,18 +108,24 @@ def require( ): """新增规则包依赖""" try: - api.require(name, index, echo=True) - except IpmException as error: - logger.error(error) + if api.require(name, index, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() def unrequire(name: str = typer.Argument(help="Infini 包名")): """删除规则包依赖""" try: - api.unrequire(name, echo=True) - except IpmException as error: - logger.error(error) + if api.unrequire(name, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() @@ -107,18 +135,24 @@ def add( ): """新增环境依赖""" try: - api.add(name, index=index, echo=True) - except IpmException as error: - logger.error(error) + if api.add(name, index=index, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() @main.command() def remove(name: str = typer.Argument(help="Infini 包名")): """删除环境依赖""" try: - api.remove(name, echo=True) - except IpmException as error: - logger.error(error) + if api.remove(name, echo=True): + tada() + except IpmException as err: + error(err, echo=True) + finally: + status.stop() # TODO diff --git a/src/ipm/api.py b/src/ipm/api.py index 9e69ce5..84ae92b 100644 --- a/src/ipm/api.py +++ b/src/ipm/api.py @@ -2,7 +2,7 @@ from pathlib import Path from .typing import StrPath from .utils import freeze, urlparser, loader from .const import INDEX, INDEX_PATH, STORAGE, SRC_HOME -from .logging import info, success, warning, error +from .logging import update, info, success, warning, error, confirm, tada from .exceptions import ( FileTypeMismatch, TomlLoadFailed, @@ -18,17 +18,25 @@ import toml import shutil -def check(source_path: StrPath) -> None: - return ProjectLock( +def check(source_path: StrPath, echo: bool = False) -> bool: + info("项目环境检查...", echo) + ProjectLock( Path(source_path).resolve() / "infini.lock", auto_load=False, ).init() + return True -def init(source_path: StrPath, force: bool = False, echo: bool = False) -> None: +def init(source_path: StrPath, force: bool = False, echo: bool = False) -> bool: + info("初始化规则包...", echo) + update("检查环境...", echo) source_path = Path(source_path).resolve() if (toml_path := (source_path / "infini.toml")).exists() and not force: - warning(f"无法在已经初始化的地址重新初始化, 如果你的确希望重新初始化, 请使用[ipm init --force].", echo) + return warning( + f"无法在已经初始化的地址重新初始化, 如果你的确希望重新初始化, 请使用[bold red]`ipm init --force`[/bold red].", + echo, + ) + success("环境检查完毕.", echo) toml_file = toml_path.open("w", encoding="utf-8") toml.dump( @@ -50,30 +58,48 @@ def init(source_path: StrPath, force: bool = False, echo: bool = False) -> None: (source_path / "src" / "__init__.py").write_text( "# Initialized `__init__.py` generated by ipm." ) + return True + +def new(dist_path: StrPath, echo: bool = False) -> bool: + info("新建规则包...", echo) -def new(dist_path: StrPath, echo: bool = False) -> None: - info("初始化环境中...") + update("检查环境...", echo) path = Path(dist_path).resolve() if path.exists(): - return warning(f"路径[{path}]已经存在.", echo) + return warning( + f"路径 [blue]{path.relative_to(Path('.').resolve())}[/blue] 已经存在.", echo + ) path.mkdir(parents=True, exist_ok=True) - return init(path, echo=echo) + success("环境检查完毕.", echo) + + init(path, echo=echo) + + success(f"规则包 [bold green]{path.name}[/bold green] 新建完成!", echo) + return True def build(source_path: StrPath, echo: bool = False) -> InfiniFrozenPackage: - info("检查构建环境...", echo) + info("构建规则包...", echo) + update("检查构建环境...", echo) + + if not (Path(source_path).resolve() / "infini.toml").exists(): + raise FileNotFoundError( + f"文件 [green]infini.toml[/green] 尚未被初始化, 你可以使用[bold green]`ipm init`[/bold green]来初始化项目." + ) + try: ipk = InfiniProject(source_path) - info(f"包[{ipk.name}]构建环境载入完毕.", echo) except TomlLoadFailed as e: return error(f"环境存在异常: {e}", echo) + return freeze.build_ipk(ipk, echo) def extract( source_path: StrPath, dist_path: StrPath | None = None, echo: bool = False ) -> InfiniProject: + info("解压缩规则包...", echo) dist_path = ( Path(dist_path).resolve() if dist_path else Path(source_path).resolve().parent ) @@ -87,13 +113,14 @@ def install( force: bool = False, echo: bool = False, ) -> None: - info("正在初始化 IPM 环境...", echo) + info(f"安装规则包 [bold green]{uri}[/bold green]...", echo) + update("初始化 [bold green]IPM[/bold green] 环境...", echo) SRC_HOME.mkdir(parents=True, exist_ok=True) index = index or INDEX lock = PackageLock() - if uri.isalpha(): + if uri.split("==")[0].isalpha(): # TODO 兼容 >= <= > < 等标识符 splited_uri = uri.split("==") name = splited_uri[0] @@ -120,7 +147,6 @@ def install( echo=echo, ) elif urlparser.is_valid_url(uri): - info(f"检定给定的 URI 地址[{uri}]为远程路径.", echo) filename = uri.rstrip("/").rpartition("/")[-1] ifp = loader.load_from_remote( "temp", @@ -129,8 +155,8 @@ def install( echo=echo, ) else: - info(f"检定给定的 URI 地址[{uri}]为本地路径.", echo) path = Path(uri).resolve() + update(f"检查文件 [blue]{path}[/blue]...", echo) if not path.exists(): raise FileNotFoundError("给定的 URI 路径不存在!") @@ -144,63 +170,84 @@ def install( if require_update(exists_version, ifp.version): if not upgrade: raise PackageExsitsError( - f"包[{ifp.name}]版本[{exists_version}]已经安装了, 使用[--upgrade]参数进行升级." + f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[blue]--upgrade[/blue]]参数进行升级." ) else: info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...") - uninstall(ifp.name, confirm=True, echo=echo) - success(f"[{ifp.name}={exists_version}]卸载完成...") + uninstall(ifp.name, is_confirm=True, echo=echo) + success(f"[{ifp.name}={exists_version}]卸载完成.") else: if not force: raise PackageExsitsError( - f"已经安装了[{ifp.name}]版本[{exists_version}], 使用[--force]参数进行强制覆盖." + f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[red]--force[/red]]参数进行强制覆盖." ) else: - info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...") - uninstall(ifp.name, confirm=True, echo=echo) - success(f"[{ifp.name}={exists_version}]卸载完成...") + info( + f"发现已经安装的 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 卸载中..." + ) + uninstall(ifp.name, is_confirm=True, echo=echo) + success( + f"[bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow]卸载完成..." + ) lock.load(auto_completion=True) - info(f"开始安装[{ifp.name}]中...", echo) - ipk = extract(STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo) - info("正在处理全局包锁...", echo) + update( + f"安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow] 中...", + echo, + ) + ipk = extract(STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo) # TODO 安装依赖规则包 + success( + f"成功安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow].", + echo, + ) + + update("处理全局锁...", echo) if not lock.has_storage(ifp.name): lock.add_storage(ifp, dump=True) lock.add_package(ipk, dump=True) - info("全局锁已处理完毕.", echo) + success("全局锁已处理完毕.", echo) - success(f"包[{ipk.name}]成功安装在[{ipk.source_path}].", echo) + success(f"包[{ipk.name}]成功安装在 [blue]{ipk.source_path}[/blue].", echo) -def uninstall(name: str, confirm: bool = False, echo: bool = False) -> None: +def uninstall(name: str, is_confirm: bool = False, echo: bool = False) -> None: lock = PackageLock() path = SRC_HOME / name.strip() if not (install_info := lock.get_package(name)): - return warning(f"由于[{name}]未被安装, 故忽略卸载操作.", echo) - - info(f"发现已经安装的[{name}]版本[{install_info['version']}]:", echo) - info(f" 将会清理: {path}", echo) - confirm: bool = ( - True - if (input("你确定要继续 (Y/n) ").upper() in ("", "Y") if not confirm else confirm) - else False + return warning(f"由于 [bold green]{name}[/bold green]未被安装, 故忽略卸载操作.", echo) + + info( + f"发现已经安装的 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow].", + echo, ) - if confirm: + update( + f"卸载 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow]..." + ) + warning(f"将会清理: [blue]{path}[/blue]", echo) + is_confirm: bool = ( + True if (confirm("你确定要继续?") if not is_confirm else is_confirm) else False + ) + if is_confirm: shutil.rmtree(SRC_HOME / name, ignore_errors=True) else: return info("忽略.", echo) lock.remove_package(name, dump=True) - success(f"规则包[{name}]卸载完成!", echo) + success( + f"规则包 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow] 卸载完成!", + echo, + ) + return True def require(name: str, index: str = "", echo: bool = False) -> None: - info("检查环境中...", echo) + info(f"新增规则包依赖: [bold green]{name}[/bold green]", echo) + update("检查环境中...", echo) pkg_lock = PackageLock() lock = ProjectLock() ipk = InfiniProject() - info("环境检查完毕.", echo) + success("环境检查完毕.", echo) splited_name = name.split("==") # TODO 支持 >= <= > < 标识 name = splited_name[0] @@ -222,11 +269,13 @@ def require(name: str, index: str = "", echo: bool = False) -> None: info("处理 Infini 项目依赖锁...", echo) ipk.require(name, version, dump=True) - lock.require(name, version, dump=True) + lock.require(name, version, dump=True) # TODO 使用check替代 success("规则包依赖新增完成.", echo) + return True def unrequire(name: str, echo: bool = False): + info(f"删除规则包依赖: [bold green]{name}[/bold green]", echo) info("处理 Infini 项目依赖锁...", echo) ipk = InfiniProject() lock = ProjectLock() @@ -234,6 +283,7 @@ def unrequire(name: str, echo: bool = False): ipk.unrequire(name, dump=True) lock.unrequire(name, dump=True) success("规则包依赖删除完成.", echo) + return True def add(name: str, index: str = "", echo: bool = False) -> None: diff --git a/src/ipm/logging.py b/src/ipm/logging.py index f923e64..369c8f2 100644 --- a/src/ipm/logging.py +++ b/src/ipm/logging.py @@ -1,20 +1,49 @@ -from multilogging import multilogger -from .const import DEBUG +from rich.console import Console +from rich.prompt import Confirm -logger = multilogger(name="IPM", level="DEBUG" if DEBUG else "INFO", notime=True) +console = Console() +status = console.status("") + + +def update(message: str, echo: bool = False) -> None: + return status.update(message) if echo else status.stop() def info(message: str, echo: bool = False) -> None: - return logger.info(message) if echo else None + return console.print(message) if echo else None def success(message: str, echo: bool = False) -> None: - return logger.success(message) if echo else None + return ( + console.print(" [green]:heavy_check_mark:[/green]", str(message)) + if echo + else None + ) def warning(message: str, echo: bool = False) -> None: - return logger.warning(message) if echo else None + return console.print(" [yellow]:warning:[/yellow]", str(message)) if echo else None def error(message: str, echo: bool = False) -> None: - return logger.error(message) if echo else None + return ( + console.print(" [red]:heavy_multiplication_x:[/red]", str(message)) + if echo + else None + ) + + +def critical(message: str, echo: bool = False) -> None: + return ( + console.print("".join((" [bold red]", str(message), "[/bold red]"))) + if echo + else None + ) + + +def tada(message: str = "工作完成!", echo: bool = True) -> None: + return console.print("\n[red]:tada:[/red]", str(message), "\n") if echo else None + + +def confirm(messgae: str, default: bool = False) -> bool: + return Confirm.ask(str(messgae), default=default) diff --git a/src/ipm/utils/freeze.py b/src/ipm/utils/freeze.py index 1ba0d9f..277a068 100644 --- a/src/ipm/utils/freeze.py +++ b/src/ipm/utils/freeze.py @@ -2,7 +2,7 @@ from pathlib import Path from ..exceptions import FileNotFoundError, VerifyFailed from ..models.ipk import InfiniProject, InfiniFrozenPackage from ..typing import StrPath -from ..logging import logger, info, success +from ..logging import update, info, success, error from .hash import ifp_hash, ifp_verify from . import _freeze @@ -11,36 +11,44 @@ import shutil def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage: - info("正在初始化开发环境...", echo) + update("构建开发环境...", echo) build_dir = ipk.source_path / "build" src_path = ipk.source_path / "src" dist_path = ipk.source_path / "dist" ifp_path = dist_path / ipk.default_name if not ipk.source_path.exists(): - raise FileNotFoundError(f"文件或文件夹[{ipk.source_path.resolve()}]不存在!") + raise FileNotFoundError(f"文件或文件夹 [blue]{ipk.source_path.resolve()}[/blue]]不存在!") if build_dir.exists(): + update("清理构建环境...") shutil.rmtree(build_dir, ignore_errors=True) + success("构建环境清理完毕.") dist_path.mkdir(parents=True, exist_ok=True) build_dir.mkdir(parents=True, exist_ok=True) + success("开发环境构建完毕.", echo) - info("开发环境构建完成, 开始复制工程文件...", echo) + update("复制工程文件...", echo) shutil.copytree(src_path, build_dir / "src") shutil.copy2(ipk.source_path / "infini.toml", build_dir / "infini.toml") - info("工程文件复制完毕, 开始打包[ipk]文件...", echo) + success("工程文件复制完毕.") + update("打包 [bold green]ipk[/bold green]文件...", echo) _freeze.create_tar_gz( str(build_dir), str(ifp_path), ) + success(f"打包文件已存至 [blue]{ifp_path}[/blue].", echo) - success(f"打包文件已存至[{ifp_path}].", echo) - info("开始创建SHA256验证文件...", echo) + update("创建 SHA256 验证文件...", echo) hash_bytes = ifp_hash(ifp_path) - info(f"文件SHA256值为[{hash_bytes.hex()}].", echo) + info(f"文件 SHA256 值为 [purple]{hash_bytes.hex()}[/purple].", echo) + (dist_path / ipk.hash_name).write_bytes(hash_bytes) - success(f"包[{ipk.name}]构建成功.", echo) + success( + f"包 [bold green]{ipk.name}[/bold green] [yellow]{ipk.version}[/yellow] 构建成功.", + echo, + ) return InfiniFrozenPackage(source_path=ifp_path, **{"name": ipk.name}) @@ -53,33 +61,39 @@ def extract_ipk( hash_path = ifp_path.parent / (ifp_path.name + ".hash") if not hash_path.exists(): - raise VerifyFailed(f"哈希文件[{hash_path}]不存在!") + raise VerifyFailed(f"哈希文件 [blue]{hash_path}[/blue] 不存在!") - info("进行文件校验中...") + update("文件校验...") if not ifp_verify(ifp_path, hash_path.read_bytes()): raise VerifyFailed("文件完整性验证失败!") - info("文件校验成功.") + success("文件校验成功.") temp_dir = tempfile.TemporaryDirectory() - temp_path = Path(temp_dir.name).resolve() / "ifp" - info(f"创建临时目录[{temp_dir}], 开始解压...", echo) + info(f"创建临时目录 [blue]{temp_dir}[/blue].") + + update(f"解压 [blue]{ifp_path}[/blue]...", echo) _freeze.extract_tar_gz(str(ifp_path), str(temp_path)) temp_pkg = InfiniProject(temp_path) dist_pkg_path = dist_path / temp_pkg.name - success(f"包[{temp_pkg.name}]解压完成.", echo) + success( + f"[bold green]{temp_pkg.name}[/bold green] [yellow]{temp_pkg.version}[/yellow] 已解压至缓存目录.", + echo, + ) if dist_pkg_path.exists(): - info("目标路径已存在, 清理旧文件...", echo) + update("目标路径已存在, 清理旧文件...", echo) try: shutil.rmtree(dist_pkg_path) - except Exception as error: - logger.exception(error) if echo else logger.error(error) - success("旧规则包清理完毕.", echo) + except Exception as err: + return error(err) + success("旧规则包项目文件清理完毕.", echo) - info(f"迁移文件至安装目录中...", echo) + update(f"迁移文件至目标目录...", echo) shutil.move(temp_path, dist_pkg_path) - info(f"任务完成, 开始清理临时文件...", echo) + success(f"文件已迁移至 [blue]{dist_pkg_path}[/blue].", echo) + + update(f"清理临时文件...", echo) temp_dir.cleanup() - info(f"临时文件清理完毕.", echo) + success(f"临时文件清理完毕.", echo) return InfiniProject(dist_pkg_path) |
