aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src
diff options
context:
space:
mode:
author苏向夜 <fu050409@163.com>2024-01-24 19:08:15 +0800
committer苏向夜 <fu050409@163.com>2024-01-24 19:08:15 +0800
commit6b895a231cadd7a3ac12fb089a09bd90d8b1ffb6 (patch)
treee8a4c77393cef0b95b5cde8c1e050fd21a886703 /src
parent3521280e1cbf81c8de89eaa4336952ba2c803693 (diff)
downloadipm-6b895a231cadd7a3ac12fb089a09bd90d8b1ffb6.tar.gz
ipm-6b895a231cadd7a3ac12fb089a09bd90d8b1ffb6.zip
:recycle: refactor(cli): use rich instead of multilogging to show output
Diffstat (limited to 'src')
-rw-r--r--src/ipm/__main__.py102
-rw-r--r--src/ipm/api.py132
-rw-r--r--src/ipm/logging.py43
-rw-r--r--src/ipm/utils/freeze.py58
4 files changed, 231 insertions, 104 deletions
diff --git a/src/ipm/__main__.py b/src/ipm/__main__.py
index 5af2872..4575986 100644
--- a/src/ipm/__main__.py
+++ b/src/ipm/__main__.py
@@ -1,8 +1,9 @@
from . import api
from .exceptions import IpmException
-from .logging import logger
+from .logging import status, error, tada
import typer
+status.start()
main = typer.Typer(
name="ipm", help="Infini 包管理器", no_args_is_help=True, add_completion=False
)
@@ -12,9 +13,12 @@ main = typer.Typer(
def check():
"""分析 Infini 项目并创建项目锁"""
try:
- api.check(".")
- except IpmException as error:
- logger.error(error)
+ if api.check(".", echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
@@ -26,9 +30,12 @@ def install(
):
"""安装一个 Infini 规则包到此计算机"""
try:
- api.install(uri, index, upgrade=upgrade, force=force, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.install(uri, index, upgrade=upgrade, force=force, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
@@ -38,45 +45,60 @@ def extract(
):
"""解压缩 Infini 包"""
try:
- api.extract(package, dist, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.extract(package, dist, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
def init(force: bool = typer.Option(None, "--force", "-f", help="强制初始化")):
"""初始化一个 Infini 项目"""
try:
- api.init(".", force, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.init(".", force, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
def new(package: str = typer.Argument(help="Infini 项目路径")):
"""新建一个 Infini 项目"""
try:
- api.new(package, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.new(package, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
def build(package: str = typer.Argument(".", help="Infini 项目路径")):
"""打包 Infini 规则包"""
try:
- api.build(package, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.build(package, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
def uninstall(package: str = typer.Argument(help="Infini 项目路径")):
"""卸载 Infini 规则包"""
try:
- api.uninstall(package, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.uninstall(package, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
@@ -86,18 +108,24 @@ def require(
):
"""新增规则包依赖"""
try:
- api.require(name, index, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.require(name, index, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
def unrequire(name: str = typer.Argument(help="Infini 包名")):
"""删除规则包依赖"""
try:
- api.unrequire(name, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.unrequire(name, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
@@ -107,18 +135,24 @@ def add(
):
"""新增环境依赖"""
try:
- api.add(name, index=index, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.add(name, index=index, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
@main.command()
def remove(name: str = typer.Argument(help="Infini 包名")):
"""删除环境依赖"""
try:
- api.remove(name, echo=True)
- except IpmException as error:
- logger.error(error)
+ if api.remove(name, echo=True):
+ tada()
+ except IpmException as err:
+ error(err, echo=True)
+ finally:
+ status.stop()
# TODO
diff --git a/src/ipm/api.py b/src/ipm/api.py
index 9e69ce5..84ae92b 100644
--- a/src/ipm/api.py
+++ b/src/ipm/api.py
@@ -2,7 +2,7 @@ from pathlib import Path
from .typing import StrPath
from .utils import freeze, urlparser, loader
from .const import INDEX, INDEX_PATH, STORAGE, SRC_HOME
-from .logging import info, success, warning, error
+from .logging import update, info, success, warning, error, confirm, tada
from .exceptions import (
FileTypeMismatch,
TomlLoadFailed,
@@ -18,17 +18,25 @@ import toml
import shutil
-def check(source_path: StrPath) -> None:
- return ProjectLock(
+def check(source_path: StrPath, echo: bool = False) -> bool:
+ info("项目环境检查...", echo)
+ ProjectLock(
Path(source_path).resolve() / "infini.lock",
auto_load=False,
).init()
+ return True
-def init(source_path: StrPath, force: bool = False, echo: bool = False) -> None:
+def init(source_path: StrPath, force: bool = False, echo: bool = False) -> bool:
+ info("初始化规则包...", echo)
+ update("检查环境...", echo)
source_path = Path(source_path).resolve()
if (toml_path := (source_path / "infini.toml")).exists() and not force:
- warning(f"无法在已经初始化的地址重新初始化, 如果你的确希望重新初始化, 请使用[ipm init --force].", echo)
+ return warning(
+ f"无法在已经初始化的地址重新初始化, 如果你的确希望重新初始化, 请使用[bold red]`ipm init --force`[/bold red].",
+ echo,
+ )
+ success("环境检查完毕.", echo)
toml_file = toml_path.open("w", encoding="utf-8")
toml.dump(
@@ -50,30 +58,48 @@ def init(source_path: StrPath, force: bool = False, echo: bool = False) -> None:
(source_path / "src" / "__init__.py").write_text(
"# Initialized `__init__.py` generated by ipm."
)
+ return True
+
+def new(dist_path: StrPath, echo: bool = False) -> bool:
+ info("新建规则包...", echo)
-def new(dist_path: StrPath, echo: bool = False) -> None:
- info("初始化环境中...")
+ update("检查环境...", echo)
path = Path(dist_path).resolve()
if path.exists():
- return warning(f"路径[{path}]已经存在.", echo)
+ return warning(
+ f"路径 [blue]{path.relative_to(Path('.').resolve())}[/blue] 已经存在.", echo
+ )
path.mkdir(parents=True, exist_ok=True)
- return init(path, echo=echo)
+ success("环境检查完毕.", echo)
+
+ init(path, echo=echo)
+
+ success(f"规则包 [bold green]{path.name}[/bold green] 新建完成!", echo)
+ return True
def build(source_path: StrPath, echo: bool = False) -> InfiniFrozenPackage:
- info("检查构建环境...", echo)
+ info("构建规则包...", echo)
+ update("检查构建环境...", echo)
+
+ if not (Path(source_path).resolve() / "infini.toml").exists():
+ raise FileNotFoundError(
+ f"文件 [green]infini.toml[/green] 尚未被初始化, 你可以使用[bold green]`ipm init`[/bold green]来初始化项目."
+ )
+
try:
ipk = InfiniProject(source_path)
- info(f"包[{ipk.name}]构建环境载入完毕.", echo)
except TomlLoadFailed as e:
return error(f"环境存在异常: {e}", echo)
+
return freeze.build_ipk(ipk, echo)
def extract(
source_path: StrPath, dist_path: StrPath | None = None, echo: bool = False
) -> InfiniProject:
+ info("解压缩规则包...", echo)
dist_path = (
Path(dist_path).resolve() if dist_path else Path(source_path).resolve().parent
)
@@ -87,13 +113,14 @@ def install(
force: bool = False,
echo: bool = False,
) -> None:
- info("正在初始化 IPM 环境...", echo)
+ info(f"安装规则包 [bold green]{uri}[/bold green]...", echo)
+ update("初始化 [bold green]IPM[/bold green] 环境...", echo)
SRC_HOME.mkdir(parents=True, exist_ok=True)
index = index or INDEX
lock = PackageLock()
- if uri.isalpha():
+ if uri.split("==")[0].isalpha():
# TODO 兼容 >= <= > < 等标识符
splited_uri = uri.split("==")
name = splited_uri[0]
@@ -120,7 +147,6 @@ def install(
echo=echo,
)
elif urlparser.is_valid_url(uri):
- info(f"检定给定的 URI 地址[{uri}]为远程路径.", echo)
filename = uri.rstrip("/").rpartition("/")[-1]
ifp = loader.load_from_remote(
"temp",
@@ -129,8 +155,8 @@ def install(
echo=echo,
)
else:
- info(f"检定给定的 URI 地址[{uri}]为本地路径.", echo)
path = Path(uri).resolve()
+ update(f"检查文件 [blue]{path}[/blue]...", echo)
if not path.exists():
raise FileNotFoundError("给定的 URI 路径不存在!")
@@ -144,63 +170,84 @@ def install(
if require_update(exists_version, ifp.version):
if not upgrade:
raise PackageExsitsError(
- f"包[{ifp.name}]版本[{exists_version}]已经安装了, 使用[--upgrade]参数进行升级."
+ f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[blue]--upgrade[/blue]]参数进行升级."
)
else:
info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...")
- uninstall(ifp.name, confirm=True, echo=echo)
- success(f"[{ifp.name}={exists_version}]卸载完成...")
+ uninstall(ifp.name, is_confirm=True, echo=echo)
+ success(f"[{ifp.name}={exists_version}]卸载完成.")
else:
if not force:
raise PackageExsitsError(
- f"已经安装了[{ifp.name}]版本[{exists_version}], 使用[--force]参数进行强制覆盖."
+ f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[red]--force[/red]]参数进行强制覆盖."
)
else:
- info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...")
- uninstall(ifp.name, confirm=True, echo=echo)
- success(f"[{ifp.name}={exists_version}]卸载完成...")
+ info(
+ f"发现已经安装的 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 卸载中..."
+ )
+ uninstall(ifp.name, is_confirm=True, echo=echo)
+ success(
+ f"[bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow]卸载完成..."
+ )
lock.load(auto_completion=True)
- info(f"开始安装[{ifp.name}]中...", echo)
- ipk = extract(STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo)
- info("正在处理全局包锁...", echo)
+ update(
+ f"安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow] 中...",
+ echo,
+ )
+ ipk = extract(STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo) # TODO 安装依赖规则包
+ success(
+ f"成功安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow].",
+ echo,
+ )
+
+ update("处理全局锁...", echo)
if not lock.has_storage(ifp.name):
lock.add_storage(ifp, dump=True)
lock.add_package(ipk, dump=True)
- info("全局锁已处理完毕.", echo)
+ success("全局锁已处理完毕.", echo)
- success(f"包[{ipk.name}]成功安装在[{ipk.source_path}].", echo)
+ success(f"包[{ipk.name}]成功安装在 [blue]{ipk.source_path}[/blue].", echo)
-def uninstall(name: str, confirm: bool = False, echo: bool = False) -> None:
+def uninstall(name: str, is_confirm: bool = False, echo: bool = False) -> None:
lock = PackageLock()
path = SRC_HOME / name.strip()
if not (install_info := lock.get_package(name)):
- return warning(f"由于[{name}]未被安装, 故忽略卸载操作.", echo)
-
- info(f"发现已经安装的[{name}]版本[{install_info['version']}]:", echo)
- info(f" 将会清理: {path}", echo)
- confirm: bool = (
- True
- if (input("你确定要继续 (Y/n) ").upper() in ("", "Y") if not confirm else confirm)
- else False
+ return warning(f"由于 [bold green]{name}[/bold green]未被安装, 故忽略卸载操作.", echo)
+
+ info(
+ f"发现已经安装的 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow].",
+ echo,
)
- if confirm:
+ update(
+ f"卸载 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow]..."
+ )
+ warning(f"将会清理: [blue]{path}[/blue]", echo)
+ is_confirm: bool = (
+ True if (confirm("你确定要继续?") if not is_confirm else is_confirm) else False
+ )
+ if is_confirm:
shutil.rmtree(SRC_HOME / name, ignore_errors=True)
else:
return info("忽略.", echo)
lock.remove_package(name, dump=True)
- success(f"规则包[{name}]卸载完成!", echo)
+ success(
+ f"规则包 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow] 卸载完成!",
+ echo,
+ )
+ return True
def require(name: str, index: str = "", echo: bool = False) -> None:
- info("检查环境中...", echo)
+ info(f"新增规则包依赖: [bold green]{name}[/bold green]", echo)
+ update("检查环境中...", echo)
pkg_lock = PackageLock()
lock = ProjectLock()
ipk = InfiniProject()
- info("环境检查完毕.", echo)
+ success("环境检查完毕.", echo)
splited_name = name.split("==") # TODO 支持 >= <= > < 标识
name = splited_name[0]
@@ -222,11 +269,13 @@ def require(name: str, index: str = "", echo: bool = False) -> None:
info("处理 Infini 项目依赖锁...", echo)
ipk.require(name, version, dump=True)
- lock.require(name, version, dump=True)
+ lock.require(name, version, dump=True) # TODO 使用check替代
success("规则包依赖新增完成.", echo)
+ return True
def unrequire(name: str, echo: bool = False):
+ info(f"删除规则包依赖: [bold green]{name}[/bold green]", echo)
info("处理 Infini 项目依赖锁...", echo)
ipk = InfiniProject()
lock = ProjectLock()
@@ -234,6 +283,7 @@ def unrequire(name: str, echo: bool = False):
ipk.unrequire(name, dump=True)
lock.unrequire(name, dump=True)
success("规则包依赖删除完成.", echo)
+ return True
def add(name: str, index: str = "", echo: bool = False) -> None:
diff --git a/src/ipm/logging.py b/src/ipm/logging.py
index f923e64..369c8f2 100644
--- a/src/ipm/logging.py
+++ b/src/ipm/logging.py
@@ -1,20 +1,49 @@
-from multilogging import multilogger
-from .const import DEBUG
+from rich.console import Console
+from rich.prompt import Confirm
-logger = multilogger(name="IPM", level="DEBUG" if DEBUG else "INFO", notime=True)
+console = Console()
+status = console.status("")
+
+
+def update(message: str, echo: bool = False) -> None:
+ return status.update(message) if echo else status.stop()
def info(message: str, echo: bool = False) -> None:
- return logger.info(message) if echo else None
+ return console.print(message) if echo else None
def success(message: str, echo: bool = False) -> None:
- return logger.success(message) if echo else None
+ return (
+ console.print(" [green]:heavy_check_mark:[/green]", str(message))
+ if echo
+ else None
+ )
def warning(message: str, echo: bool = False) -> None:
- return logger.warning(message) if echo else None
+ return console.print(" [yellow]:warning:[/yellow]", str(message)) if echo else None
def error(message: str, echo: bool = False) -> None:
- return logger.error(message) if echo else None
+ return (
+ console.print(" [red]:heavy_multiplication_x:[/red]", str(message))
+ if echo
+ else None
+ )
+
+
+def critical(message: str, echo: bool = False) -> None:
+ return (
+ console.print("".join((" [bold red]", str(message), "[/bold red]")))
+ if echo
+ else None
+ )
+
+
+def tada(message: str = "工作完成!", echo: bool = True) -> None:
+ return console.print("\n[red]:tada:[/red]", str(message), "\n") if echo else None
+
+
+def confirm(messgae: str, default: bool = False) -> bool:
+ return Confirm.ask(str(messgae), default=default)
diff --git a/src/ipm/utils/freeze.py b/src/ipm/utils/freeze.py
index 1ba0d9f..277a068 100644
--- a/src/ipm/utils/freeze.py
+++ b/src/ipm/utils/freeze.py
@@ -2,7 +2,7 @@ from pathlib import Path
from ..exceptions import FileNotFoundError, VerifyFailed
from ..models.ipk import InfiniProject, InfiniFrozenPackage
from ..typing import StrPath
-from ..logging import logger, info, success
+from ..logging import update, info, success, error
from .hash import ifp_hash, ifp_verify
from . import _freeze
@@ -11,36 +11,44 @@ import shutil
def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage:
- info("正在初始化开发环境...", echo)
+ update("构建开发环境...", echo)
build_dir = ipk.source_path / "build"
src_path = ipk.source_path / "src"
dist_path = ipk.source_path / "dist"
ifp_path = dist_path / ipk.default_name
if not ipk.source_path.exists():
- raise FileNotFoundError(f"文件或文件夹[{ipk.source_path.resolve()}]不存在!")
+ raise FileNotFoundError(f"文件或文件夹 [blue]{ipk.source_path.resolve()}[/blue]]不存在!")
if build_dir.exists():
+ update("清理构建环境...")
shutil.rmtree(build_dir, ignore_errors=True)
+ success("构建环境清理完毕.")
dist_path.mkdir(parents=True, exist_ok=True)
build_dir.mkdir(parents=True, exist_ok=True)
+ success("开发环境构建完毕.", echo)
- info("开发环境构建完成, 开始复制工程文件...", echo)
+ update("复制工程文件...", echo)
shutil.copytree(src_path, build_dir / "src")
shutil.copy2(ipk.source_path / "infini.toml", build_dir / "infini.toml")
- info("工程文件复制完毕, 开始打包[ipk]文件...", echo)
+ success("工程文件复制完毕.")
+ update("打包 [bold green]ipk[/bold green]文件...", echo)
_freeze.create_tar_gz(
str(build_dir),
str(ifp_path),
)
+ success(f"打包文件已存至 [blue]{ifp_path}[/blue].", echo)
- success(f"打包文件已存至[{ifp_path}].", echo)
- info("开始创建SHA256验证文件...", echo)
+ update("创建 SHA256 验证文件...", echo)
hash_bytes = ifp_hash(ifp_path)
- info(f"文件SHA256值为[{hash_bytes.hex()}].", echo)
+ info(f"文件 SHA256 值为 [purple]{hash_bytes.hex()}[/purple].", echo)
+
(dist_path / ipk.hash_name).write_bytes(hash_bytes)
- success(f"包[{ipk.name}]构建成功.", echo)
+ success(
+ f"包 [bold green]{ipk.name}[/bold green] [yellow]{ipk.version}[/yellow] 构建成功.",
+ echo,
+ )
return InfiniFrozenPackage(source_path=ifp_path, **{"name": ipk.name})
@@ -53,33 +61,39 @@ def extract_ipk(
hash_path = ifp_path.parent / (ifp_path.name + ".hash")
if not hash_path.exists():
- raise VerifyFailed(f"哈希文件[{hash_path}]不存在!")
+ raise VerifyFailed(f"哈希文件 [blue]{hash_path}[/blue] 不存在!")
- info("进行文件校验中...")
+ update("文件校验...")
if not ifp_verify(ifp_path, hash_path.read_bytes()):
raise VerifyFailed("文件完整性验证失败!")
- info("文件校验成功.")
+ success("文件校验成功.")
temp_dir = tempfile.TemporaryDirectory()
-
temp_path = Path(temp_dir.name).resolve() / "ifp"
- info(f"创建临时目录[{temp_dir}], 开始解压...", echo)
+ info(f"创建临时目录 [blue]{temp_dir}[/blue].")
+
+ update(f"解压 [blue]{ifp_path}[/blue]...", echo)
_freeze.extract_tar_gz(str(ifp_path), str(temp_path))
temp_pkg = InfiniProject(temp_path)
dist_pkg_path = dist_path / temp_pkg.name
- success(f"包[{temp_pkg.name}]解压完成.", echo)
+ success(
+ f"[bold green]{temp_pkg.name}[/bold green] [yellow]{temp_pkg.version}[/yellow] 已解压至缓存目录.",
+ echo,
+ )
if dist_pkg_path.exists():
- info("目标路径已存在, 清理旧文件...", echo)
+ update("目标路径已存在, 清理旧文件...", echo)
try:
shutil.rmtree(dist_pkg_path)
- except Exception as error:
- logger.exception(error) if echo else logger.error(error)
- success("旧规则包清理完毕.", echo)
+ except Exception as err:
+ return error(err)
+ success("旧规则包项目文件清理完毕.", echo)
- info(f"迁移文件至安装目录中...", echo)
+ update(f"迁移文件至目标目录...", echo)
shutil.move(temp_path, dist_pkg_path)
- info(f"任务完成, 开始清理临时文件...", echo)
+ success(f"文件已迁移至 [blue]{dist_pkg_path}[/blue].", echo)
+
+ update(f"清理临时文件...", echo)
temp_dir.cleanup()
- info(f"临时文件清理完毕.", echo)
+ success(f"临时文件清理完毕.", echo)
return InfiniProject(dist_pkg_path)