aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--pyproject.toml2
-rw-r--r--src/ipm/__main__.py196
-rw-r--r--src/ipm/api.py641
-rw-r--r--src/ipm/const.py15
-rw-r--r--src/ipm/models/ipk.py153
-rw-r--r--src/ipm/models/lock.py758
-rw-r--r--src/ipm/utils/freeze.py17
-rw-r--r--src/ipm/utils/loader.py4
-rw-r--r--src/ipm/utils/version.py5
-rw-r--r--tests/test_api.py50
10 files changed, 924 insertions, 917 deletions
diff --git a/pyproject.toml b/pyproject.toml
index d11a234..e060386 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "ipdm"
-version = "0.1.4"
+version = "0.2.0-alpha.1"
description = "Infini 包管理器"
authors = [
{ name = "苏向夜", email = "fu050409@163.com" },
diff --git a/src/ipm/__main__.py b/src/ipm/__main__.py
index 4575986..976d59c 100644
--- a/src/ipm/__main__.py
+++ b/src/ipm/__main__.py
@@ -9,33 +9,33 @@ main = typer.Typer(
)
-@main.command()
-def check():
- """分析 Infini 项目并创建项目锁"""
- try:
- if api.check(".", echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
-
-
-@main.command()
-def install(
- uri: str = typer.Argument(help="Infini 包的统一资源标识符"),
- index: str = typer.Option(None, help="世界树服务器地址"),
- upgrade: bool = typer.Option(False, "--upgrade", "-u", help="更新 Infini 包"),
- force: bool = typer.Option(False, "--force", "-f", help="强制安装"),
-):
- """安装一个 Infini 规则包到此计算机"""
- try:
- if api.install(uri, index, upgrade=upgrade, force=force, echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
+# @main.command()
+# def check():
+# """分析 Infini 项目并创建项目锁"""
+# try:
+# if api.check(".", echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
+
+
+# @main.command()
+# def install(
+# uri: str = typer.Argument(help="Infini 包的统一资源标识符"),
+# index: str = typer.Option(None, help="世界树服务器地址"),
+# upgrade: bool = typer.Option(False, "--upgrade", "-u", help="更新 Infini 包"),
+# force: bool = typer.Option(False, "--force", "-f", help="强制安装"),
+# ):
+# """安装一个 Infini 规则包到此计算机"""
+# try:
+# if api.install(uri, index, upgrade=upgrade, force=force, echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
@main.command()
@@ -48,7 +48,7 @@ def extract(
if api.extract(package, dist, echo=True):
tada()
except IpmException as err:
- error(err, echo=True)
+ error(str(err), echo=True)
finally:
status.stop()
@@ -60,7 +60,7 @@ def init(force: bool = typer.Option(None, "--force", "-f", help="强制初始化
if api.init(".", force, echo=True):
tada()
except IpmException as err:
- error(err, echo=True)
+ error(str(err), echo=True)
finally:
status.stop()
@@ -72,7 +72,7 @@ def new(package: str = typer.Argument(help="Infini 项目路径")):
if api.new(package, echo=True):
tada()
except IpmException as err:
- error(err, echo=True)
+ error(str(err), echo=True)
finally:
status.stop()
@@ -84,87 +84,85 @@ def build(package: str = typer.Argument(".", help="Infini 项目路径")):
if api.build(package, echo=True):
tada()
except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
-
-
-@main.command()
-def uninstall(package: str = typer.Argument(help="Infini 项目路径")):
- """卸载 Infini 规则包"""
- try:
- if api.uninstall(package, echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
+ error(str(err), echo=True)
finally:
status.stop()
-@main.command()
-def require(
- name: str = typer.Argument(help="Infini 包名"),
- index: str = typer.Option(None, help="世界树服务器地址"),
-):
- """新增规则包依赖"""
- try:
- if api.require(name, index, echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
-
-
-@main.command()
-def unrequire(name: str = typer.Argument(help="Infini 包名")):
- """删除规则包依赖"""
- try:
- if api.unrequire(name, echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
-
-
-@main.command()
-def add(
- name: str = typer.Argument(help="Infini 包名"),
- index: str = typer.Option(None, help="世界树服务器地址"),
-):
- """新增环境依赖"""
- try:
- if api.add(name, index=index, echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
-
-
-@main.command()
-def remove(name: str = typer.Argument(help="Infini 包名")):
- """删除环境依赖"""
- try:
- if api.remove(name, echo=True):
- tada()
- except IpmException as err:
- error(err, echo=True)
- finally:
- status.stop()
+# @main.command()
+# def uninstall(package: str = typer.Argument(help="Infini 项目路径")):
+# """卸载 Infini 规则包"""
+# try:
+# if api.uninstall(package, echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
+
+
+# @main.command()
+# def require(
+# name: str = typer.Argument(help="Infini 包名"),
+# index: str = typer.Option(None, help="世界树服务器地址"),
+# ):
+# """新增规则包依赖"""
+# try:
+# if api.require(name, index, echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
+
+
+# @main.command()
+# def unrequire(name: str = typer.Argument(help="Infini 包名")):
+# """删除规则包依赖"""
+# try:
+# if api.unrequire(name, echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
+
+
+# @main.command()
+# def add(
+# name: str = typer.Argument(help="Infini 包名"),
+# index: str = typer.Option(None, help="世界树服务器地址"),
+# ):
+# """新增环境依赖"""
+# try:
+# if api.add(name, index=index, echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
+
+
+# @main.command()
+# def remove(name: str = typer.Argument(help="Infini 包名")):
+# """删除环境依赖"""
+# try:
+# if api.remove(name, echo=True):
+# tada()
+# except IpmException as err:
+# error(str(err), echo=True)
+# finally:
+# status.stop()
# TODO
@main.command()
-def collect():
- ...
+def collect(): ...
# TODO
@main.command()
-def update():
- ...
+def update(): ...
if __name__ == "__main__":
diff --git a/src/ipm/api.py b/src/ipm/api.py
index 60dc05b..54b9c76 100644
--- a/src/ipm/api.py
+++ b/src/ipm/api.py
@@ -1,58 +1,76 @@
from pathlib import Path
-from .typing import StrPath
-from .utils import freeze, urlparser, loader
-from .const import INDEX, INDEX_PATH, STORAGE, SRC_HOME
-from .logging import status, update, info, success, warning, error, confirm, ask
-from .exceptions import (
+from typing import Optional
+from ipm.typing import StrPath
+from ipm.utils import freeze, urlparser, loader
+from ipm.const import GITIGNORE, INDEX, INDEX_PATH, STORAGE, SRC_HOME
+from ipm.logging import status, update, info, success, warning, error, confirm, ask
+from ipm.exceptions import (
FileTypeMismatch,
TomlLoadFailed,
FileNotFoundError,
PackageExsitsError,
)
-from .utils.version import require_update
-from .models.ipk import InfiniProject, InfiniFrozenPackage
-from .models.lock import PackageLock, ProjectLock
-from .models.index import Yggdrasil
+from ipm.utils.version import require_update
+from ipm.models.ipk import InfiniProject, InfiniFrozenPackage
+# from ipm.models.lock import PackageLock, ProjectLock
+from ipm.models.index import Yggdrasil
-import toml
+import tomlkit
import shutil
+import os
+import configparser
-def check(source_path: StrPath, echo: bool = False) -> bool:
- info("项目环境检查...", echo)
+# def check(source_path: StrPath, echo: bool = False) -> bool:
+# info("项目环境检查...", echo)
- update("检查环境...")
- lock = ProjectLock(
- Path(source_path).resolve() / "infini.lock",
- auto_load=False,
- )
- success("环境检查完毕.", echo)
+# update("检查环境...")
+# lock = ProjectLock(
+# Path(source_path).resolve() / "infini.lock",
+# auto_load=False,
+# )
+# success("环境检查完毕.", echo)
- update("写入依赖锁文件...", echo)
- lock.init()
- success("项目依赖锁写入完成.", echo)
- return True
+# update("写入依赖锁文件...", echo)
+# lock.init()
+# success("项目依赖锁写入完成.", echo)
+# return True
-def init(source_path: StrPath, force: bool = False, echo: bool = False) -> bool:
+def init(target_path: StrPath, force: bool = False, echo: bool = False) -> bool:
info("初始化规则包...", echo)
update("检查环境...", echo)
- source_path = Path(source_path).resolve()
- if (toml_path := (source_path / "infini.toml")).exists() and not force:
- return warning(
+ target_path = Path(target_path).resolve()
+ if (toml_path := (target_path / "infini.toml")).exists() and not force:
+ warning(
f"无法在已经初始化的地址重新初始化, 如果你的确希望重新初始化, 请使用[bold red]`ipm init --force`[/bold red].",
echo,
)
+ return False
+ email = username = None
+ gitconfig_path = Path.home().joinpath(".gitconfig")
+ if gitconfig_path.exists():
+ config = configparser.ConfigParser()
+ config.read(str(gitconfig_path), encoding="utf-8")
+ if "user" in config.sections():
+ email = config["user"].get("email")
+ username = config["user"].get("name")
+ email = email or (os.getlogin() + "@example.com")
+ username = username or os.getlogin()
success("环境检查完毕.", echo)
status.stop()
- name = ask("项目名称", default=source_path.name, echo=echo)
+ name = ask("项目名称", default=target_path.name, echo=echo)
version = ask("项目版本", default="0.1.0", echo=echo)
- description = ask("项目简介", default=f"{source_path.name.upper()} 规则包", echo=echo)
+ description = ask(
+ "项目简介", default=f"{target_path.name.upper()} 规则包", echo=echo
+ )
+ author_name = ask("作者名称", default=username, echo=echo)
+ author_email = ask("作者邮箱", default=email, echo=echo)
license = ask("开源协议", default="MIT", echo=echo)
default_entries = ["__init__.py", f"{name}.py"]
- info("请选择你要使用的入口文件", echo)
+ info("请选择你要使用的入口文件:", echo)
for index, default_entry in enumerate(default_entries):
info(f"[bold cyan]{index}[/bold cyan]. [green]{default_entry}[/green]", echo)
entry_file = ask(
@@ -63,69 +81,77 @@ def init(source_path: StrPath, force: bool = False, echo: bool = False) -> bool:
)
status.start()
+
toml_file = toml_path.open("w", encoding="utf-8")
- toml.dump(
- {
- "infini": {
- "name": name,
- "version": version,
- "description": description,
- "license": license,
- },
- "requirements": {},
- "dependencies": {},
- },
- toml_file,
- )
+ toml_data = tomlkit.document()
+ project = tomlkit.table()
+ project.add("name", name)
+ project.add("version", version)
+ project.add("description", description)
+ author = tomlkit.array()
+ author.add_line({"name": author_name, "email": author_email})
+ author.multiline(True)
+ project.add("author", author)
+ project.add("license", license)
+ toml_data.add("project", project)
+ toml_data.add("requirements", tomlkit.table())
+ toml_data.add("dependencies", tomlkit.table())
+ tomlkit.dump(toml_data, toml_file)
toml_file.close()
- (source_path / "src").mkdir(parents=True, exist_ok=True)
- (source_path / ".gitignores").write_text(
- "# Initialized `.gitignores` generated by ipm.\n\n"
- "# python\n"
- "__pycache__/\n"
- "*.pyc\n"
- "# build\n"
- "build/"
- "dist/"
- )
+ source_path = target_path.joinpath("src")
+ gitignore_filepath = target_path.joinpath(".gitignore")
+ source_path.mkdir(parents=True, exist_ok=True)
+ if not gitignore_filepath.exists():
+ (target_path / ".gitignore").write_text(GITIGNORE)
+
if entry_file == "0":
- (source_path / "src" / "__init__.py").write_text(
- "# Initialized `__init__.py` generated by ipm.\n"
- "# This is the entry file for this Infini package."
- "# Documents at https://ipm.hydroroll.team/\n\n"
- "from .events import register as events_register\n"
- "from .handlers import register as handlers_register\n"
- "from .interceptors import register as interceptors_register\n"
- )
- (source_path / "src" / "events.py").write_text(
- "# Initialized `events.py` generated by ipm.\n"
- "# Regists your text events and regist global variables here."
- "# Documents at https://ipm.hydroroll.team/\n\n"
- "from infini.register import Register\n\n\n"
- "register = Register()\n"
- )
- (source_path / "src" / "handlers.py").write_text(
- "# Initialized `handlers.py` generated by ipm.\n"
- "# Regists your handlers here."
- "# Documents at https://ipm.hydroroll.team/\n\n"
- "from infini.register import Register\n\n\n"
- "register = Register()\n"
- )
- (source_path / "src" / "interceptors.py").write_text(
- "# Initialized `interceptors.py` generated by ipm.\n"
- "# Regists your pre-interceptors and interceptors here."
- "# Documents at https://ipm.hydroroll.team/\n\n"
- "from infini.register import Register\n\n\n"
- "register = Register()\n"
- )
+ init_filepath = source_path.joinpath("__init__.py")
+ events_filepath = source_path.joinpath("events.py")
+ handlers_filepath = source_path.joinpath("handlers.py")
+ interceptors_filepath = source_path.joinpath("interceptors.py")
+
+ if not init_filepath.exists():
+ init_filepath.write_text(
+ "# Initialized `events.py` generated by ipm.\n"
+ "# Regists your text events and regist global variables here.\n"
+ "# Documents at https://ipm.hydroroll.team/\n\n"
+ "from infini.register import Register\n\n\n"
+ "register = Register()\n"
+ )
+ if not events_filepath.exists():
+ events_filepath.write_text(
+ "# Initialized `events.py` generated by ipm.\n"
+ "# Regists your text events and regist global variables here.\n"
+ "# Documents at https://ipm.hydroroll.team/\n\n"
+ "from infini.register import Register\n\n\n"
+ "register = Register()\n"
+ )
+ if not handlers_filepath.exists():
+ handlers_filepath.write_text(
+ "# Initialized `handlers.py` generated by ipm.\n"
+ "# Regists your handlers here.\n"
+ "# Documents at https://ipm.hydroroll.team/\n\n"
+ "from infini.register import Register\n\n\n"
+ "register = Register()\n"
+ )
+ if not interceptors_filepath.exists():
+ interceptors_filepath.write_text(
+ "# Initialized `interceptors.py` generated by ipm.\n"
+ "# Regists your pre-interceptors and interceptors here.\n"
+ "# Documents at https://ipm.hydroroll.team/\n\n"
+ "from infini.register import Register\n\n\n"
+ "register = Register()\n"
+ )
else:
- (source_path / "src" / default_entries[int(entry_file)]).write_text(
- "# Initialized `__init__.py` generated by ipm.\n"
- "# Documents at https://ipm.hydroroll.team/\n\n"
- "from infini.register import Register\n\n\n"
- "register = Register()\n"
- )
+ entry_filepath = source_path.joinpath(default_entries[int(entry_file)])
+ if not entry_filepath.exists():
+ entry_filepath.write_text(
+ f"# Initialized `{source_path.name}` generated by ipm.\n"
+ "# Documents at https://ipm.hydroroll.team/\n\n"
+ "from infini.register import Register\n\n\n"
+ "register = Register()\n"
+ )
return True
@@ -135,9 +161,10 @@ def new(dist_path: StrPath, echo: bool = False) -> bool:
update("检查环境...", echo)
path = Path(dist_path).resolve()
if path.exists():
- return warning(
+ warning(
f"路径 [blue]{path.relative_to(Path('.').resolve())}[/blue] 已经存在.", echo
)
+ return False
path.mkdir(parents=True, exist_ok=True)
success("环境检查完毕.", echo)
@@ -147,7 +174,7 @@ def new(dist_path: StrPath, echo: bool = False) -> bool:
return True
-def build(source_path: StrPath, echo: bool = False) -> InfiniFrozenPackage:
+def build(source_path: StrPath, echo: bool = False) -> Optional[InfiniFrozenPackage]:
info("构建规则包...", echo)
update("检查构建环境...", echo)
@@ -166,7 +193,7 @@ def build(source_path: StrPath, echo: bool = False) -> InfiniFrozenPackage:
def extract(
source_path: StrPath, dist_path: StrPath | None = None, echo: bool = False
-) -> InfiniProject:
+) -> Optional[InfiniProject]:
info("解压缩规则包...", echo)
dist_path = (
Path(dist_path).resolve() if dist_path else Path(source_path).resolve().parent
@@ -174,216 +201,224 @@ def extract(
return freeze.extract_ipk(source_path, dist_path, echo)
-def install(
- uri: str,
- index: str = "",
- upgrade: bool = False,
- force: bool = False,
- echo: bool = False,
-) -> None:
- info(f"安装规则包 [bold green]{uri}[/bold green]...", echo)
-
- update("初始化 [bold green]IPM[/bold green] 环境...", echo)
- SRC_HOME.mkdir(parents=True, exist_ok=True)
- index = index or INDEX
- lock = PackageLock()
-
- if uri.split("==")[0].isalpha():
- # TODO 兼容 >= <= > < 等标识符
- splited_uri = uri.split("==")
- name = splited_uri[0]
- if len(splited_uri) == 1:
- version = None
- else:
- version = splited_uri[1]
-
- yggdrasil = Yggdrasil(index)
-
- if not (lock_index := lock.get_index(index)):
- yggdrasil.sync()
- lock.add_index(index, yggdrasil.host, yggdrasil.uuid, dump=True)
- else:
- yggdrasil.init(INDEX_PATH / lock_index["uuid"])
-
- if not (remote_ifp := yggdrasil.get(name, version=version)):
- return warning(f"未能在世界树[{yggdrasil.index}]中搜寻到规则包[{uri}].", echo)
-
- ifp = loader.load_from_remote(
- name,
- baseurl=index,
- filename=remote_ifp["source"],
- echo=echo,
- )
- elif urlparser.is_valid_url(uri):
- filename = uri.rstrip("/").rpartition("/")[-1]
- ifp = loader.load_from_remote(
- "temp",
- uri.rstrip("/").rpartition("/")[0],
- filename,
- echo=echo,
- )
- else:
- path = Path(uri).resolve()
- update(f"检查文件 [blue]{path}[/blue]...", echo)
- if not path.exists():
- raise FileNotFoundError("给定的 URI 路径不存在!")
-
- if uri.endswith(".ipk"):
- ifp = loader.load_from_local(path)
- else:
- raise FileTypeMismatch("文件类型与预期[.ipk]不匹配.")
-
- if lock.has_package(ifp.name):
- exists_version = lock.get_package(ifp.name)["version"]
- if require_update(exists_version, ifp.version):
- if not upgrade:
- raise PackageExsitsError(
- f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[blue]--upgrade[/blue]]参数进行升级."
- )
- else:
- info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...")
- uninstall(ifp.name, is_confirm=True, echo=echo)
- success(f"[{ifp.name}={exists_version}]卸载完成.")
- else:
- if not force:
- raise PackageExsitsError(
- f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[red]--force[/red]]参数进行强制覆盖."
- )
- else:
- info(
- f"发现已经安装的 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 卸载中..."
- )
- uninstall(ifp.name, is_confirm=True, echo=echo)
- success(
- f"[bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow]卸载完成..."
- )
- lock.load(auto_completion=True)
-
- update(
- f"安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow] 中...",
- echo,
- )
- ipk = extract(STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo) # TODO 安装依赖规则包
- success(
- f"成功安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow].",
- echo,
- )
-
- update("处理全局锁...", echo)
- if not lock.has_storage(ifp.name):
- lock.add_storage(ifp, dump=True)
- lock.add_package(ipk, dump=True)
- success("全局锁已处理完毕.", echo)
-
- success(f"包[{ipk.name}]成功安装在 [blue]{ipk.source_path}[/blue].", echo)
-
-
-def uninstall(name: str, is_confirm: bool = False, echo: bool = False) -> None:
- lock = PackageLock()
- path = SRC_HOME / name.strip()
-
- if not (install_info := lock.get_package(name)):
- return warning(f"由于 [bold green]{name}[/bold green]未被安装, 故忽略卸载操作.", echo)
-
- info(
- f"发现已经安装的 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow].",
- echo,
- )
- update(
- f"卸载 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow]..."
- )
- warning(f"将会清理: [blue]{path}[/blue]", echo)
- is_confirm: bool = (
- True if (confirm("你确定要继续?") if not is_confirm else is_confirm) else False
- )
- if is_confirm:
- shutil.rmtree(SRC_HOME / name, ignore_errors=True)
- else:
- return info("忽略.", echo)
-
- lock.remove_package(name, dump=True)
- success(
- f"规则包 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow] 卸载完成!",
- echo,
- )
- return True
-
-
-def require(name: str, index: str = "", echo: bool = False) -> None:
- info(f"新增规则包依赖: [bold green]{name}[/bold green]", echo)
- update("检查环境中...", echo)
- pkg_lock = PackageLock()
- lock = ProjectLock()
- ipk = InfiniProject()
- success("环境检查完毕.", echo)
-
- splited_name = name.split("==") # TODO 支持 >= <= > < 标识
- name = splited_name[0]
-
- if len(splited_name) > 1:
- version = splited_name[1]
- else:
- version = None
-
- if not pkg_lock.has_package(name):
- info(f"检测到需要依赖的规则包[{name}]不存在, 安装中...", echo)
- install(
- f"name=={version}" if version else name,
- index=index,
- upgrade=True,
- force=True,
- echo=True,
- )
-
- info("处理 Infini 项目依赖锁...", echo)
- ipk.require(name, version, dump=True)
- lock.require(name, version, dump=True) # TODO 使用check替代
- success("规则包依赖新增完成.", echo)
- return True
-
-
-def unrequire(name: str, echo: bool = False):
- info(f"删除规则包依赖: [bold green]{name}[/bold green]", echo)
- info("处理 Infini 项目依赖锁...", echo)
- ipk = InfiniProject()
- lock = ProjectLock()
-
- ipk.unrequire(name, dump=True)
- lock.unrequire(name, dump=True)
- success("规则包依赖删除完成.", echo)
- return True
-
-
-def add(name: str, index: str = "", echo: bool = False) -> None:
- info("检查环境中...", echo)
- pkg_lock = PackageLock()
- lock = ProjectLock()
- ipk = InfiniProject()
- info("环境检查完毕.", echo)
-
- splited_name = name.split("==") # TODO 支持 >= <= > < 标识
- name = splited_name[0]
-
- if len(splited_name) > 1:
- version = splited_name[1]
- else:
- version = None
-
- if not pkg_lock.has_package(name):
- # TODO pip 环境安装
- ...
-
- info("处理 Infini 项目依赖锁...", echo)
- ipk.add(name, version, dump=True)
- lock.add(name, version, dump=True)
- success("环境依赖新增完成.", echo)
-
-
-def remove(name: str, echo: bool = False):
- info("处理 Infini 项目依赖锁...", echo)
- ipk = InfiniProject()
- lock = ProjectLock()
-
- ipk.remove(name, dump=True)
- lock.remove(name, dump=True)
- success("环境依赖删除完成.", echo)
+# def install(
+# uri: str,
+# index: str = "",
+# upgrade: bool = False,
+# force: bool = False,
+# echo: bool = False,
+# ) -> None:
+# info(f"安装规则包 [bold green]{uri}[/bold green]...", echo)
+
+# update("初始化 [bold green]IPM[/bold green] 环境...", echo)
+# SRC_HOME.mkdir(parents=True, exist_ok=True)
+# index = index or INDEX
+# lock = PackageLock()
+
+# if uri.split("==")[0].isalpha():
+# # TODO 兼容 >= <= > < 等标识符
+# splited_uri = uri.split("==")
+# name = splited_uri[0]
+# if len(splited_uri) == 1:
+# version = None
+# else:
+# version = splited_uri[1]
+
+# yggdrasil = Yggdrasil(index)
+
+# if not (lock_index := lock.get_index(index)):
+# yggdrasil.sync()
+# lock.add_index(index, yggdrasil.host, yggdrasil.uuid, dump=True)
+# else:
+# yggdrasil.init(INDEX_PATH / lock_index["uuid"])
+
+# if not (remote_ifp := yggdrasil.get(name, version=version)):
+# return warning(
+# f"未能在世界树[{yggdrasil.index}]中搜寻到规则包[{uri}].", echo
+# )
+
+# ifp = loader.load_from_remote(
+# name,
+# baseurl=index,
+# filename=remote_ifp["source"],
+# echo=echo,
+# )
+# elif urlparser.is_valid_url(uri):
+# filename = uri.rstrip("/").rpartition("/")[-1]
+# ifp = loader.load_from_remote(
+# "temp",
+# uri.rstrip("/").rpartition("/")[0],
+# filename,
+# echo=echo,
+# )
+# else:
+# path = Path(uri).resolve()
+# update(f"检查文件 [blue]{path}[/blue]...", echo)
+# if not path.exists():
+# raise FileNotFoundError("给定的 URI 路径不存在!")
+
+# if uri.endswith(".ipk"):
+# ifp = loader.load_from_local(path)
+# else:
+# raise FileTypeMismatch("文件类型与预期[.ipk]不匹配.")
+
+# if lock.has_package(ifp.name):
+# exists_version = lock.get_package(ifp.name)["version"]
+# if require_update(exists_version, ifp.version):
+# if not upgrade:
+# raise PackageExsitsError(
+# f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[blue]--upgrade[/blue]]参数进行升级."
+# )
+# else:
+# info(f"发现已经安装的[{ifp.name}={exists_version}], 卸载中...")
+# uninstall(ifp.name, is_confirm=True, echo=echo)
+# success(f"[{ifp.name}={exists_version}]卸载完成.")
+# else:
+# if not force:
+# raise PackageExsitsError(
+# f"已经安装了 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 使用[[red]--force[/red]]参数进行强制覆盖."
+# )
+# else:
+# info(
+# f"发现已经安装的 [bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow], 卸载中..."
+# )
+# uninstall(ifp.name, is_confirm=True, echo=echo)
+# success(
+# f"[bold green]{ifp.name}[/bold green] [yellow]{exists_version}[/yellow]卸载完成..."
+# )
+# lock.load(auto_completion=True)
+
+# update(
+# f"安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow] 中...",
+# echo,
+# )
+# ipk = extract(
+# STORAGE / ifp.name / ifp.default_name, SRC_HOME, echo
+# ) # TODO 安装依赖规则包
+# success(
+# f"成功安装 [bold green]{ifp.name}[/bold green] [yellow]{ifp.version}[/yellow].",
+# echo,
+# )
+
+# update("处理全局锁...", echo)
+# if not lock.has_storage(ifp.name):
+# lock.add_storage(ifp, dump=True)
+# lock.add_package(ipk, dump=True)
+# success("全局锁已处理完毕.", echo)
+
+# success(f"包[{ipk.name}]成功安装在 [blue]{ipk._source_path}[/blue].", echo)
+
+
+# def uninstall(name: str, is_confirm: bool = False, echo: bool = False) -> bool:
+# lock = PackageLock()
+# path = SRC_HOME / name.strip()
+
+# if not (install_info := lock.get_package(name)):
+# warning(
+# f"由于 [bold green]{name}[/bold green]未被安装, 故忽略卸载操作.", echo
+# )
+# return False
+
+# info(
+# f"发现已经安装的 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow].",
+# echo,
+# )
+# update(
+# f"卸载 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow]..."
+# )
+# warning(f"将会清理: [blue]{path}[/blue]", echo)
+# is_confirm: bool = (
+# True if (confirm("你确定要继续?") if not is_confirm else is_confirm) else False
+# )
+# if is_confirm:
+# shutil.rmtree(SRC_HOME / name, ignore_errors=True)
+# else:
+# info("忽略.", echo)
+# return False
+
+# lock.remove_package(name, dump=True)
+# success(
+# f"规则包 [bold green]{name}[/bold green] [yellow]{install_info['version']}[/yellow] 卸载完成!",
+# echo,
+# )
+# return True
+
+
+# def require(name: str, index: str = "", echo: bool = False) -> None:
+# info(f"新增规则包依赖: [bold green]{name}[/bold green]", echo)
+# update("检查环境中...", echo)
+# pkg_lock = PackageLock()
+# lock = ProjectLock()
+# ipk = InfiniProject()
+# success("环境检查完毕.", echo)
+
+# splited_name = name.split("==") # TODO 支持 >= <= > < 标识
+# name = splited_name[0]
+
+# if len(splited_name) > 1:
+# version = splited_name[1]
+# else:
+# version = None
+
+# if not pkg_lock.has_package(name):
+# info(f"检测到需要依赖的规则包[{name}]不存在, 安装中...", echo)
+# install(
+# f"name=={version}" if version else name,
+# index=index,
+# upgrade=True,
+# force=True,
+# echo=True,
+# )
+
+# info("处理 Infini 项目依赖锁...", echo)
+# ipk.require(name, version, dump=True)
+# lock.require(name, version, dump=True) # TODO 使用check替代
+# success("规则包依赖新增完成.", echo)
+# return True
+
+
+# def unrequire(name: str, echo: bool = False):
+# info(f"删除规则包依赖: [bold green]{name}[/bold green]", echo)
+# info("处理 Infini 项目依赖锁...", echo)
+# ipk = InfiniProject()
+# lock = ProjectLock()
+
+# ipk.unrequire(name, dump=True)
+# lock.unrequire(name, dump=True)
+# success("规则包依赖删除完成.", echo)
+# return True
+
+
+# def add(name: str, index: str = "", echo: bool = False) -> None:
+# info("检查环境中...", echo)
+# pkg_lock = PackageLock()
+# lock = ProjectLock()
+# ipk = InfiniProject()
+# info("环境检查完毕.", echo)
+
+# splited_name = name.split("==") # TODO 支持 >= <= > < 标识
+# name = splited_name[0]
+
+# if len(splited_name) > 1:
+# version = splited_name[1]
+# else:
+# version = None
+
+# if not pkg_lock.has_package(name):
+# # TODO pip 环境安装
+# ...
+
+# info("处理 Infini 项目依赖锁...", echo)
+# ipk.add(name, version, dump=True)
+# lock.add(name, version, dump=True)
+# success("环境依赖新增完成.", echo)
+
+
+# def remove(name: str, echo: bool = False):
+# info("处理 Infini 项目依赖锁...", echo)
+# ipk = InfiniProject()
+# lock = ProjectLock()
+
+# ipk.remove(name, dump=True)
+# lock.remove(name, dump=True)
+# success("环境依赖删除完成.", echo)
diff --git a/src/ipm/const.py b/src/ipm/const.py
index a45e08e..a94187f 100644
--- a/src/ipm/const.py
+++ b/src/ipm/const.py
@@ -11,5 +11,16 @@ STORAGE = IPM_PATH / "storage"
INDEX_PATH = IPM_PATH / "index"
# 文本参数
-ATTENSION = """# This file is @generated by IPM.
-# It is not intended for manual editing.\n\n"""
+ATTENSION = (
+ "# This file is @generated by IPM.\n" "# It is not intended for manual editing.\n\n"
+)
+GITIGNORE = """# Initialized `.gitignores` @generated by IPM.
+# Python
+__pycache__/
+*.pyc
+
+# Builds
+
+build/
+dist/
+"""
diff --git a/src/ipm/models/ipk.py b/src/ipm/models/ipk.py
index dcdbfe8..62beb96 100644
--- a/src/ipm/models/ipk.py
+++ b/src/ipm/models/ipk.py
@@ -1,11 +1,13 @@
from pathlib import Path
-from . import lock
-from ..typing import List, Dict, Literal, StrPath, Any
-from ..exceptions import SyntaxError, TomlLoadFailed
+from typing import Optional, Union
+from tomlkit.toml_document import TOMLDocument
+from ipm.models import lock
+from ipm.typing import List, Dict, Literal, StrPath
+from ipm.exceptions import SyntaxError, TomlLoadFailed
-import toml
+import tomlkit
-ProjectLock = lock.ProjectLock
+# ProjectLock = lock.ProjectLock
class Author:
@@ -30,7 +32,7 @@ class Authors:
class InfiniPackage:
- source_path: Path
+ _source_path: Path
name: str | None
version: str | None
@@ -45,100 +47,51 @@ class InfiniPackage:
class InfiniProject(InfiniPackage):
- name: str
- version: str
- description: str
- authors: Authors
- license: str
+ # name: str
+ # version: str
+ # description: str
+ # authors: Authors
+ # license: str
- requirements: Dict[str, Any]
- dependencies: Dict[str, Any]
+ # requirements: Dict[str, Any]
+ # dependencies: Dict[str, Any]
def __init__(self, path: StrPath = ".") -> None:
- self.source_path = Path(path).resolve()
- toml_path = self.source_path / "infini.toml"
-
- try:
- data_load = toml.load(toml_path.open("r", encoding="utf-8"))
- except Exception as error:
- raise TomlLoadFailed(f"项目文件[infini.toml]导入失败: {error}") from error
-
- if "infini" not in data_load.keys():
- raise SyntaxError("配置文件中缺少[infini]项.")
-
- infini: dict = data_load["infini"]
- self.name = infini.get("name") or ""
- self.version = infini.get("version") or ""
- self.description = infini.get("description") or ""
- self.authors = Authors(infini.get("authors") or [])
- self.license = infini.get("license") or "MIT"
-
- self.requirements = data_load["requirements"]
- self.dependencies = data_load["dependencies"]
-
- def dumps(self) -> dict:
- return {
- "infini": {
- "name": self.source_path.name,
- "version": self.version,
- "description": self.description,
- "authors": [
- {"name": author.name, "email": author.email}
- for author in self.authors.authors
- ],
- "license": self.license,
- },
- "requirements": self.requirements,
- "dependencies": self.dependencies,
- }
-
- def dump(self) -> str:
- return toml.dump(
- self.dumps(), (self.source_path / "infini.toml").open("w", encoding="utf-8")
- )
+ self._source_path = Path(path).resolve()
+ self._toml_path = self._source_path / "infini.toml"
- def require(self, name: str, version: str, dump: bool = False) -> None:
- for requirement in self.requirements.keys():
- if requirement == name:
- self.requirements.pop(name)
- break
-
- self.requirements[name] = version or "latest"
- self.dump() if dump else ""
-
- def unrequire(self, name: str, dump: bool = False) -> None:
- name = name.strip()
- for requirement in self.requirements:
- if requirement == name:
- self.requirements.pop(name)
- break
- self.dump() if dump else ""
-
- def add(self, name: str, version: str, dump: bool = False) -> None:
- for dependency in self.dependencies:
- if "name" not in dependency.keys():
- raise SyntaxError("异常的锁文件!")
- if dependency["name"] == name:
- self.dependencies.remove(dependency)
- break
-
- self.dependencies.append(
- {
- "name": name,
- "version": version,
- }
- )
- self.dump() if dump else ""
+ if not self._source_path.exists():
+ raise TomlLoadFailed(f"入口路径[{self._source_path}]不存在.")
+ elif not self._toml_path.exists():
+ raise TomlLoadFailed(
+ f"项目文件[infini.toml]不存在, 请先使用`[bold green]ipm init[/]`初始化!"
+ )
+
+ self._data = self.read()
+ if "project" not in self._data:
+ raise TomlLoadFailed(f"项目文件[infini.toml]中不存在元数据!")
+
+ def read(self) -> TOMLDocument:
+ if not self._toml_path.exists():
+ return tomlkit.document()
+ return tomlkit.load(self._toml_path.open("r", encoding="utf-8"))
- def remove(self, name: str, dump: bool = False) -> None:
- name = name.strip()
- for dependency in self.dependencies:
- if "name" not in dependency.keys():
- raise SyntaxError("异常的锁文件!")
- if dependency["name"] == name:
- self.dependencies.remove(dependency)
- break
- self.dump() if dump else ""
+ def dumps(self) -> str:
+ return tomlkit.dumps(self._data)
+
+ def dump(self) -> None:
+ return tomlkit.dump(self._data, self._toml_path.open("w", encoding="utf-8"))
+
+ @property
+ def plain_dict(self) -> TOMLDocument:
+ return self._data
+
+ @property
+ def name(self) -> str:
+ project = self._data["project"]
+ if not (name := project.get("name")): # type: ignore
+ raise ValueError("项目文件不存在`name`属性.")
+ return name
class InfiniFrozenPackage(InfiniPackage):
@@ -146,11 +99,13 @@ class InfiniFrozenPackage(InfiniPackage):
version: str | None
hash: str
- def __init__(self, source_path: str | Path, **kwargs) -> None:
- self.source_path = Path(source_path).resolve()
+ def __init__(self, source_path: Union[str, Path], **kwargs) -> None:
+ self._source_path = Path(source_path).resolve()
self.hash = (
- (self.source_path.parent / (source_path.name + ".hash")).read_bytes().hex()
+ (self._source_path.parent / (self._source_path.name + ".hash"))
+ .read_bytes()
+ .hex()
)
self.name = kwargs.get("name")
@@ -158,4 +113,4 @@ class InfiniFrozenPackage(InfiniPackage):
@property
def hash_name(self) -> str:
- return f"{self.source_path.name}.hash"
+ return f"{self._source_path.name}.hash"
diff --git a/src/ipm/models/lock.py b/src/ipm/models/lock.py
index 48552eb..900b909 100644
--- a/src/ipm/models/lock.py
+++ b/src/ipm/models/lock.py
@@ -6,385 +6,385 @@ from ..const import IPM_PATH, ATTENSION, SRC_HOME
from ..exceptions import SyntaxError, FileNotFoundError
from ..utils.uuid import generate_uuid
-import toml
+import tomlkit
import socket
-class IpmLock(metaclass=ABCMeta):
- """IPM 锁基类"""
-
- source_path: Path
- metadata: Dict[str, str]
-
- def __init__(self, source_path: StrPath, auto_load: bool = True) -> None:
- IPM_PATH.mkdir(parents=True, exist_ok=True)
- self.source_path = source_path
- return self.load() if auto_load else None
-
- @abstractmethod
- def load(self) -> None:
- raise NotImplementedError
-
- @abstractmethod
- def dumps(self) -> dict:
- raise NotImplementedError
-
- def dump(self) -> str:
- data_to_dump = ATTENSION + toml.dumps(self.dumps())
- source_file = self.source_path.open("w", encoding="utf-8")
- source_file.write(data_to_dump)
- source_file.close()
- return data_to_dump
-
-
-class PackageLock(IpmLock):
- """全局包锁"""
-
- indexes: List[Dict[str, Any]]
- packages: List[Dict[str, Any]]
- storages: List[Dict[str, Any]]
-
- def __init__(self, source_path: StrPath | None = None) -> None:
- super().__init__(source_path=source_path or IPM_PATH / "infini.lock")
-
- def load(self, auto_completion: bool = True) -> None:
- if not self.source_path.exists():
- if auto_completion:
- self.metadata = {
- "host": socket.gethostname(),
- "uuid": generate_uuid(),
- }
- self.indexes = []
- self.packages = []
- self.storages = []
- self.dumps()
- else:
- raise FileNotFoundError(f"锁文件不存在!")
- else:
- loaded_data = toml.load(self.source_path.open("r", encoding="utf-8"))
-
- if "metadata" not in loaded_data.keys():
- if not auto_completion:
- raise SyntaxError(f"锁文件缺失[metadata]项.")
- else:
- self.metadata = {
- "host": socket.gethostname(),
- "uuid": generate_uuid(),
- }
- else:
- self.metadata = loaded_data["metadata"]
-
- if "uuid" not in self.metadata.keys():
- if auto_completion:
- self.metadata["uuid"] = generate_uuid()
- else:
- raise SyntaxError(f"锁文件[metadata]项缺失[uuid]项.")
-
- self.indexes = (
- loaded_data["indexes"] if "indexes" in loaded_data.keys() else []
- )
- self.packages = (
- loaded_data["packages"] if "packages" in loaded_data.keys() else []
- )
- self.storages = (
- loaded_data["storages"] if "storages" in loaded_data.keys() else []
- )
-
- def dumps(self) -> dict:
- return {
- "metadata": self.metadata,
- "indexes": self.indexes,
- "packages": self.packages,
- "storages": self.storages,
- }
-
- def add_index(
- self, index_uri: str, host: str, uuid: str, dump: bool = False
- ) -> str:
- for index in self.indexes:
- if "index" not in index.keys():
- raise SyntaxError("异常的锁文件!")
- if index["index"] == index_uri:
- self.storages.remove(index)
- break
-
- self.indexes.append(
- {
- "index": index_uri,
- "host": host,
- "uuid": uuid,
- }
- )
- return self.dump() if dump else ""
-
- def remove_index(self, uuid: str, dump: bool = False) -> str:
- uuid = uuid.strip()
- for index in self.indexes:
- if "name" not in index.keys():
- raise SyntaxError("异常的锁文件!")
- if index["name"] == uuid:
- self.packages.remove(index)
- break
- return self.dump() if dump else ""
-
- def get_index(self, index_uri: str) -> Index | None:
- index_uri = index_uri.strip()
- for index in self.indexes:
- if index["index"] == index_uri:
- return index
- return None
-
- def has_index(self, index_uri: str) -> bool:
- index_uri = index_uri.strip()
- for index in self.indexes:
- if index["index"] == index_uri:
- return True
- return False
-
- def add_package(self, ipk: "ipk.InfiniProject", dump: bool = False) -> str:
- for package in self.packages:
- if "name" not in package.keys():
- raise SyntaxError("异常的锁文件!")
- if package["name"] == ipk.name:
- self.storages.remove(package)
- break
-
- self.packages.append(
- {
- "name": ipk.name,
- "version": ipk.version,
- "description": ipk.description,
- "requirements": ipk.requirements,
- "dependencies": ipk.dependencies,
- }
- )
- return self.dump() if dump else ""
-
- def get_package(self, name: str) -> Package | None:
- name = name.strip()
- for package in self.packages:
- if package["name"] == name:
- return package
- return None
-
- def has_package(self, name: str) -> bool:
- name = name.strip()
- for package in self.packages:
- if package["name"] == name:
- return True
- return False
-
- def remove_package(self, name: str, dump: bool = False) -> str:
- name = name.strip()
- for package in self.packages:
- if "name" not in package.keys():
- raise SyntaxError("异常的锁文件!")
- if package["name"] == name:
- self.packages.remove(package)
- break
- return self.dump() if dump else ""
-
- def add_storage(self, ifp: "ipk.InfiniFrozenPackage", dump: bool = False) -> str:
- for storage in self.storages:
- if "name" not in storage.keys():
- raise SyntaxError("异常的锁文件!")
- if storage["name"] == ifp.name and storage["version"] == ifp.version:
- self.storages.remove(storage)
- break
-
- self.storages.append(
- {
- "name": ifp.name,
- "version": ifp.version,
- "hash": ifp.hash,
- "source": f"storage/{ifp.name}/{ifp.default_name}",
- }
- )
- return self.dump() if dump else ""
-
- def remove_storage(self, name: str, dump: bool = False) -> str:
- name = name.strip()
- for storage in self.storages:
- if "name" not in storage.keys():
- raise SyntaxError("异常的锁文件!")
- if storage["name"] == name:
- self.storages.remove(storage)
- break
- return self.dump() if dump else ""
-
- def get_storage(self, name: str) -> Storage | None:
- name = name.strip()
- for storage in self.storages:
- if storage["name"] == name:
- return storage
- return None
-
- def get_particular_storage(
- self, name: str, version: str | None = None
- ) -> Storage | None:
- name = name.strip()
- for storage in self.storages:
- if storage["name"] == name and (
- version is None or storage["version"] == version
- ):
- return storage
- return None
-
- def has_storage(self, name: str) -> bool:
- name = name.strip()
- for storage in self.storages:
- if storage["name"] == name:
- return True
- return False
-
- def get_ipk(self, name: str) -> "ipk.InfiniProject":
- return ipk.InfiniProject(Path(SRC_HOME / name.strip()).resolve())
-
-
-class ProjectLock(IpmLock):
- """IPM 项目锁"""
-
- requirements: List[Dict[str, Any]]
- dependencies: List[Dict[str, Any]]
-
- def __init__(
- self, source_path: StrPath | None = None, auto_load: bool = True
- ) -> None:
- super().__init__(
- source_path=source_path or Path(".").resolve() / "infini.lock",
- auto_load=auto_load,
- )
-
- def _check(self, name: str) -> Dict[str, Any]: # TODO 依照版本 version: str 区分
- requirements = []
- for name, sub_requirement in (
- PackageLock().get_ipk(name.strip()).requirements.values()
- ):
- exists = False
- for requirement in requirements:
- if requirement["name"] == name:
- exists = True
- break
- if not exists:
- sub_requirement["name"] = name
- requirements.append(sub_requirement)
- return requirements
-
- def _init(self) -> None:
- for abs_requirement in self.requirements.copy():
- sub_requirements = self._check(abs_requirement["name"])
- for sub_requirement in sub_requirements:
- for requirement in self.requirements:
- if requirement["name"] == sub_requirement["name"]:
- exists = True
- break
- if not exists:
- self.requirements.append(sub_requirement)
-
- def init(self) -> None:
- pkg = ipk.InfiniProject(self.source_path.parent)
- self.metadata = {
- "name": pkg.name,
- "version": pkg.version,
- "description": pkg.description,
- "license": pkg.license,
- }
- # TODO 实现本地包
- self.requirements = [
- {"name": name, "version": version or "latest"}
- for name, version in pkg.requirements.items()
- ]
- self.dependencies = [
- {"name": name, "version": version or "latest"}
- for name, version in pkg.dependencies.items()
- ]
- self._init()
- self.dump()
-
- def load(self) -> None:
- pkg = ipk.InfiniProject()
-
- if not self.source_path.exists():
- self.init()
- else:
- loaded_data = toml.load(self.source_path.open("r", encoding="utf-8"))
-
- self.metadata = (
- loaded_data["metadata"]
- if "metadata" in loaded_data.keys()
- else {
- "name": pkg.name,
- "version": pkg.version,
- "description": pkg.description,
- "license": pkg.license,
- }
- )
- self.requirements = (
- loaded_data["requirements"]
- if "requirements" in loaded_data.keys()
- else []
- )
- self.dependencies = (
- loaded_data["dependencies"]
- if "dependencies" in loaded_data.keys()
- else []
- )
-
- def dumps(self) -> Dict:
- return {
- "metadata": self.metadata,
- "requirements": self.requirements,
- "dependencies": self.dependencies,
- }
-
- def require(self, name: str, version: str, dump: bool = False) -> None:
- for requirement in self.requirements:
- if "name" not in requirement.keys():
- raise SyntaxError("异常的锁文件!")
- if requirement["name"] == name:
- self.requirements.remove(requirement)
- break
-
- self.requirements.append(
- {
- "name": name,
- "version": version,
- }
- )
- return self.dump() if dump else ""
-
- def unrequire(self, name: str, dump: bool = False) -> None:
- name = name.strip()
- for requirement in self.requirements:
- if "name" not in requirement.keys():
- raise SyntaxError("异常的锁文件!")
- if requirement["name"] == name:
- self.requirements.remove(requirement)
- break
- return self.dump() if dump else ""
-
- def add(self, name: str, version: str, dump: bool = False) -> None:
- for dependency in self.dependencies:
- if "name" not in dependency.keys():
- raise SyntaxError("异常的锁文件!")
- if dependency["name"] == name:
- self.dependencies.remove(dependency)
- break
-
- self.dependencies.append(
- {
- "name": name,
- "version": version,
- }
- )
- return self.dump() if dump else ""
-
- def remove(self, name: str, dump: bool = False) -> None:
- name = name.strip()
- for dependency in self.dependencies:
- if "name" not in dependency.keys():
- raise SyntaxError("异常的锁文件!")
- if dependency["name"] == name:
- self.dependencies.remove(dependency)
- break
- return self.dump() if dump else ""
+# class IpmLock(metaclass=ABCMeta):
+# """IPM 锁基类"""
+
+# source_path: Path
+# metadata: Dict[str, str]
+
+# def __init__(self, source_path: StrPath, auto_load: bool = True) -> None:
+# IPM_PATH.mkdir(parents=True, exist_ok=True)
+# self.source_path = source_path
+# return self.load() if auto_load else None
+
+# @abstractmethod
+# def load(self) -> None:
+# raise NotImplementedError
+
+# @abstractmethod
+# def dumps(self) -> dict:
+# raise NotImplementedError
+
+# def dump(self) -> str:
+# data_to_dump = ATTENSION + toml.dumps(self.dumps())
+# source_file = self.source_path.open("w", encoding="utf-8")
+# source_file.write(data_to_dump)
+# source_file.close()
+# return data_to_dump
+
+
+# class PackageLock(IpmLock):
+# """全局包锁"""
+
+# indexes: List[Dict[str, Any]]
+# packages: List[Dict[str, Any]]
+# storages: List[Dict[str, Any]]
+
+# def __init__(self, source_path: StrPath | None = None) -> None:
+# super().__init__(source_path=source_path or IPM_PATH / "infini.lock")
+
+# def load(self, auto_completion: bool = True) -> None:
+# if not self.source_path.exists():
+# if auto_completion:
+# self.metadata = {
+# "host": socket.gethostname(),
+# "uuid": generate_uuid(),
+# }
+# self.indexes = []
+# self.packages = []
+# self.storages = []
+# self.dumps()
+# else:
+# raise FileNotFoundError(f"锁文件不存在!")
+# else:
+# loaded_data = toml.load(self.source_path.open("r", encoding="utf-8"))
+
+# if "metadata" not in loaded_data.keys():
+# if not auto_completion:
+# raise SyntaxError(f"锁文件缺失[metadata]项.")
+# else:
+# self.metadata = {
+# "host": socket.gethostname(),
+# "uuid": generate_uuid(),
+# }
+# else:
+# self.metadata = loaded_data["metadata"]
+
+# if "uuid" not in self.metadata.keys():
+# if auto_completion:
+# self.metadata["uuid"] = generate_uuid()
+# else:
+# raise SyntaxError(f"锁文件[metadata]项缺失[uuid]项.")
+
+# self.indexes = (
+# loaded_data["indexes"] if "indexes" in loaded_data.keys() else []
+# )
+# self.packages = (
+# loaded_data["packages"] if "packages" in loaded_data.keys() else []
+# )
+# self.storages = (
+# loaded_data["storages"] if "storages" in loaded_data.keys() else []
+# )
+
+# def dumps(self) -> dict:
+# return {
+# "metadata": self.metadata,
+# "indexes": self.indexes,
+# "packages": self.packages,
+# "storages": self.storages,
+# }
+
+# def add_index(
+# self, index_uri: str, host: str, uuid: str, dump: bool = False
+# ) -> str:
+# for index in self.indexes:
+# if "index" not in index.keys():
+# raise SyntaxError("异常的锁文件!")
+# if index["index"] == index_uri:
+# self.storages.remove(index)
+# break
+
+# self.indexes.append(
+# {
+# "index": index_uri,
+# "host": host,
+# "uuid": uuid,
+# }
+# )
+# return self.dump() if dump else ""
+
+# def remove_index(self, uuid: str, dump: bool = False) -> str:
+# uuid = uuid.strip()
+# for index in self.indexes:
+# if "name" not in index.keys():
+# raise SyntaxError("异常的锁文件!")
+# if index["name"] == uuid:
+# self.packages.remove(index)
+# break
+# return self.dump() if dump else ""
+
+# def get_index(self, index_uri: str) -> Index | None:
+# index_uri = index_uri.strip()
+# for index in self.indexes:
+# if index["index"] == index_uri:
+# return index
+# return None
+
+# def has_index(self, index_uri: str) -> bool:
+# index_uri = index_uri.strip()
+# for index in self.indexes:
+# if index["index"] == index_uri:
+# return True
+# return False
+
+# def add_package(self, ipk: "ipk.InfiniProject", dump: bool = False) -> str:
+# for package in self.packages:
+# if "name" not in package.keys():
+# raise SyntaxError("异常的锁文件!")
+# if package["name"] == ipk.name:
+# self.storages.remove(package)
+# break
+
+# self.packages.append(
+# {
+# "name": ipk.name,
+# "version": ipk.version,
+# "description": ipk.description,
+# "requirements": ipk.requirements,
+# "dependencies": ipk.dependencies,
+# }
+# )
+# return self.dump() if dump else ""
+
+# def get_package(self, name: str) -> Package | None:
+# name = name.strip()
+# for package in self.packages:
+# if package["name"] == name:
+# return package
+# return None
+
+# def has_package(self, name: str) -> bool:
+# name = name.strip()
+# for package in self.packages:
+# if package["name"] == name:
+# return True
+# return False
+
+# def remove_package(self, name: str, dump: bool = False) -> str:
+# name = name.strip()
+# for package in self.packages:
+# if "name" not in package.keys():
+# raise SyntaxError("异常的锁文件!")
+# if package["name"] == name:
+# self.packages.remove(package)
+# break
+# return self.dump() if dump else ""
+
+# def add_storage(self, ifp: "ipk.InfiniFrozenPackage", dump: bool = False) -> str:
+# for storage in self.storages:
+# if "name" not in storage.keys():
+# raise SyntaxError("异常的锁文件!")
+# if storage["name"] == ifp.name and storage["version"] == ifp.version:
+# self.storages.remove(storage)
+# break
+
+# self.storages.append(
+# {
+# "name": ifp.name,
+# "version": ifp.version,
+# "hash": ifp.hash,
+# "source": f"storage/{ifp.name}/{ifp.default_name}",
+# }
+# )
+# return self.dump() if dump else ""
+
+# def remove_storage(self, name: str, dump: bool = False) -> str:
+# name = name.strip()
+# for storage in self.storages:
+# if "name" not in storage.keys():
+# raise SyntaxError("异常的锁文件!")
+# if storage["name"] == name:
+# self.storages.remove(storage)
+# break
+# return self.dump() if dump else ""
+
+# def get_storage(self, name: str) -> Storage | None:
+# name = name.strip()
+# for storage in self.storages:
+# if storage["name"] == name:
+# return storage
+# return None
+
+# def get_particular_storage(
+# self, name: str, version: str | None = None
+# ) -> Storage | None:
+# name = name.strip()
+# for storage in self.storages:
+# if storage["name"] == name and (
+# version is None or storage["version"] == version
+# ):
+# return storage
+# return None
+
+# def has_storage(self, name: str) -> bool:
+# name = name.strip()
+# for storage in self.storages:
+# if storage["name"] == name:
+# return True
+# return False
+
+# def get_ipk(self, name: str) -> "ipk.InfiniProject":
+# return ipk.InfiniProject(Path(SRC_HOME / name.strip()).resolve())
+
+
+# class ProjectLock(IpmLock):
+# """IPM 项目锁"""
+
+# requirements: List[Dict[str, Any]]
+# dependencies: List[Dict[str, Any]]
+
+# def __init__(
+# self, source_path: StrPath | None = None, auto_load: bool = True
+# ) -> None:
+# super().__init__(
+# source_path=source_path or Path(".").resolve() / "infini.lock",
+# auto_load=auto_load,
+# )
+
+# def _check(self, name: str) -> Dict[str, Any]: # TODO 依照版本 version: str 区分
+# requirements = []
+# for name, sub_requirement in (
+# PackageLock().get_ipk(name.strip()).requirements.values()
+# ):
+# exists = False
+# for requirement in requirements:
+# if requirement["name"] == name:
+# exists = True
+# break
+# if not exists:
+# sub_requirement["name"] = name
+# requirements.append(sub_requirement)
+# return requirements
+
+# def _init(self) -> None:
+# for abs_requirement in self.requirements.copy():
+# sub_requirements = self._check(abs_requirement["name"])
+# for sub_requirement in sub_requirements:
+# for requirement in self.requirements:
+# if requirement["name"] == sub_requirement["name"]:
+# exists = True
+# break
+# if not exists:
+# self.requirements.append(sub_requirement)
+
+# def init(self) -> None:
+# pkg = ipk.InfiniProject(self.source_path.parent)
+# self.metadata = {
+# "name": pkg.name,
+# "version": pkg.version,
+# "description": pkg.description,
+# "license": pkg.license,
+# }
+# # TODO 实现本地包
+# self.requirements = [
+# {"name": name, "version": version or "latest"}
+# for name, version in pkg.requirements.items()
+# ]
+# self.dependencies = [
+# {"name": name, "version": version or "latest"}
+# for name, version in pkg.dependencies.items()
+# ]
+# self._init()
+# self.dump()
+
+# def load(self) -> None:
+# pkg = ipk.InfiniProject()
+
+# if not self.source_path.exists():
+# self.init()
+# else:
+# loaded_data = toml.load(self.source_path.open("r", encoding="utf-8"))
+
+# self.metadata = (
+# loaded_data["metadata"]
+# if "metadata" in loaded_data.keys()
+# else {
+# "name": pkg.name,
+# "version": pkg.version,
+# "description": pkg.description,
+# "license": pkg.license,
+# }
+# )
+# self.requirements = (
+# loaded_data["requirements"]
+# if "requirements" in loaded_data.keys()
+# else []
+# )
+# self.dependencies = (
+# loaded_data["dependencies"]
+# if "dependencies" in loaded_data.keys()
+# else []
+# )
+
+# def dumps(self) -> Dict:
+# return {
+# "metadata": self.metadata,
+# "requirements": self.requirements,
+# "dependencies": self.dependencies,
+# }
+
+# def require(self, name: str, version: str, dump: bool = False) -> None:
+# for requirement in self.requirements:
+# if "name" not in requirement.keys():
+# raise SyntaxError("异常的锁文件!")
+# if requirement["name"] == name:
+# self.requirements.remove(requirement)
+# break
+
+# self.requirements.append(
+# {
+# "name": name,
+# "version": version,
+# }
+# )
+# return self.dump() if dump else ""
+
+# def unrequire(self, name: str, dump: bool = False) -> None:
+# name = name.strip()
+# for requirement in self.requirements:
+# if "name" not in requirement.keys():
+# raise SyntaxError("异常的锁文件!")
+# if requirement["name"] == name:
+# self.requirements.remove(requirement)
+# break
+# return self.dump() if dump else ""
+
+# def add(self, name: str, version: str, dump: bool = False) -> None:
+# for dependency in self.dependencies:
+# if "name" not in dependency.keys():
+# raise SyntaxError("异常的锁文件!")
+# if dependency["name"] == name:
+# self.dependencies.remove(dependency)
+# break
+
+# self.dependencies.append(
+# {
+# "name": name,
+# "version": version,
+# }
+# )
+# return self.dump() if dump else ""
+
+# def remove(self, name: str, dump: bool = False) -> None:
+# name = name.strip()
+# for dependency in self.dependencies:
+# if "name" not in dependency.keys():
+# raise SyntaxError("异常的锁文件!")
+# if dependency["name"] == name:
+# self.dependencies.remove(dependency)
+# break
+# return self.dump() if dump else ""
diff --git a/src/ipm/utils/freeze.py b/src/ipm/utils/freeze.py
index de66f57..e865e3c 100644
--- a/src/ipm/utils/freeze.py
+++ b/src/ipm/utils/freeze.py
@@ -1,4 +1,5 @@
from pathlib import Path
+from typing import Optional
from ..exceptions import FileNotFoundError, VerifyFailed
from ..models.ipk import InfiniProject, InfiniFrozenPackage
from ..typing import StrPath
@@ -12,13 +13,13 @@ import shutil
def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage:
update("构建开发环境...", echo)
- build_dir = ipk.source_path / "build"
- src_path = ipk.source_path / "src"
- dist_path = ipk.source_path / "dist"
+ build_dir = ipk._source_path / "build"
+ src_path = ipk._source_path / "src"
+ dist_path = ipk._source_path / "dist"
ifp_path = dist_path / ipk.default_name
- if not ipk.source_path.exists():
- raise FileNotFoundError(f"文件或文件夹 [blue]{ipk.source_path.resolve()}[/blue]]不存在!")
+ if not ipk._source_path.exists():
+ raise FileNotFoundError(f"文件或文件夹 [blue]{ipk._source_path.resolve()}[/blue]]不存在!")
if build_dir.exists():
update("清理构建环境...")
shutil.rmtree(build_dir, ignore_errors=True)
@@ -30,7 +31,7 @@ def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage:
update("复制工程文件...", echo)
shutil.copytree(src_path, build_dir / "src")
- shutil.copy2(ipk.source_path / "infini.toml", build_dir / "infini.toml")
+ shutil.copy2(ipk._source_path / "infini.toml", build_dir / "infini.toml")
success("工程文件复制完毕.", echo)
update("打包 [bold green]ipk[/bold green]文件...", echo)
@@ -55,7 +56,7 @@ def build_ipk(ipk: InfiniProject, echo: bool = False) -> InfiniFrozenPackage:
def extract_ipk(
source_path: StrPath, dist_path: StrPath, echo: bool = False
-) -> InfiniProject:
+) -> Optional[InfiniProject]:
ifp_path = Path(source_path).resolve()
dist_path = Path(dist_path).resolve()
hash_path = ifp_path.parent / (ifp_path.name + ".hash")
@@ -86,7 +87,7 @@ def extract_ipk(
try:
shutil.rmtree(dist_pkg_path)
except Exception as err:
- return error(err)
+ return error(str(err))
success("旧规则包项目文件清理完毕.", echo)
update(f"迁移文件至目标目录...", echo)
diff --git a/src/ipm/utils/loader.py b/src/ipm/utils/loader.py
index a751584..4bebf7d 100644
--- a/src/ipm/utils/loader.py
+++ b/src/ipm/utils/loader.py
@@ -35,6 +35,8 @@ def load_from_remote(
info("解压中...", echo)
temp_ipk = extract_ipk(ipk_path, temp_path)
+ if not temp_ipk:
+ raise RuntimeError("解压时出现异常.")
success(f"包[{temp_ipk.name}]解压完成.")
move_to = STORAGE / temp_ipk.name
move_to.mkdir(parents=True, exist_ok=True)
@@ -59,6 +61,8 @@ def load_from_local(source_path: Path) -> InfiniFrozenPackage:
temp_path = Path(temp_dir.name).resolve()
temp_ipk = extract_ipk(source_path, temp_path)
+ if not temp_ipk:
+ raise RuntimeError("解压时出现异常.")
move_to = STORAGE / temp_ipk.name
move_to.mkdir(parents=True, exist_ok=True)
diff --git a/src/ipm/utils/version.py b/src/ipm/utils/version.py
index 6088d1c..5fc6433 100644
--- a/src/ipm/utils/version.py
+++ b/src/ipm/utils/version.py
@@ -1,11 +1,13 @@
import re
-def require_update(old_version: str, new_version: str):
+def require_update(old_version: str, new_version: str) -> bool:
regex = r"^(\d+)\.(\d+)\.(\d+)(.*?)?(\d+?)?$"
old_tuple = re.match(regex, old_version)
new_tuple = re.match(regex, new_version)
+ if not old_tuple or not new_tuple:
+ return False
old_tuple_main = tuple(map(int, filter(None, old_tuple.group(1, 2, 3))))
new_tuple_main = tuple(map(int, filter(None, new_tuple.group(1, 2, 3))))
@@ -47,3 +49,4 @@ def require_update(old_version: str, new_version: str):
return False
elif old_pre_version == new_pre_version:
return False
+ return False
diff --git a/tests/test_api.py b/tests/test_api.py
index f495ef5..caba429 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -1,38 +1,38 @@
-from ipm import api
+# from ipm import api
-import shutil
+# import shutil
-def test_new():
- api.new("test")
- shutil.rmtree("test", ignore_errors=True)
+# def test_new():
+# api.new("test")
+# shutil.rmtree("test", ignore_errors=True)
-def test_build():
- api.new("test")
- api.build("test")
- shutil.rmtree("test", ignore_errors=True)
+# def test_build():
+# api.new("test")
+# api.build("test")
+# shutil.rmtree("test", ignore_errors=True)
-def test_extract():
- api.new("test")
- api.build("test")
- api.extract("./test/dist/test-0.1.0.ipk")
- shutil.rmtree("test", ignore_errors=True)
+# def test_extract():
+# api.new("test")
+# api.build("test")
+# api.extract("./test/dist/test-0.1.0.ipk")
+# shutil.rmtree("test", ignore_errors=True)
-def test_install():
- api.new("test")
- api.build("test")
- api.install("./test/dist/test-0.1.0.ipk")
- shutil.rmtree("test", ignore_errors=True)
+# def test_install():
+# api.new("test")
+# api.build("test")
+# api.install("./test/dist/test-0.1.0.ipk")
+# shutil.rmtree("test", ignore_errors=True)
-def test_uninstall():
- api.uninstall("test", is_confirm=True)
+# def test_uninstall():
+# api.uninstall("test", is_confirm=True)
-def test_check():
- api.new("test")
- api.check("test")
- shutil.rmtree("test", ignore_errors=True)
+# def test_check():
+# api.new("test")
+# api.check("test")
+# shutil.rmtree("test", ignore_errors=True)