From 57e639f755aa97d0952e136dc6925fd38962a396 Mon Sep 17 00:00:00 2001 From: 苏向夜 Date: Wed, 6 Mar 2024 20:17:15 +0800 Subject: refactor(ipm): prepare to rewrite and refactor new/init api --- pyproject.toml | 2 +- src/ipm/__main__.py | 196 ++++++------ src/ipm/api.py | 641 ++++++++++++++++++++------------------- src/ipm/const.py | 15 +- src/ipm/models/ipk.py | 153 ++++------ src/ipm/models/lock.py | 758 +++++++++++++++++++++++------------------------ src/ipm/utils/freeze.py | 17 +- src/ipm/utils/loader.py | 4 + src/ipm/utils/version.py | 5 +- tests/test_api.py | 50 ++-- 10 files changed, 924 insertions(+), 917 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d11a234..e060386 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ipdm" -version = "0.1.4" +version = "0.2.0-alpha.1" description = "Infini 包管理器" authors = [ { name = "苏向夜", email = "fu050409@163.com" }, diff --git a/src/ipm/__main__.py b/src/ipm/__main__.py index 4575986..976d59c 100644 --- a/src/ipm/__main__.py +++ b/src/ipm/__main__.py @@ -9,33 +9,33 @@ main = typer.Typer( ) -@main.command() -def check(): - """分析 Infini 项目并创建项目锁""" - try: - if api.check(".", echo=True): - tada() - except IpmException as err: - error(err, echo=True) - finally: - status.stop() - - -@main.command() -def install( - uri: str = typer.Argument(help="Infini 包的统一资源标识符"), - index: str = typer.Option(None, help="世界树服务器地址"), - upgrade: bool = typer.Option(False, "--upgrade", "-u", help="更新 Infini 包"), - force: bool = typer.Option(False, "--force", "-f", help="强制安装"), -): - """安装一个 Infini 规则包到此计算机""" - try: - if api.install(uri, index, upgrade=upgrade, force=force, echo=True): - tada() - except IpmException as err: - error(err, echo=True) - finally: - status.stop() +# @main.command() +# def check(): +# """分析 Infini 项目并创建项目锁""" +# try: +# if api.check(".", echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() + + +# @main.command() +# def install( +# uri: str = typer.Argument(help="Infini 包的统一资源标识符"), +# index: str = typer.Option(None, help="世界树服务器地址"), +# upgrade: bool = typer.Option(False, "--upgrade", "-u", help="更新 Infini 包"), +# force: bool = typer.Option(False, "--force", "-f", help="强制安装"), +# ): +# """安装一个 Infini 规则包到此计算机""" +# try: +# if api.install(uri, index, upgrade=upgrade, force=force, echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() @main.command() @@ -48,7 +48,7 @@ def extract( if api.extract(package, dist, echo=True): tada() except IpmException as err: - error(err, echo=True) + error(str(err), echo=True) finally: status.stop() @@ -60,7 +60,7 @@ def init(force: bool = typer.Option(None, "--force", "-f", help="强制初始化 if api.init(".", force, echo=True): tada() except IpmException as err: - error(err, echo=True) + error(str(err), echo=True) finally: status.stop() @@ -72,7 +72,7 @@ def new(package: str = typer.Argument(help="Infini 项目路径")): if api.new(package, echo=True): tada() except IpmException as err: - error(err, echo=True) + error(str(err), echo=True) finally: status.stop() @@ -84,87 +84,85 @@ def build(package: str = typer.Argument(".", help="Infini 项目路径")): if api.build(package, echo=True): tada() except IpmException as err: - error(err, echo=True) - finally: - status.stop() - - -@main.command() -def uninstall(package: str = typer.Argument(help="Infini 项目路径")): - """卸载 Infini 规则包""" - try: - if api.uninstall(package, echo=True): - tada() - except IpmException as err: - error(err, echo=True) + error(str(err), echo=True) finally: status.stop() -@main.command() -def require( - name: str = typer.Argument(help="Infini 包名"), - index: str = typer.Option(None, help="世界树服务器地址"), -): - """新增规则包依赖""" - try: - if api.require(name, index, echo=True): - tada() - except IpmException as err: - error(err, echo=True) - finally: - status.stop() - - -@main.command() -def unrequire(name: str = typer.Argument(help="Infini 包名")): - """删除规则包依赖""" - try: - if api.unrequire(name, echo=True): - tada() - except IpmException as err: - error(err, echo=True) - finally: - status.stop() - - -@main.command() -def add( - name: str = typer.Argument(help="Infini 包名"), - index: str = typer.Option(None, help="世界树服务器地址"), -): - """新增环境依赖""" - try: - if api.add(name, index=index, echo=True): - tada() - except IpmException as err: - error(err, echo=True) - finally: - status.stop() - - -@main.command() -def remove(name: str = typer.Argument(help="Infini 包名")): - """删除环境依赖""" - try: - if api.remove(name, echo=True): - tada() - except IpmException as err: - error(err, echo=True) - finally: - status.stop() +# @main.command() +# def uninstall(package: str = typer.Argument(help="Infini 项目路径")): +# """卸载 Infini 规则包""" +# try: +# if api.uninstall(package, echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() + + +# @main.command() +# def require( +# name: str = typer.Argument(help="Infini 包名"), +# index: str = typer.Option(None, help="世界树服务器地址"), +# ): +# """新增规则包依赖""" +# try: +# if api.require(name, index, echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() + + +# @main.command() +# def unrequire(name: str = typer.Argument(help="Infini 包名")): +# """删除规则包依赖""" +# try: +# if api.unrequire(name, echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() + + +# @main.command() +# def add( +# name: str = typer.Argument(help="Infini 包名"), +# index: str = typer.Option(None, help="世界树服务器地址"), +# ): +# """新增环境依赖""" +# try: +# if api.add(name, index=index, echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() + + +# @main.command() +# def remove(name: str = typer.Argument(help="Infini 包名")): +# """删除环境依赖""" +# try: +# if api.remove(name, echo=True): +# tada() +# except IpmException as err: +# error(str(err), echo=True) +# finally: +# status.stop() # TODO @main.command() -def collect(): - ... +def collect(): ... # TODO @main.command() -def update(): - ... +def update(): ... if __name__ == "__main__": diff --git a/src/ipm/api.py b/src/ipm/api.py index 60dc05b..54b9c76 100644 --- a/src/ipm/api.py +++ b/src/ipm/api.py @@ -1,58 +1,76 @@ from pathlib import Path -from .typing import StrPath -from .utils import freeze, urlparser, loader -from .const import INDEX, INDEX_PATH, STORAGE, SRC_HOME -from .logging import status, update, info, success, warning, error, confirm, ask -from .exceptions import ( +from typing import Optional +from ipm.typing import StrPath +from ipm.utils import freeze, urlparser, loader +from ipm.const import GITIGNORE, INDEX, INDEX_PATH, STORAGE, SRC_HOME +from ipm.logging import status, update, info, success, warning, error, confirm, ask +from ipm.exceptions import ( FileTypeMismatch, TomlLoadFailed, FileNotFoundError, PackageExsitsError, ) -from .utils.version import require_update -from .models.ipk import InfiniProject, InfiniFrozenPackage -from .models.lock import PackageLock, ProjectLock -from .models.index import Yggdrasil +from ipm.utils.version import require_update +from ipm.models.ipk import InfiniProject, InfiniFrozenPackage +# from ipm.models.lock import PackageLock, ProjectLock +from ipm.models.index import Yggdrasil -import toml +import tomlkit import shutil +import os +import configparser -def check(source_path: StrPath, echo: bool = False) -> bool: - info("项目环境检查...", echo) +# def check(source_path: StrPath, echo: bool = False) -> bool: +# info("项目环境检查...", echo) - update("检查环境...") - lock = ProjectLock( - Path(source_path).resolve() / "infini.lock", - auto_load=False, - ) - success("环境检查完毕.", echo) +# update("检查环境...") +# lock = ProjectLock( +# Path(source_path).resolve() / "infini.lock", +# auto_load=False, +# ) +# success("环境检查完毕.", echo) - update("写入依赖锁文件...", echo) - lock.init() - success("项目依赖锁写入完成.", echo) - return True +# update("写入依赖锁文件...", echo) +# lock.init() +# success("项目依赖锁写入完成.", echo) +# return True -def init(source_path: StrPath, force: bool = False, echo: bool = False) -> bool: +def init(target_path: StrPath, force: bool = False, echo: bool = False) -> bool: info("初始化规则包...", echo) update("检查环境...", echo) - source_path = Path(source_path).resolve() - if (toml_path := (source_path / "infini.toml")).exists() and not force: - return warning( + target_path = Path(target_path).resolve() + if (toml_path := (target_path / "infini.toml")).exists() and not force: + warning( f"无法在已经初始化的地址重新初始化, 如果你的确希望重新初始化, 请使用[bold red]`ipm init --force`[/bold red].", echo, ) + return False + email = username = None + gitconfig_path = Path.home().joinpath(".gitconfig") + if gitconfig_path.exists(): + config = configparser.ConfigParser() + config.read(str(gitconfig_path), encoding="utf-8") + if "user" in config.sections(): + email = config["user"].get("email") + username = config["user"].get("name") + email = email or (os.getlogin() + "@example.com") + username = username or os.getlogin() success("环境检查完毕.", echo) status.stop() - name = ask("项目名称", default=source_path.name, echo=echo) + name = ask("项目名称", default=target_path.name, echo=echo) version = ask("项目版本", default="0.1.0", echo=echo) - description = ask("项目简介", default=f"{source_path.name.upper()} 规则包", echo=echo) + description = ask( + "项目简介", default=f"{target_path.name.upper()} 规则包", echo=echo + ) + author_name = ask("作者名称", default=username, echo=echo) + author_email = ask("作者邮箱", default=email, echo=echo) license = ask("开源协议", default="MIT", echo=echo) default_entries = ["__init__.py", f"{name}.py"] - info("请选择你要使用的入口文件", echo) + info("请选择你要使用的入口文件:", echo) for index, default_entry in enumerate(default_entries): info(f"[bold cyan]{index}[/bold cyan]. [green]{default_entry}[/green]", echo) entry_file = ask( @@ -63,69 +81,77 @@ def init(source_path: StrPath, force: bool = False, echo: bool = False) -> bool: ) status.start() + toml_file = toml_path.open("w", encoding="utf-8") - toml.dump( - { - "infini": { - "name": name, - "version": version, - "description": description, - "license": license, - }, - "requirements": {}, - "dependencies": {}, - }, - toml_file, - ) + toml_data = tomlkit.document() + project = tomlkit.table() + project.add("name", name) + project.add("version", version) + project.add("description", description) + author = tomlkit.array() + author.add_line({"name": author_name, "email": author_email}) + author.multiline(True) + project.add("author", author) + project.add("license", license) + toml_data.add("project", project) + toml_data.add("requirements", tomlkit.table()) + toml_data.add("dependencies", tomlkit.table()) + tomlkit.dump(toml_data, toml_file) toml_file.close() - (source_path / "src").mkdir(parents=True, exist_ok=True) - (source_path / ".gitignores").write_text( - "# Initialized `.gitignores` generated by ipm.\n\n" - "# python\n" - "__pycache__/\n" - "*.pyc\n" - "# build\n" - "build/" - "dist/" - ) + source_path = target_path.joinpath("src") + gitignore_filepath = target_path.joinpath(".gitignore") + source_path.mkdir(parents=True, exist_ok=True) + if not gitignore_filepath.exists(): + (target_path / ".gitignore").write_text(GITIGNORE) + if entry_file == "0": - (source_path / "src" / "__init__.py").write_text( - "# Initialized `__init__.py` generated by ipm.\n" - "# This is the entry file for this Infini package." - "# Documents at https://ipm.hydroroll.team/\n\n" - "from .events import register as events_register\n" - "from .handlers import register as handlers_register\n" - "from .interceptors import register as interceptors_register\n" - ) - (source_path / "src" / "events.py").write_text( - "# Initialized `events.py` generated by ipm.\n" - "# Regists your text events and regist global variables here." - "# Documents at https://ipm.hydroroll.team/\n\n" - "from infini.register import Register\n\n\n" - "register = Register()\n" - ) - (source_path / "src" / "handlers.py").write_text( - "# Initialized `handlers.py` generated by ipm.\n" - "# Regists your handlers here." - "# Documents at https://ipm.hydroroll.team/\n\n" - "from infini.register import Register\n\n\n" - "register = Register()\n" - ) - (source_path / "src" / "interceptors.py").write_text( - "# Initialized `interceptors.py` generated by ipm.\n" - "# Regists your pre-interceptors and interceptors here." - "# Documents at https://ipm.hydroroll.team/\n\n" - "from infini.register import Register\n\n\n" - "register = Register()\n" - ) + init_filepath = source_path.joinpath("__init__.py") + events_filepath = source_path.joinpath("events.py") + handlers_filepath = source_path.joinpath("handlers.py") + interceptors_filepath = source_path.joinpath("interceptors.py") + + if not init_filepath.exists(): + init_filepath.write_text( + "# Initialized `events.py` generated by ipm.\n" + "# Regists your text events and regist global variables here.\n" + "# Documents at https://ipm.hydroroll.team/\n\n" + "from infini.register import Register\n\n\n" + "register = Register()\n" + ) + if not events_filepath.exists(): + events_filepath.write_text( + "# Initialized `events.py` generated by ipm.\n" + "# Regists your text events and regist global variables here.\n" + "# Documents at https://ipm.hydroroll.team/\n\n" + "from infini.register import Register\n\n\n" + "register = Register()\n" + ) + if not handlers_filepath.exists(): + handlers_filepath.write_text( + "# Initialized `handlers.py` generated by ipm.\n" + "# Regists your handlers here.\n" + "# Documents at https://ipm.hydroroll.team/\n\n" + "from infini.register import Register\n\n\n" + "register = Register()\n" + ) + if not interceptors_filepath.exists(): + interceptors_filepath.write_text( + "# Initialized `interceptors.py` generated by ipm.\n" + "# Regists your pre-interceptors and interceptors here.\n" + "# Documents at https://ipm.hydroroll.team/\n\n" + "from infini.register import Register\n\n\n" + "register = Register()\n" + ) else: - (source_path / "src" / default_entries[int(entry_file)]).write_text( - "# Initialized `__init__.py` generated by ipm.\n" - "# Documents at https://ipm.hydroroll.team/\n\n" - "from infini.register import Register\n\n\n" - "register = Register()\n" - ) + entry_filepath = source_path.joinpath(default_entries[int(entry_file)]) + if not entry_filepath.exists(): + entry_filepath.write_text( + f"# Initialized `{source_path.name}` generated by ipm.\n" + "# Documents at https://ipm.hydroroll.team/\n\n" + "from infini.register import Register\n\n\n" + "register = Register()\n" + ) return True @@ -135,9 +161,10 @@ def new(dist_path: StrPath, echo: bool = False) -> bool: update("检查环境...", echo) path = Path(dist_path).resolve() if path.exists(): - return warning( + warning( f"路径 [blue]{path.relative_to(Path('.').resolve())}[/blue] 已经存在.", echo ) + return False path.mkdir(parents=True, exist_ok=True) success("环境检查完毕.", echo) @@ -147,7 +174,7 @@ def new(dist_path: StrPath, echo: bool = False) -> bool: return True -def build(source_path: StrPath, echo: bool = False) -> InfiniFrozenPackage: +def build(source_path: StrPath, echo: bool = False) -> Optional[InfiniFrozenPackage]: info("构建规则包...", echo) update("检查构建环境...", echo) @@ -166,7 +193,7 @@ def build(source_path: StrPath, echo: bool = False) -> InfiniFrozenPackage: def extract( source_path: StrPath, dist_path: StrPath | None = None, echo: bool = False -) -> InfiniProject: +) -> Optional[InfiniProject]: info("解压缩规则包...", echo) dist_path = ( Path(dist_path).resolve() if dist_path else Path(source_path).resolve().parent @@ -174,216 +201,224 @@ def extract( return freeze.extract_ipk(source_path, dist_path, echo) -def install( - uri: str, - index: str = "", - upgrade: bool = False, - force: bool = False, - echo: bool = False, -) -> None: - info(f"安装规则包 [bold green]{uri}[/bold green]...", echo) - - update("初始化 [bold green]IPM[/bold green] 环境...", echo) - SRC_HOME.mkdir(parents=True, exist_ok=True) - index = index or INDEX - lock = PackageLock() - - if uri.split("==")[0].isalpha(): - # TODO 兼容 >= <= > < 等标识符 - splited_uri = uri.split("==") - name = splited_uri[0] - if len(splited_uri) == 1: - version = None - else: - version = splited_uri[1] - - yggdrasil = Yggdrasil(index) - - if not (lock_index := lock.get_index(index)): - yggdrasil.sync() - lock.add_index(index, yggdrasil.host, yggdrasil.uuid, dump=True) - else: - yggdrasil.init(INDEX_PATH / lock_index["uuid"]) - - if not (remote_ifp := yggdrasil.get(name, version=version)): - return warning(f"未能在世界树[{yggdrasil.index}]中搜寻到规则包[{uri}].", echo) - - ifp = loader.load_from_remote( - name, - baseurl=index, - filename=remote_ifp["source"], - echo=echo, - ) - elif urlparser.is_valid_url(uri): - filename = uri.rstrip("/").rpartition("/")[-1] - ifp = loader.load_from_remote( - "temp", - uri.rstrip("/").rpartition("/")[0], - filename, - echo=echo, - ) - else: - path = Path(uri).resolve() - update(f"检查文件 [blue]{path}[/blue]...", echo) - if not path.exists(): - raise FileNotFoundError("给定的 URI 路径不存在!") - - if uri.endswith(".ipk"): - ifp = loader.load_from_local(path) - else: - raise FileTypeMismatch("文件类型与预期[.ipk]不匹配.") - - if lock.has_package(ifp.name): - exists_version = lock.get_package(ifp.name)["version"] - if require_update(exists_version, ifp.version): - if not upgrade: - raise PackageExsitsError( - f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[blue]--upgrade[/blue]]参数进行升级." - ) - else: - info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...") - uninstall(ifp.name, is_confirm=True, echo=echo) - success(f"[{ifp.name}={exists_version}]卸载完成.") - else: - if not force: - raise PackageExsitsError( - f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[red]--force[/red]]参数进行强制覆盖." - ) - else: - info( - f"发现已经安装的 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 卸载中..." - ) - uninstall(ifp.name, is_confirm=True, echo=echo) - success( - f"[bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow]卸载完成..." - ) - lock.load(auto_completion=True) - - update( - f"安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow] 中...", - echo, - ) - ipk = extract(STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo) # TODO 安装依赖规则包 - success( - f"成功安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow].", - echo, - ) - - update("处理全局锁...", echo) - if not lock.has_storage(ifp.name): - lock.add_storage(ifp, dump=True) - lock.add_package(ipk, dump=True) - success("全局锁已处理完毕.", echo) - - success(f"包[{ipk.name}]成功安装在 [blue]{ipk.source_path}[/blue].", echo) - - -def uninstall(name: str, is_confirm: bool = False, echo: bool = False) -> None: - lock = PackageLock() - path = SRC_HOME / name.strip() - - if not (install_info := lock.get_package(name)): - return warning(f"由于 [bold green]{name}[/bold green]未被安装, 故忽略卸载操作.", echo) - - info( - f"发现已经安装的 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow].", - echo, - ) - update( - f"卸载 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow]..." - ) - warning(f"将会清理: [blue]{path}[/blue]", echo) - is_confirm: bool = ( - True if (confirm("你确定要继续?") if not is_confirm else is_confirm) else False - ) - if is_confirm: - shutil.rmtree(SRC_HOME / name, ignore_errors=True) - else: - return info("忽略.", echo) - - lock.remove_package(name, dump=True) - success( - f"规则包 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow] 卸载完成!", - echo, - ) - return True - - -def require(name: str, index: str = "", echo: bool = False) -> None: - info(f"新增规则包依赖: [bold green]{name}[/bold green]", echo) - update("检查环境中...", echo) - pkg_lock = PackageLock() - lock = ProjectLock() - ipk = InfiniProject() - success("环境检查完毕.", echo) - - splited_name = name.split("==") # TODO 支持 >= <= > < 标识 - name = splited_name[0] - - if len(splited_name) > 1: - version = splited_name[1] - else: - version = None - - if not pkg_lock.has_package(name): - info(f"检测到需要依赖的规则包[{name}]不存在, 安装中...", echo) - install( - f"name=={version}" if version else name, - index=index, - upgrade=True, - force=True, - echo=True, - ) - - info("处理 Infini 项目依赖锁...", echo) - ipk.require(name, version, dump=True) - lock.require(name, version, dump=True) # TODO 使用check替代 - success("规则包依赖新增完成.", echo) - return True - - -def unrequire(name: str, echo: bool = False): - info(f"删除规则包依赖: [bold green]{name}[/bold green]", echo) - info("处理 Infini 项目依赖锁...", echo) - ipk = InfiniProject() - lock = ProjectLock() - - ipk.unrequire(name, dump=True) - lock.unrequire(name, dump=True) - success("规则包依赖删除完成.", echo) - return True - - -def add(name: str, index: str = "", echo: bool = False) -> None: - info("检查环境中...", echo) - pkg_lock = PackageLock() - lock = ProjectLock() - ipk = InfiniProject() - info("环境检查完毕.", echo) - - splited_name = name.split("==") # TODO 支持 >= <= > < 标识 - name = splited_name[0] - - if len(splited_name) > 1: - version = splited_name[1] - else: - version = None - - if not pkg_lock.has_package(name): - # TODO pip 环境安装 - ... - - info("处理 Infini 项目依赖锁...", echo) - ipk.add(name, version, dump=True) - lock.add(name, version, dump=True) - success("环境依赖新增完成.", echo) - - -def remove(name: str, echo: bool = False): - info("处理 Infini 项目依赖锁...", echo) - ipk = InfiniProject() - lock = ProjectLock() - - ipk.remove(name, dump=True) - lock.remove(name, dump=True) - success("环境依赖删除完成.", echo) +# def install( +# uri: str, +# index: str = "", +# upgrade: bool = False, +# force: bool = False, +# echo: bool = False, +# ) -> None: +# info(f"安装规则包 [bold green]{uri}[/bold green]...", echo) + +# update("初始化 [bold green]IPM[/bold green] 环境...", echo) +# SRC_HOME.mkdir(parents=True, exist_ok=True) +# index = index or INDEX +# lock = PackageLock() + +# if uri.split("==")[0].isalpha(): +# # TODO 兼容 >= <= > < 等标识符 +# splited_uri = uri.split("==") +# name = splited_uri[0] +# if len(splited_uri) == 1: +# version = None +# else: +# version = splited_uri[1] + +# yggdrasil = Yggdrasil(index) + +# if not (lock_index := lock.get_index(index)): +# yggdrasil.sync() +# lock.add_index(index, yggdrasil.host, yggdrasil.uuid, dump=True) +# else: +# yggdrasil.init(INDEX_PATH / lock_index["uuid"]) + +# if not (remote_ifp := yggdrasil.get(name, version=version)): +# return warning( +# f"未能在世界树[{yggdrasil.index}]中搜寻到规则包[{uri}].", echo +# ) + +# ifp = loader.load_from_remote( +# name, +# baseurl=index, +# filename=remote_ifp["source"], +# echo=echo, +# ) +# elif urlparser.is_valid_url(uri): +# filename = uri.rstrip("/").rpartition("/")[-1] +# ifp = loader.load_from_remote( +# "temp", +# uri.rstrip("/").rpartition("/")[0], +# filename, +# echo=echo, +# ) +# else: +# path = Path(uri).resolve() +# update(f"检查文件 [blue]{path}[/blue]...", echo) +# if not path.exists(): +# raise FileNotFoundError("给定的 URI 路径不存在!") + +# if uri.endswith(".ipk"): +# ifp = loader.load_from_local(path) +# else: +# raise FileTypeMismatch("文件类型与预期[.ipk]不匹配.") + +# if lock.has_package(ifp.name): +# exists_version = lock.get_package(ifp.name)["version"] +# if require_update(exists_version, ifp.version): +# if not upgrade: +# raise PackageExsitsError( +# f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[blue]--upgrade[/blue]]参数进行升级." +# ) +# else: +# info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...") +# uninstall(ifp.name, is_confirm=True, echo=echo) +# success(f"[{ifp.name}={exists_version}]卸载完成.") +# else: +# if not force: +# raise PackageExsitsError( +# f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[red]--force[/red]]参数进行强制覆盖." +# ) +# else: +# info( +# f"发现已经安装的 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 卸载中..." +# ) +# uninstall(ifp.name, is_confirm=True, echo=echo) +# success( +# f"[bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow]卸载完成..." +# ) +# lock.load(auto_completion=True) + +# update( +# f"安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow] 中...", +# echo, +# ) +# ipk = extract( +# STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo +# ) # TODO 安装依赖规则包 +# success( +# f"成功安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow].", +# echo, +# ) + +# update("处理全局锁...", echo) +# if not lock.has_storage(ifp.name): +# lock.add_storage(ifp, dump=True) +# lock.add_package(ipk, dump=True) +# success("全局锁已处理完毕.", echo) + +# success(f"包[{ipk.name}]成功安装在 [blue]{ipk._source_path}[/blue].", echo) + + +# def uninstall(name: str, is_confirm: bool = False, echo: bool = False) -> bool: +# lock = PackageLock() +# path = SRC_HOME / name.strip() + +# if not (install_info := lock.get_package(name)): +# warning( +# f"由于 [bold green]{name}[/bold green]未被安装, 故忽略卸载操作.", echo +# ) +# return False + +# info( +# f"发现已经安装的 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow].", +# echo, +# ) +# update( +# f"卸载 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow]..." +# ) +# warning(f"将会清理: [blue]{path}[/blue]", echo) +# is_confirm: bool = ( +# True if (confirm("你确定要继续?") if not is_confirm else is_confirm) else False +# ) +# if is_confirm: +# shutil.rmtree(SRC_HOME / name, ignore_errors=True) +# else: +# info("忽略.", echo) +# return False + +# lock.remove_package(name, dump=True) +# success( +# f"规则包 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow] 卸载完成!", +# echo, +# ) +# return True + + +# def require(name: str, index: str = "", echo: bool = False) -> None: +# info(f"新增规则包依赖: [bold green]{name}[/bold green]", echo) +# update("检查环境中...", echo) +# pkg_lock = PackageLock() +# lock = ProjectLock() +# ipk = InfiniProject() +# success("环境检查完毕.", echo) + +# splited_name = name.split("==") # TODO 支持 >= <= > < 标识 +# name = splited_name[0] + +# if len(splited_name) > 1: +# version = splited_name[1] +# else: +# version = None + +# if not pkg_lock.has_package(name): +# info(f"检测到需要依赖的规则包[{name}]不存在, 安装中...", echo) +# install( +# f"name=={version}" if version else name, +# index=index, +# upgrade=True, +# force=True, +# echo=True, +# ) + +# info("处理 Infini 项目依赖锁...", echo) +# ipk.require(name, version, dump=True) +# lock.require(name, version, dump=True) # TODO 使用check替代 +# success("规则包依赖新增完成.", echo) +# return True + + +# def unrequire(name: str, echo: bool = False): +# info(f"删除规则包依赖: [bold green]{name}[/bold green]", echo) +# info("处理 Infini 项目依赖锁...", echo) +# ipk = InfiniProject() +# lock = ProjectLock() + +# ipk.unrequire(name, dump=True) +# lock.unrequire(name, dump=True) +# success("规则包依赖删除完成.", echo) +# return True + + +# def add(name: str, index: str = "", echo: bool = False) -> None: +# info("检查环境中...", echo) +# pkg_lock = PackageLock() +# lock = ProjectLock() +# ipk = InfiniProject() +# info("环境检查完毕.", echo) + +# splited_name = name.split("==") # TODO 支持 >= <= > < 标识 +# name = splited_name[0] + +# if len(splited_name) > 1: +# version = splited_name[1] +# else: +# version = None + +# if not pkg_lock.has_package(name): +# # TODO pip 环境安装 +# ... + +# info("处理 Infini 项目依赖锁...", echo) +# ipk.add(name, version, dump=True) +# lock.add(name, version, dump=True) +# success("环境依赖新增完成.", echo) + + +# def remove(name: str, echo: bool = False): +# info("处理 Infini 项目依赖锁...", echo) +# ipk = InfiniProject() +# lock = ProjectLock() + +# ipk.remove(name, dump=True) +# lock.remove(name, dump=True) +# success("环境依赖删除完成.", echo) diff --git a/src/ipm/const.py b/src/ipm/const.py index a45e08e..a94187f 100644 --- a/src/ipm/const.py +++ b/src/ipm/const.py @@ -11,5 +11,16 @@ STORAGE = IPM_PATH / "storage" INDEX_PATH = IPM_PATH / "index" # 文本参数 -ATTENSION = """# This file is @generated by IPM. -# It is not intended for manual editing.\n\n""" +ATTENSION = ( + "# This file is @generated by IPM.\n" "# It is not intended for manual editing.\n\n" +) +GITIGNORE = """# Initialized `.gitignores` @generated by IPM. +# Python +__pycache__/ +*.pyc + +# Builds + +build/ +dist/ +""" diff --git a/src/ipm/models/ipk.py b/src/ipm/models/ipk.py index dcdbfe8..62beb96 100644 --- a/src/ipm/models/ipk.py +++ b/src/ipm/models/ipk.py @@ -1,11 +1,13 @@ from pathlib import Path -from . import lock -from ..typing import List, Dict, Literal, StrPath, Any -from ..exceptions import SyntaxError, TomlLoadFailed +from typing import Optional, Union +from tomlkit.toml_document import TOMLDocument +from ipm.models import lock +from ipm.typing import List, Dict, Literal, StrPath +from ipm.exceptions import SyntaxError, TomlLoadFailed -import toml +import tomlkit -ProjectLock = lock.ProjectLock +# ProjectLock = lock.ProjectLock class Author: @@ -30,7 +32,7 @@ class Authors: class InfiniPackage: - source_path: Path + _source_path: Path name: str | None version: str | None @@ -45,100 +47,51 @@ class InfiniPackage: class InfiniProject(InfiniPackage): - name: str - version: str - description: str - authors: Authors - license: str + # name: str + # version: str + # description: str + # authors: Authors + # license: str - requirements: Dict[str, Any] - dependencies: Dict[str, Any] + # requirements: Dict[str, Any] + # dependencies: Dict[str, Any] def __init__(self, path: StrPath = ".") -> None: - self.source_path = Path(path).resolve() - toml_path = self.source_path / "infini.toml" - - try: - data_load = toml.load(toml_path.open("r", encoding="utf-8")) - except Exception as error: - raise TomlLoadFailed(f"项目文件[infini.toml]导入失败: {error}") from error - - if "infini" not in data_load.keys(): - raise SyntaxError("配置文件中缺少[infini]项.") - - infini: dict = data_load["infini"] - self.name = infini.get("name") or "" - self.version = infini.get("version") or "" - self.description = infini.get("description") or "" - self.authors = Authors(infini.get("authors") or []) - self.license = infini.get("license") or "MIT" - - self.requirements = data_load["requirements"] - self.dependencies = data_load["dependencies"] - - def dumps(self) -> dict: - return { - "infini": { - "name": self.source_path.name, - "version": self.version, - "description": self.description, - "authors": [ - {"name": author.name, "email": author.email} - for author in self.authors.authors - ], - "license": self.license, - }, - "requirements": self.requirements, - "dependencies": self.dependencies, - } - - def dump(self) -> str: - return toml.dump( - self.dumps(), (self.source_path / "infini.toml").open("w", encoding="utf-8") - ) + self._source_path = Path(path).resolve() + self._toml_path = self._source_path / "infini.toml" - def require(self, name: str, version: str, dump: bool = False) -> None: - for requirement in self.requirements.keys(): - if requirement == name: - self.requirements.pop(name) - break - - self.requirements[name] = version or "latest" - self.dump() if dump else "" - - def unrequire(self, name: str, dump: bool = False) -> None: - name = name.strip() - for requirement in self.requirements: - if requirement == name: - self.requirements.pop(name) - break - self.dump() if dump else "" - - def add(self, name: str, version: str, dump: bool = False) -> None: - for dependency in self.dependencies: - if "name" not in dependency.keys(): - raise SyntaxError("异常的锁文件!") - if dependency["name"] == name: - self.dependencies.remove(dependency) - break - - self.dependencies.append( - { - "name": name, - "version": version, - } - ) - self.dump() if dump else "" + if not self._source_path.exists(): + raise TomlLoadFailed(f"入口路径[{self._source_path}]不存在.") + elif not self._toml_path.exists(): + raise TomlLoadFailed( + f"项目文件[infini.toml]不存在, 请先使用`[bold green]ipm init[/]`初始化!" + ) + + self._data = self.read() + if "project" not in self._data: + raise TomlLoadFailed(f"项目文件[infini.toml]中不存在元数据!") + + def read(self) -> TOMLDocument: + if not self._toml_path.exists(): + return tomlkit.document() + return tomlkit.load(self._toml_path.open("r", encoding="utf-8")) - def remove(self, name: str, dump: bool = False) -> None: - name = name.strip() - for dependency in self.dependencies: - if "name" not in dependency.keys(): - raise SyntaxError("异常的锁文件!") - if dependency["name"] == name: - self.dependencies.remove(dependency) - break - self.dump() if dump else "" + def dumps(self) -> str: + return tomlkit.dumps(self._data) + + def dump(self) -> None: + return tomlkit.dump(self._data, self._toml_path.open("w", encoding="utf-8")) + + @property + def plain_dict(self) -> TOMLDocument: + return self._data + + @property + def name(self) -> str: + project = self._data["project"] + if not (name := project.get("name")): # type: ignore + raise ValueError("项目文件不存在`name`属性.") + return name class InfiniFrozenPackage(InfiniPackage): @@ -146,11 +99,13 @@ class InfiniFrozenPackage(InfiniPackage): version: str | None hash: str - def __init__(self, source_path: str | Path, **kwargs) -> None: - self.source_path = Path(source_path).resolve() + def __init__(self, source_path: Union[str, Path], **kwargs) -> None: + self._source_path = Path(source_path).resolve() self.hash = ( - (self.source_path.parent / (source_path.name + ".hash")).read_bytes().hex() + (self._source_path.parent / (self._source_path.name + ".hash")) + .read_bytes() + .hex() ) self.name = kwargs.get("name") @@ -158,4 +113,4 @@ class InfiniFrozenPackage(InfiniPackage): @property def hash_name(self) -> str: - return f"{self.source_path.name}.hash" + return f"{self._source_path.name}.hash" diff --git a/src/ipm/models/lock.py b/src/ipm/models/lock.py index 48552eb..900b909 100644 --- a/src/ipm/models/lock.py +++ b/src/ipm/models/lock.py @@ -6,385 +6,385 @@ from ..const import IPM_PATH, ATTENSION, SRC_HOME from ..exceptions import SyntaxError, FileNotFoundError from ..utils.uuid import generate_uuid -import toml +import tomlkit import socket -class IpmLock(metaclass=ABCMeta): - """IPM 锁基类""" - - source_path: Path - metadata: Dict[str, str] - - def __init__(self, source_path: StrPath, auto_load: bool = True) -> None: - IPM_PATH.mkdir(parents=True, exist_ok=True) - self.source_path = source_path - return self.load() if auto_load else None - - @abstractmethod - def load(self) -> None: - raise NotImplementedError - - @abstractmethod - def dumps(self) -> dict: - raise NotImplementedError - - def dump(self) -> str: - data_to_dump = ATTENSION + toml.dumps(self.dumps()) - source_file = self.source_path.open("w", encoding="utf-8") - source_file.write(data_to_dump) - source_file.close() - return data_to_dump - - -class PackageLock(IpmLock): - """全局包锁""" - - indexes: List[Dict[str, Any]] - packages: List[Dict[str, Any]] - storages: List[Dict[str, Any]] - - def __init__(self, source_path: StrPath | None = None) -> None: - super().__init__(source_path=source_path or IPM_PATH / "infini.lock") - - def load(self, auto_completion: bool = True) -> None: - if not self.source_path.exists(): - if auto_completion: - self.metadata = { - "host": socket.gethostname(), - "uuid": generate_uuid(), - } - self.indexes = [] - self.packages = [] - self.storages = [] - self.dumps() - else: - raise FileNotFoundError(f"锁文件不存在!") - else: - loaded_data = toml.load(self.source_path.open("r", encoding="utf-8")) - - if "metadata" not in loaded_data.keys(): - if not auto_completion: - raise SyntaxError(f"锁文件缺失[metadata]项.") - else: - self.metadata = { - "host": socket.gethostname(), - "uuid": generate_uuid(), - } - else: - self.metadata = loaded_data["metadata"] - - if "uuid" not in self.metadata.keys(): - if auto_completion: - self.metadata["uuid"] = generate_uuid() - else: - raise SyntaxError(f"锁文件[metadata]项缺失[uuid]项.") - - self.indexes = ( - loaded_data["indexes"] if "indexes" in loaded_data.keys() else [] - ) - self.packages = ( - loaded_data["packages"] if "packages" in loaded_data.keys() else [] - ) - self.storages = ( - loaded_data["storages"] if "storages" in loaded_data.keys() else [] - ) - - def dumps(self) -> dict: - return { - "metadata": self.metadata, - "indexes": self.indexes, - "packages": self.packages, - "storages": self.storages, - } - - def add_index( - self, index_uri: str, host: str, uuid: str, dump: bool = False - ) -> str: - for index in self.indexes: - if "index" not in index.keys(): - raise SyntaxError("异常的锁文件!") - if index["index"] == index_uri: - self.storages.remove(index) - break - - self.indexes.append( - { - "index": index_uri, - "host": host, - "uuid": uuid, - } - ) - return self.dump() if dump else "" - - def remove_index(self, uuid: str, dump: bool = False) -> str: - uuid = uuid.strip() - for index in self.indexes: - if "name" not in index.keys(): - raise SyntaxError("异常的锁文件!") - if index["name"] == uuid: - self.packages.remove(index) - break - return self.dump() if dump else "" - - def get_index(self, index_uri: str) -> Index | None: - index_uri = index_uri.strip() - for index in self.indexes: - if index["index"] == index_uri: - return index - return None - - def has_index(self, index_uri: str) -> bool: - index_uri = index_uri.strip() - for index in self.indexes: - if index["index"] == index_uri: - return True - return False - - def add_package(self, ipk: "ipk.InfiniProject", dump: bool = False) -> str: - for package in self.packages: - if "name" not in package.keys(): - raise SyntaxError("异常的锁文件!") - if package["name"] == ipk.name: - self.storages.remove(package) - break - - self.packages.append( - { - "name": ipk.name, - "version": ipk.version, - "description": ipk.description, - "requirements": ipk.requirements, - "dependencies": ipk.dependencies, - } - ) - return self.dump() if dump else "" - - def get_package(self, name: str) -> Package | None: - name = name.strip() - for package in self.packages: - if package["name"] == name: - return package - return None - - def has_package(self, name: str) -> bool: - name = name.strip() - for package in self.packages: - if package["name"] == name: - return True - return False - - def remove_package(self, name: str, dump: bool = False) -> str: - name = name.strip() - for package in self.packages: - if "name" not in package.keys(): - raise SyntaxError("异常的锁文件!") - if package["name"] == name: - self.packages.remove(package) - break - return self.dump() if dump else "" - - def add_storage(self, ifp: "ipk.InfiniFrozenPackage", dump: bool = False) -> str: - for storage in self.storages: - if "name" not in storage.keys(): - raise SyntaxError("异常的锁文件!") - if storage["name"] == ifp.name and storage["version"] == ifp.version: - self.storages.remove(storage) - break - - self.storages.append( - { - "name": ifp.name, - "version": ifp.version, - "hash": ifp.hash, - "source": f"storage/{ifp.name}/{ifp.default_name}", - } - ) - return self.dump() if dump else "" - - def remove_storage(self, name: str, dump: bool = False) -> str: - name = name.strip() - for storage in self.storages: - if "name" not in storage.keys(): - raise SyntaxError("异常的锁文件!") - if storage["name"] == name: - self.storages.remove(storage) - break - return self.dump() if dump else "" - - def get_storage(self, name: str) -> Storage | None: - name = name.strip() - for storage in self.storages: - if storage["name"] == name: - return storage - return None - - def get_particular_storage( - self, name: str, version: str | None = None - ) -> Storage | None: - name = name.strip() - for storage in self.storages: - if storage["name"] == name and ( - version is None or storage["version"] == version - ): - return storage - return None - - def has_storage(self, name: str) -> bool: - name = name.strip() - for storage in self.storages: - if storage["name"] == name: - return True - return False - - def get_ipk(self, name: str) -> "ipk.InfiniProject": - return ipk.InfiniProject(Path(SRC_HOME / name.strip()).resolve()) - - -class ProjectLock(IpmLock): - """IPM 项目锁""" - - requirements: List[Dict[str, Any]] - dependencies: List[Dict[str, Any]] - - def __init__( - self, source_path: StrPath | None = None, auto_load: bool = True - ) -> None: - super().__init__( - source_path=source_path or Path(".").resolve() / "infini.lock", - auto_load=auto_load, - ) - - def _check(self, name: str) -> Dict[str, Any]: # TODO 依照版本 version: str 区分 - requirements = [] - for name, sub_requirement in ( - PackageLock().get_ipk(name.strip()).requirements.values() - ): - exists = False - for requirement in requirements: - if requirement["name"] == name: - exists = True - break - if not exists: - sub_requirement["name"] = name - requirements.append(sub_requirement) - return requirements - - def _init(self) -> None: - for abs_requirement in self.requirements.copy(): - sub_requirements = self._check(abs_requirement["name"]) - for sub_requirement in sub_requirements: - for requirement in self.requirements: - if requirement["name"] == sub_requirement["name"]: - exists = True - break - if not exists: - self.requirements.append(sub_requirement) - - def init(self) -> None: - pkg = ipk.InfiniProject(self.source_path.parent) - self.metadata = { - "name": pkg.name, - "version": pkg.version, - "description": pkg.description, - "license": pkg.license, - } - # TODO 实现本地包 - self.requirements = [ - {"name": name, "version": version or "latest"} - for name, version in pkg.requirements.items() - ] - self.dependencies = [ - {"name": name, "version": version or "latest"} - for name, version in pkg.dependencies.items() - ] - self._init() - self.dump() - - def load(self) -> None: - pkg = ipk.InfiniProject() - - if not self.source_path.exists(): - self.init() - else: - loaded_data = toml.load(self.source_path.open("r", encoding="utf-8")) - - self.metadata = ( - loaded_data["metadata"] - if "metadata" in loaded_data.keys() - else { - "name": pkg.name, - "version": pkg.version, - "description": pkg.description, - "license": pkg.license, - } - ) - self.requirements = ( - loaded_data["requirements"] - if "requirements" in loaded_data.keys() - else [] - ) - self.dependencies = ( - loaded_data["dependencies"] - if "dependencies" in loaded_data.keys() - else [] - ) - - def dumps(self) -> Dict: - return { - "metadata": self.metadata, - "requirements": self.requirements, - "dependencies": self.dependencies, - } - - def require(self, name: str, version: str, dump: bool = False) -> None: - for requirement in self.requirements: - if "name" not in requirement.keys(): - raise SyntaxError("异常的锁文件!") - if requirement["name"] == name: - self.requirements.remove(requirement) - break - - self.requirements.append( - { - "name": name, - "version": version, - } - ) - return self.dump() if dump else "" - - def unrequire(self, name: str, dump: bool = False) -> None: - name = name.strip() - for requirement in self.requirements: - if "name" not in requirement.keys(): - raise SyntaxError("异常的锁文件!") - if requirement["name"] == name: - self.requirements.remove(requirement) - break - return self.dump() if dump else "" - - def add(self, name: str, version: str, dump: bool = False) -> None: - for dependency in self.dependencies: - if "name" not in dependency.keys(): - raise SyntaxError("异常的锁文件!") - if dependency["name"] == name: - self.dependencies.remove(dependency) - break - - self.dependencies.append( - { - "name": name, - "version": version, - } - ) - return self.dump() if dump else "" - - def remove(self, name: str, dump: bool = False) -> None: - name = name.strip() - for dependency in self.dependencies: - if "name" not in dependency.keys(): - raise SyntaxError("异常的锁文件!") - if dependency["name"] == name: - self.dependencies.remove(dependency) - break - return self.dump() if dump else "" +# class IpmLock(metaclass=ABCMeta): +# """IPM 锁基类""" + +# source_path: Path +# metadata: Dict[str, str] + +# def __init__(self, source_path: StrPath, auto_load: bool = True) -> None: +# IPM_PATH.mkdir(parents=True, exist_ok=True) +# self.source_path = source_path +# return self.load() if auto_load else None + +# @abstractmethod +# def load(self) -> None: +# raise NotImplementedError + +# @abstractmethod +# def dumps(self) -> dict: +# raise NotImplementedError + +# def dump(self) -> str: +# data_to_dump = ATTENSION + toml.dumps(self.dumps()) +# source_file = self.source_path.open("w", encoding="utf-8") +# source_file.write(data_to_dump) +# source_file.close() +# return data_to_dump + + +# class PackageLock(IpmLock): +# """全局包锁""" + +# indexes: List[Dict[str, Any]] +# packages: List[Dict[str, Any]] +# storages: List[Dict[str, Any]] + +# def __init__(self, source_path: StrPath | None = None) -> None: +# super().__init__(source_path=source_path or IPM_PATH / "infini.lock") + +# def load(self, auto_completion: bool = True) -> None: +# if not self.source_path.exists(): +# if auto_completion: +# self.metadata = { +# "host": socket.gethostname(), +# "uuid": generate_uuid(), +# } +# self.indexes = [] +# self.packages = [] +# self.storages = [] +# self.dumps() +# else: +# raise FileNotFoundError(f"锁文件不存在!") +# else: +# loaded_data = toml.load(self.source_path.open("r", encoding="utf-8")) + +# if "metadata" not in loaded_data.keys(): +# if not auto_completion: +# raise SyntaxError(f"锁文件缺失[metadata]项.") +# else: +# self.metadata = { +# "host": socket.gethostname(), +# "uuid": generate_uuid(), +# } +# else: +# self.metadata = loaded_data["metadata"] + +# if "uuid" not in self.metadata.keys(): +# if auto_completion: +# self.metadata["uuid"] = generate_uuid() +# else: +# raise SyntaxError(f"锁文件[metadata]项缺失[uuid]项.") + +# self.indexes = ( +# loaded_data["indexes"] if "indexes" in loaded_data.keys() else [] +# ) +# self.packages = ( +# loaded_data["packages"] if "packages" in loaded_data.keys() else [] +# ) +# self.storages = ( +# loaded_data["storages"] if "storages" in loaded_data.keys() else [] +# ) + +# def dumps(self) -> dict: +# return { +# "metadata": self.metadata, +# "indexes": self.indexes, +# "packages": self.packages, +# "storages": self.storages, +# } + +# def add_index( +# self, index_uri: str, host: str, uuid: str, dump: bool = False +# ) -> str: +# for index in self.indexes: +# if "index" not in index.keys(): +# raise SyntaxError("异常的锁文件!") +# if index["index"] == index_uri: +# self.storages.remove(index) +# break + +# self.indexes.append( +# { +# "index": index_uri, +# "host": host, +# "uuid": uuid, +# } +# ) +# return self.dump() if dump else "" + +# def remove_index(self, uuid: str, dump: bool = False) -> str: +# uuid = uuid.strip() +# for index in self.indexes: +# if "name" not in index.keys(): +# raise SyntaxError("异常的锁文件!") +# if index["name"] == uuid: +# self.packages.remove(index) +# break +# return self.dump() if dump else "" + +# def get_index(self, index_uri: str) -> Index | None: +# index_uri = index_uri.strip() +# for index in self.indexes: +# if index["index"] == index_uri: +# return index +# return None + +# def has_index(self, index_uri: str) -> bool: +# index_uri = index_uri.strip() +# for index in self.indexes: +# if index["index"] == index_uri: +# return True +# return False + +# def add_package(self, ipk: "ipk.InfiniProject", dump: bool = False) -> str: +# for package in self.packages: +# if "name" not in package.keys(): +# raise SyntaxError("异常的锁文件!") +# if package["name"] == ipk.name: +# self.storages.remove(package) +# break + +# self.packages.append( +# { +# "name": ipk.name, +# "version": ipk.version, +# "description": ipk.description, +# "requirements": ipk.requirements, +# "dependencies": ipk.dependencies, +# } +# ) +# return self.dump() if dump else "" + +# def get_package(self, name: str) -> Package | None: +# name = name.strip() +# for package in self.packages: +# if package["name"] == name: +# return package +# return None + +# def has_package(self, name: str) -> bool: +# name = name.strip() +# for package in self.packages: +# if package["name"] == name: +# return True +# return False + +# def remove_package(self, name: str, dump: bool = False) -> str: +# name = name.strip() +# for package in self.packages: +# if "name" not in package.keys(): +# raise SyntaxError("异常的锁文件!") +# if package["name"] == name: +# self.packages.remove(package) +# break +# return self.dump() if dump else "" + +# def add_storage(self, ifp: "ipk.InfiniFrozenPackage", dump: bool = False) -> str: +# for storage in self.storages: +# if "name" not in storage.keys(): +# raise SyntaxError("异常的锁文件!") +# if storage["name"] == ifp.name and storage["version"] == ifp.version: +# self.storages.remove(storage) +# break + +# self.storages.append( +# { +# "name": ifp.name, +# "version": ifp.version, +# "hash": ifp.hash, +# "source": f"storage/{ifp.name}/{ifp.default_name}", +# } +# ) +# return self.dump() if dump else "" + +# def remove_storage(self, name: str, dump: bool = False) -> str: +# name = name.strip() +# for storage in self.storages: +# if "name" not in storage.keys(): +# raise SyntaxError("异常的锁文件!") +# if storage["name"] == name: +# self.storages.remove(storage) +# break +# return self.dump() if dump else "" + +# def get_storage(self, name: str) -> Storage | None: +# name = name.strip() +# for storage in self.storages: +# if storage["name"] == name: +# return storage +# return None + +# def get_particular_storage( +# self, name: str, version: str | None = None +# ) -> Storage | None: +# name = name.strip() +# for storage in self.storages: +# if storage["name"] == name and ( +# version is None or storage["version"] == version +# ): +# return storage +# return None + +# def has_storage(self, name: str) -> bool: +# name = name.strip() +# for storage in self.storages: +# if storage["name"] == name: +# return True +# return False + +# def get_ipk(self, name: str) -> "ipk.InfiniProject": +# return ipk.InfiniProject(Path(SRC_HOME / name.strip()).resolve()) + + +# class ProjectLock(IpmLock): +# """IPM 项目锁""" + +# requirements: List[Dict[str, Any]] +# dependencies: List[Dict[str, Any]] + +# def __init__( +# self, source_path: StrPath | None = None, auto_load: bool = True +# ) -> None: +# super().__init__( +# source_path=source_path or Path(".").resolve() / "infini.lock", +# auto_load=auto_load, +# ) + +# def _check(self, name: str) -> Dict[str, Any]: # TODO 依照版本 version: str 区分 +# requirements = [] +# for name, sub_requirement in ( +# PackageLock().get_ipk(name.strip()).requirements.values() +# ): +# exists = False +# for requirement in requirements: +# if requirement["name"] == name: +# exists = True +# break +# if not exists: +# sub_requirement["name"] = name +# requirements.append(sub_requirement) +# return requirements + +# def _init(self) -> None: +# for abs_requirement in self.requirements.copy(): +# sub_requirements = self._check(abs_requirement["name"]) +# for sub_requirement in sub_requirements: +# for requirement in self.requirements: +# if requirement["name"] == sub_requirement["name"]: +# exists = True +# break +# if not exists: +# self.requirements.append(sub_requirement) + +# def init(self) -> None: +# pkg = ipk.InfiniProject(self.source_path.parent) +# self.metadata = { +# "name": pkg.name, +# "version": pkg.version, +# "description": pkg.description, +# "license": pkg.license, +# } +# # TODO 实现本地包 +# self.requirements = [ +# {"name": name, "version": version or "latest"} +# for name, version in pkg.requirements.items() +# ] +# self.dependencies = [ +# {"name": name, "version": version or "latest"} +# for name, version in pkg.dependencies.items() +# ] +# self._init() +# self.dump() + +# def load(self) -> None: +# pkg = ipk.InfiniProject() + +# if not self.source_path.exists(): +# self.init() +# else: +# loaded_data = toml.load(self.source_path.open("r", encoding="utf-8")) + +# self.metadata = ( +# loaded_data["metadata"] +# if "metadata" in loaded_data.keys() +# else { +# "name": pkg.name, +# "version": pkg.version, +# "description": pkg.description, +# "license": pkg.license, +# } +# ) +# self.requirements = ( +# loaded_data["requirements"] +# if "requirements" in loaded_data.keys() +# else [] +# ) +# self.dependencies = ( +# loaded_data["dependencies"] +# if "dependencies" in loaded_data.keys() +# else [] +# ) + +# def dumps(self) -> Dict: +# return { +# "metadata": self.metadata, +# "requirements": self.requirements, +# "dependencies": self.dependencies, +# } + +# def require(self, name: str, version: str, dump: bool = False) -> None: +# for requirement in self.requirements: +# if "name" not in requirement.keys(): +# raise SyntaxError("异常的锁文件!") +# if requirement["name"] == name: +# self.requirements.remove(requirement) +# break + +# self.requirements.append( +# { +# "name": name, +# "version": version, +# } +# ) +# return self.dump() if dump else "" + +# def unrequire(self, name: str, dump: bool = False) -> None: +# name = name.strip() +# for requirement in self.requirements: +# if "name" not in requirement.keys(): +# raise SyntaxError("异常的锁文件!") +# if requirement["name"] == name: +# self.requirements.remove(requirement) +# break +# return self.dump() if dump else "" + +# def add(self, name: str, version: str, dump: bool = False) -> None: +# for dependency in self.dependencies: +# if "name" not in dependency.keys(): +# raise SyntaxError("异常的锁文件!") +# if dependency["name"] == name: +# self.dependencies.remove(dependency) +# break + +# self.dependencies.append( +# { +# "name": name, +# "version": version, +# } +# ) +# return self.dump() if dump else "" + +# def remove(self, name: str, dump: bool = False) -> None: +# name = name.strip() +# for dependency in self.dependencies: +# if "name" not in dependency.keys(): +# raise SyntaxError("异常的锁文件!") +# if dependency["name"] == name: +# self.dependencies.remove(dependency) +# break +# return self.dump() if dump else "" diff --git a/src/ipm/utils/freeze.py b/src/ipm/utils/freeze.py index de66f57..e865e3c 100644 --- a/src/ipm/utils/freeze.py +++ b/src/ipm/utils/freeze.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Optional from ..exceptions import FileNotFoundError, VerifyFailed from ..models.ipk import InfiniProject, InfiniFrozenPackage from ..typing import StrPath @@ -12,13 +13,13 @@ import shutil def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage: update("构建开发环境...", echo) - build_dir = ipk.source_path / "build" - src_path = ipk.source_path / "src" - dist_path = ipk.source_path / "dist" + build_dir = ipk._source_path / "build" + src_path = ipk._source_path / "src" + dist_path = ipk._source_path / "dist" ifp_path = dist_path / ipk.default_name - if not ipk.source_path.exists(): - raise FileNotFoundError(f"文件或文件夹 [blue]{ipk.source_path.resolve()}[/blue]]不存在!") + if not ipk._source_path.exists(): + raise FileNotFoundError(f"文件或文件夹 [blue]{ipk._source_path.resolve()}[/blue]]不存在!") if build_dir.exists(): update("清理构建环境...") shutil.rmtree(build_dir, ignore_errors=True) @@ -30,7 +31,7 @@ def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage: update("复制工程文件...", echo) shutil.copytree(src_path, build_dir / "src") - shutil.copy2(ipk.source_path / "infini.toml", build_dir / "infini.toml") + shutil.copy2(ipk._source_path / "infini.toml", build_dir / "infini.toml") success("工程文件复制完毕.", echo) update("打包 [bold green]ipk[/bold green]文件...", echo) @@ -55,7 +56,7 @@ def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage: def extract_ipk( source_path: StrPath, dist_path: StrPath, echo: bool = False -) -> InfiniProject: +) -> Optional[InfiniProject]: ifp_path = Path(source_path).resolve() dist_path = Path(dist_path).resolve() hash_path = ifp_path.parent / (ifp_path.name + ".hash") @@ -86,7 +87,7 @@ def extract_ipk( try: shutil.rmtree(dist_pkg_path) except Exception as err: - return error(err) + return error(str(err)) success("旧规则包项目文件清理完毕.", echo) update(f"迁移文件至目标目录...", echo) diff --git a/src/ipm/utils/loader.py b/src/ipm/utils/loader.py index a751584..4bebf7d 100644 --- a/src/ipm/utils/loader.py +++ b/src/ipm/utils/loader.py @@ -35,6 +35,8 @@ def load_from_remote( info("解压中...", echo) temp_ipk = extract_ipk(ipk_path, temp_path) + if not temp_ipk: + raise RuntimeError("解压时出现异常.") success(f"包[{temp_ipk.name}]解压完成.") move_to = STORAGE / temp_ipk.name move_to.mkdir(parents=True, exist_ok=True) @@ -59,6 +61,8 @@ def load_from_local(source_path: Path) -> InfiniFrozenPackage: temp_path = Path(temp_dir.name).resolve() temp_ipk = extract_ipk(source_path, temp_path) + if not temp_ipk: + raise RuntimeError("解压时出现异常.") move_to = STORAGE / temp_ipk.name move_to.mkdir(parents=True, exist_ok=True) diff --git a/src/ipm/utils/version.py b/src/ipm/utils/version.py index 6088d1c..5fc6433 100644 --- a/src/ipm/utils/version.py +++ b/src/ipm/utils/version.py @@ -1,11 +1,13 @@ import re -def require_update(old_version: str, new_version: str): +def require_update(old_version: str, new_version: str) -> bool: regex = r"^(\d+)\.(\d+)\.(\d+)(.*?)?(\d+?)?$" old_tuple = re.match(regex, old_version) new_tuple = re.match(regex, new_version) + if not old_tuple or not new_tuple: + return False old_tuple_main = tuple(map(int, filter(None, old_tuple.group(1, 2, 3)))) new_tuple_main = tuple(map(int, filter(None, new_tuple.group(1, 2, 3)))) @@ -47,3 +49,4 @@ def require_update(old_version: str, new_version: str): return False elif old_pre_version == new_pre_version: return False + return False diff --git a/tests/test_api.py b/tests/test_api.py index f495ef5..caba429 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,38 +1,38 @@ -from ipm import api +# from ipm import api -import shutil +# import shutil -def test_new(): - api.new("test") - shutil.rmtree("test", ignore_errors=True) +# def test_new(): +# api.new("test") +# shutil.rmtree("test", ignore_errors=True) -def test_build(): - api.new("test") - api.build("test") - shutil.rmtree("test", ignore_errors=True) +# def test_build(): +# api.new("test") +# api.build("test") +# shutil.rmtree("test", ignore_errors=True) -def test_extract(): - api.new("test") - api.build("test") - api.extract("./test/dist/test-0.1.0.ipk") - shutil.rmtree("test", ignore_errors=True) +# def test_extract(): +# api.new("test") +# api.build("test") +# api.extract("./test/dist/test-0.1.0.ipk") +# shutil.rmtree("test", ignore_errors=True) -def test_install(): - api.new("test") - api.build("test") - api.install("./test/dist/test-0.1.0.ipk") - shutil.rmtree("test", ignore_errors=True) +# def test_install(): +# api.new("test") +# api.build("test") +# api.install("./test/dist/test-0.1.0.ipk") +# shutil.rmtree("test", ignore_errors=True) -def test_uninstall(): - api.uninstall("test", is_confirm=True) +# def test_uninstall(): +# api.uninstall("test", is_confirm=True) -def test_check(): - api.new("test") - api.check("test") - shutil.rmtree("test", ignore_errors=True) +# def test_check(): +# api.new("test") +# api.check("test") +# shutil.rmtree("test", ignore_errors=True) -- cgit v1.2.3-70-g09d2