aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src
diff options
context:
space:
mode:
author苏向夜 <fu050409@163.com>2024-01-16 19:27:02 +0800
committer苏向夜 <fu050409@163.com>2024-01-16 19:27:02 +0800
commita0eced157f1acd3d57b57a7e92a2dd2bb8c7bc24 (patch)
treead6aec2975f1e477935982990ab471a52f45246e /src
downloadipm-a0eced157f1acd3d57b57a7e92a2dd2bb8c7bc24.tar.gz
ipm-a0eced157f1acd3d57b57a7e92a2dd2bb8c7bc24.zip
:tada: feat(milestone) IPM 立项
Diffstat (limited to 'src')
-rw-r--r--src/ipm/__init__.py1
-rw-r--r--src/ipm/__main__.py42
-rw-r--r--src/ipm/api.py51
-rw-r--r--src/ipm/const.py1
-rw-r--r--src/ipm/exceptions.py26
-rw-r--r--src/ipm/logging.py3
-rw-r--r--src/ipm/models/__init__.py0
-rw-r--r--src/ipm/models/ipk.py82
-rw-r--r--src/ipm/typing.py10
-rw-r--r--src/ipm/utils/__init__.py0
-rw-r--r--src/ipm/utils/_freeze.py17
-rw-r--r--src/ipm/utils/freeze.py65
-rw-r--r--src/ipm/utils/hash.py15
13 files changed, 313 insertions, 0 deletions
diff --git a/src/ipm/__init__.py b/src/ipm/__init__.py
new file mode 100644
index 0000000..061d700
--- /dev/null
+++ b/src/ipm/__init__.py
@@ -0,0 +1 @@
+__author__ = "苏向夜 <fu050409@163.com>"
diff --git a/src/ipm/__main__.py b/src/ipm/__main__.py
new file mode 100644
index 0000000..5d878c6
--- /dev/null
+++ b/src/ipm/__main__.py
@@ -0,0 +1,42 @@
+from .api import install, extract, build
+import argparse
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Infini Package Manager")
+ subparsers = parser.add_subparsers(title="subcommands", dest="command")
+
+ # Install command
+ install_parser = subparsers.add_parser("install", help="Install a package")
+ install_parser.add_argument("uri", help="Path or name of the package")
+ install_parser.add_argument("--index", help="Specify a custom package index")
+
+ # Extract command
+ extract_parser = subparsers.add_parser("extract", help="Extract a package")
+ extract_parser.add_argument(
+ "package", help="Path or name of the package to extract"
+ )
+ extract_parser.add_argument(
+ "--dist",
+ default=".",
+ help="Specify extraction directory (default: current directory)",
+ )
+
+ # Build command
+ build_parser = subparsers.add_parser("build", help="打包 Infini 规则包")
+ build_parser.add_argument(
+ "package", nargs="?", help="Path or name of the package to build", default="."
+ )
+
+ args = parser.parse_args()
+
+ if args.command == "install":
+ install(args.uri, args.index)
+ elif args.command == "extract":
+ extract(args.package, args.dist)
+ elif args.command == "build":
+ build(args.package)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/ipm/api.py b/src/ipm/api.py
new file mode 100644
index 0000000..af91dd5
--- /dev/null
+++ b/src/ipm/api.py
@@ -0,0 +1,51 @@
+from pathlib import Path
+from urllib.parse import urlparse
+from .typing import StrPath
+from .utils import freeze
+from .models.ipk import InfiniPackage
+from .exceptions import FileTypeMismatch
+
+import os
+import requests
+import tempfile
+
+
+def build(source_path: StrPath):
+ freeze.build_ipk(InfiniPackage(source_path))
+
+
+def extract(source_path: StrPath, dist_path: StrPath | None = None):
+ dist_path = (
+ Path(dist_path).resolve() if dist_path else Path(source_path).resolve().parent
+ )
+ freeze.extract_ipk(source_path, dist_path)
+
+
+def install(uri: str | None = "", index: str | None = ""):
+ home = Path.home() / ".ipm" / "src"
+ home.mkdir(parents=True, exist_ok=True)
+
+ if uri:
+ if os.path.isabs(uri):
+ if uri.endswith(".ipk"):
+ extract(Path(uri).resolve(), home)
+ else:
+ raise FileTypeMismatch("文件类型与预期[.ipk]不匹配.")
+ elif urlparse(uri).scheme and urlparse(uri).netloc:
+ ipk_bytes = requests.get(uri).content
+ hash_bytes = requests.get(uri.rstrip("/") + ".hash").content
+
+ temp_dir = tempfile.TemporaryDirectory()
+ temp_path = Path(temp_dir.name).resolve()
+
+ ipk_file = (temp_path / "temp.ipk").open("w+b")
+ ipk_file.write(ipk_bytes)
+ ipk_file.close()
+
+ hash_file = (temp_path / "temp.ipk.hash").open("w+b")
+ hash_file.write(hash_bytes)
+ hash_file.close()
+
+ extract(ipk_file, home)
+ else:
+ raise FileTypeMismatch("URI指向未知的位置.")
diff --git a/src/ipm/const.py b/src/ipm/const.py
new file mode 100644
index 0000000..51c64dd
--- /dev/null
+++ b/src/ipm/const.py
@@ -0,0 +1 @@
+DEBUG = True
diff --git a/src/ipm/exceptions.py b/src/ipm/exceptions.py
new file mode 100644
index 0000000..e6a01c0
--- /dev/null
+++ b/src/ipm/exceptions.py
@@ -0,0 +1,26 @@
+class IpmException(Exception):
+ """IPM Base Exception"""
+
+
+class FileNotFoundError(IpmException, FileNotFoundError):
+ """Raises when file not founded"""
+
+
+class FileExistsError(IpmException, FileExistsError):
+ """Raises when file not founded"""
+
+
+class SyntaxError(IpmException, SyntaxError):
+ """Syntax Error in config file"""
+
+
+class HashException(IpmException):
+ """Exception occured in hashing"""
+
+
+class VerifyFailed(IpmException):
+ """Failed to verify ipk file"""
+
+
+class FileTypeMismatch(IpmException):
+ """Ipk file type mismatch"""
diff --git a/src/ipm/logging.py b/src/ipm/logging.py
new file mode 100644
index 0000000..e2321dd
--- /dev/null
+++ b/src/ipm/logging.py
@@ -0,0 +1,3 @@
+from multilogging import multilogger
+
+logger = multilogger(name="IPM")
diff --git a/src/ipm/models/__init__.py b/src/ipm/models/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/ipm/models/__init__.py
diff --git a/src/ipm/models/ipk.py b/src/ipm/models/ipk.py
new file mode 100644
index 0000000..14dca06
--- /dev/null
+++ b/src/ipm/models/ipk.py
@@ -0,0 +1,82 @@
+from pathlib import Path
+from ..typing import List, Dict, Literal
+from ..exceptions import SyntaxError
+
+import toml
+
+
+class Author:
+ name: str
+ email: str
+
+ def __init__(self, name: str, email: str) -> None:
+ self.name = name
+ self.email = email
+
+
+class Authors:
+ authors: list[Author] = []
+
+ def __init__(self, authors: List[Dict[Literal["name", "email"], str]]) -> None:
+ for author in authors:
+ self.authors.append(Author(author["name"], author["email"]))
+
+ @property
+ def first(self) -> Author | None:
+ return None if not self.authors else self.authors[0]
+
+
+class InfiniPackage:
+ source_path: Path
+
+ name: str
+ version: str
+ description: str
+ authors: Authors
+ license: str
+
+ def __init__(self, path: str | Path = ".") -> None:
+ self.source_path = Path(path).resolve()
+ toml_path = self.source_path / "infini.toml"
+
+ data_load = toml.load(toml_path.open("r", encoding="utf-8"))
+ if "infini" not in data_load.keys():
+ raise SyntaxError("配置文件中缺少[infini]项.")
+
+ infini: dict = data_load["infini"]
+ self.name = infini.get("name") or ""
+ self.version = infini.get("version") or ""
+ self.description = infini.get("description") or ""
+ self.authors = Authors(infini.get("authors") or [])
+ self.license = infini.get("license") or "MIT"
+
+ @property
+ def default_name(self) -> str:
+ return f"{self.name}-{self.version}.ipk"
+
+ @property
+ def hash_name(self) -> str:
+ return f"{self.name}-{self.version}.ipk.hash"
+
+
+class InfiniFrozenPackage:
+ source_path: Path
+
+ name: str
+ version: str
+ description: str
+ authors: Authors
+ license: str
+
+ def __init__(self, source_path: str | Path, **kwargs) -> None:
+ self.source_path = Path(source_path).resolve()
+
+ self.name = kwargs.get("name") or ""
+ self.version = kwargs.get("version") or ""
+ self.description = kwargs.get("description") or ""
+ self.authors = Authors(kwargs.get("authors") or [])
+ self.license = kwargs.get("license") or "MIT"
+
+ @property
+ def hash_name(self) -> str:
+ return f"{self.source_path.name}.hash"
diff --git a/src/ipm/typing.py b/src/ipm/typing.py
new file mode 100644
index 0000000..dbf124a
--- /dev/null
+++ b/src/ipm/typing.py
@@ -0,0 +1,10 @@
+from pathlib import Path
+from typing import (
+ Literal as Literal,
+ List as List,
+ Dict as Dict,
+ Any as Any,
+ AnyStr as AnyStr,
+)
+
+StrPath = str | Path
diff --git a/src/ipm/utils/__init__.py b/src/ipm/utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/ipm/utils/__init__.py
diff --git a/src/ipm/utils/_freeze.py b/src/ipm/utils/_freeze.py
new file mode 100644
index 0000000..d6d5791
--- /dev/null
+++ b/src/ipm/utils/_freeze.py
@@ -0,0 +1,17 @@
+from ..logging import logger
+
+import tarfile
+import shutil
+
+
+def create_tar_gz(source_folder: str, output_filepath: str):
+ shutil.move(
+ shutil.make_archive(output_filepath + ".build", "gztar", source_folder),
+ output_filepath,
+ shutil.copy2,
+ )
+
+
+def extract_tar_gz(input_filename: str, output_folder: str):
+ with tarfile.open(input_filename, "r:gz") as tar:
+ tar.extractall(output_folder)
diff --git a/src/ipm/utils/freeze.py b/src/ipm/utils/freeze.py
new file mode 100644
index 0000000..6fd454f
--- /dev/null
+++ b/src/ipm/utils/freeze.py
@@ -0,0 +1,65 @@
+from pathlib import Path
+from . import _freeze
+from ..exceptions import FileNotFoundError, VerifyFailed
+from ..models.ipk import InfiniPackage, InfiniFrozenPackage
+from .hash import hash_ifp, verify_ifp
+from ..typing import StrPath
+
+import tempfile
+import shutil
+
+
+def build_ipk(
+ ipk: InfiniPackage,
+) -> InfiniFrozenPackage:
+ build_dir = ipk.source_path / ".build"
+ src_path = ipk.source_path / "src"
+ dist_path = ipk.source_path / "dist"
+ ifp_path = dist_path / ipk.default_name
+
+ if not ipk.source_path.exists():
+ raise FileNotFoundError(f"文件或文件夹[{ipk.source_path.resolve()}]不存在!")
+ if build_dir.exists():
+ shutil.rmtree(build_dir, ignore_errors=True)
+
+ dist_path.mkdir(parents=True, exist_ok=True)
+ build_dir.mkdir(parents=True, exist_ok=True)
+
+ shutil.copytree(src_path, build_dir / "src")
+ shutil.copy2(ipk.source_path / "infini.toml", build_dir / "infini.toml")
+
+ _freeze.create_tar_gz(
+ str(build_dir),
+ str(ifp_path),
+ )
+
+ (dist_path / ipk.hash_name).write_bytes(hash_ifp(ifp_path))
+
+ return InfiniFrozenPackage(source_path=ifp_path, **{"name": ipk.name})
+
+
+def extract_ipk(source_path: StrPath, dist_path: str | Path) -> InfiniPackage:
+ ifp_path = Path(source_path).resolve()
+ dist_path = Path(dist_path).resolve()
+ hash_path = ifp_path.parent / (ifp_path.name + ".hash")
+
+ if not hash_path.exists():
+ raise VerifyFailed("哈希文件不存在!")
+
+ if not verify_ifp(ifp_path, hash_path.read_bytes()):
+ raise VerifyFailed("文件完整性验证失败!")
+
+ temp_dir = tempfile.TemporaryDirectory()
+
+ temp_path = Path(temp_dir.name).resolve() / "ifp"
+ _freeze.extract_tar_gz(str(ifp_path), str(temp_path))
+ temp_pkg = InfiniPackage(temp_path)
+ dist_pkg_path = dist_path / temp_pkg.name
+
+ if dist_pkg_path.exists():
+ shutil.rmtree(dist_pkg_path)
+
+ shutil.move(temp_path, dist_pkg_path)
+
+ temp_dir.cleanup()
+ return InfiniPackage(dist_pkg_path)
diff --git a/src/ipm/utils/hash.py b/src/ipm/utils/hash.py
new file mode 100644
index 0000000..efd66ca
--- /dev/null
+++ b/src/ipm/utils/hash.py
@@ -0,0 +1,15 @@
+from pathlib import Path
+import hashlib
+
+
+def hash_ifp(lfp_path: str | Path, block_size=65536) -> bytes:
+ sha256 = hashlib.sha256()
+ with Path(lfp_path).resolve().open("rb") as file:
+ for block in iter(lambda: file.read(block_size), b""):
+ sha256.update(block)
+ return sha256.digest()
+
+
+def verify_ifp(lfp_path, expected_hash):
+ actual_hash = hash_ifp(lfp_path)
+ return actual_hash == expected_hash