From a0eced157f1acd3d57b57a7e92a2dd2bb8c7bc24 Mon Sep 17 00:00:00 2001 From: 苏向夜 Date: Tue, 16 Jan 2024 19:27:02 +0800 Subject: :tada: feat(milestone) IPM 立项 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ipm/__init__.py | 1 + src/ipm/__main__.py | 42 ++++++++++++++++++++++++ src/ipm/api.py | 51 ++++++++++++++++++++++++++++ src/ipm/const.py | 1 + src/ipm/exceptions.py | 26 +++++++++++++++ src/ipm/logging.py | 3 ++ src/ipm/models/__init__.py | 0 src/ipm/models/ipk.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++ src/ipm/typing.py | 10 ++++++ src/ipm/utils/__init__.py | 0 src/ipm/utils/_freeze.py | 17 ++++++++++ src/ipm/utils/freeze.py | 65 ++++++++++++++++++++++++++++++++++++ src/ipm/utils/hash.py | 15 +++++++++ 13 files changed, 313 insertions(+) create mode 100644 src/ipm/__init__.py create mode 100644 src/ipm/__main__.py create mode 100644 src/ipm/api.py create mode 100644 src/ipm/const.py create mode 100644 src/ipm/exceptions.py create mode 100644 src/ipm/logging.py create mode 100644 src/ipm/models/__init__.py create mode 100644 src/ipm/models/ipk.py create mode 100644 src/ipm/typing.py create mode 100644 src/ipm/utils/__init__.py create mode 100644 src/ipm/utils/_freeze.py create mode 100644 src/ipm/utils/freeze.py create mode 100644 src/ipm/utils/hash.py (limited to 'src') diff --git a/src/ipm/__init__.py b/src/ipm/__init__.py new file mode 100644 index 0000000..061d700 --- /dev/null +++ b/src/ipm/__init__.py @@ -0,0 +1 @@ +__author__ = "苏向夜 " diff --git a/src/ipm/__main__.py b/src/ipm/__main__.py new file mode 100644 index 0000000..5d878c6 --- /dev/null +++ b/src/ipm/__main__.py @@ -0,0 +1,42 @@ +from .api import install, extract, build +import argparse + + +def main(): + parser = argparse.ArgumentParser(description="Infini Package Manager") + subparsers = parser.add_subparsers(title="subcommands", dest="command") + + # Install command + install_parser = subparsers.add_parser("install", help="Install a package") + install_parser.add_argument("uri", help="Path or name of the package") + install_parser.add_argument("--index", help="Specify a custom package index") + + # Extract command + extract_parser = subparsers.add_parser("extract", help="Extract a package") + extract_parser.add_argument( + "package", help="Path or name of the package to extract" + ) + extract_parser.add_argument( + "--dist", + default=".", + help="Specify extraction directory (default: current directory)", + ) + + # Build command + build_parser = subparsers.add_parser("build", help="打包 Infini 规则包") + build_parser.add_argument( + "package", nargs="?", help="Path or name of the package to build", default="." + ) + + args = parser.parse_args() + + if args.command == "install": + install(args.uri, args.index) + elif args.command == "extract": + extract(args.package, args.dist) + elif args.command == "build": + build(args.package) + + +if __name__ == "__main__": + main() diff --git a/src/ipm/api.py b/src/ipm/api.py new file mode 100644 index 0000000..af91dd5 --- /dev/null +++ b/src/ipm/api.py @@ -0,0 +1,51 @@ +from pathlib import Path +from urllib.parse import urlparse +from .typing import StrPath +from .utils import freeze +from .models.ipk import InfiniPackage +from .exceptions import FileTypeMismatch + +import os +import requests +import tempfile + + +def build(source_path: StrPath): + freeze.build_ipk(InfiniPackage(source_path)) + + +def extract(source_path: StrPath, dist_path: StrPath | None = None): + dist_path = ( + Path(dist_path).resolve() if dist_path else Path(source_path).resolve().parent + ) + freeze.extract_ipk(source_path, dist_path) + + +def install(uri: str | None = "", index: str | None = ""): + home = Path.home() / ".ipm" / "src" + home.mkdir(parents=True, exist_ok=True) + + if uri: + if os.path.isabs(uri): + if uri.endswith(".ipk"): + extract(Path(uri).resolve(), home) + else: + raise FileTypeMismatch("文件类型与预期[.ipk]不匹配.") + elif urlparse(uri).scheme and urlparse(uri).netloc: + ipk_bytes = requests.get(uri).content + hash_bytes = requests.get(uri.rstrip("/") + ".hash").content + + temp_dir = tempfile.TemporaryDirectory() + temp_path = Path(temp_dir.name).resolve() + + ipk_file = (temp_path / "temp.ipk").open("w+b") + ipk_file.write(ipk_bytes) + ipk_file.close() + + hash_file = (temp_path / "temp.ipk.hash").open("w+b") + hash_file.write(hash_bytes) + hash_file.close() + + extract(ipk_file, home) + else: + raise FileTypeMismatch("URI指向未知的位置.") diff --git a/src/ipm/const.py b/src/ipm/const.py new file mode 100644 index 0000000..51c64dd --- /dev/null +++ b/src/ipm/const.py @@ -0,0 +1 @@ +DEBUG = True diff --git a/src/ipm/exceptions.py b/src/ipm/exceptions.py new file mode 100644 index 0000000..e6a01c0 --- /dev/null +++ b/src/ipm/exceptions.py @@ -0,0 +1,26 @@ +class IpmException(Exception): + """IPM Base Exception""" + + +class FileNotFoundError(IpmException, FileNotFoundError): + """Raises when file not founded""" + + +class FileExistsError(IpmException, FileExistsError): + """Raises when file not founded""" + + +class SyntaxError(IpmException, SyntaxError): + """Syntax Error in config file""" + + +class HashException(IpmException): + """Exception occured in hashing""" + + +class VerifyFailed(IpmException): + """Failed to verify ipk file""" + + +class FileTypeMismatch(IpmException): + """Ipk file type mismatch""" diff --git a/src/ipm/logging.py b/src/ipm/logging.py new file mode 100644 index 0000000..e2321dd --- /dev/null +++ b/src/ipm/logging.py @@ -0,0 +1,3 @@ +from multilogging import multilogger + +logger = multilogger(name="IPM") diff --git a/src/ipm/models/__init__.py b/src/ipm/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ipm/models/ipk.py b/src/ipm/models/ipk.py new file mode 100644 index 0000000..14dca06 --- /dev/null +++ b/src/ipm/models/ipk.py @@ -0,0 +1,82 @@ +from pathlib import Path +from ..typing import List, Dict, Literal +from ..exceptions import SyntaxError + +import toml + + +class Author: + name: str + email: str + + def __init__(self, name: str, email: str) -> None: + self.name = name + self.email = email + + +class Authors: + authors: list[Author] = [] + + def __init__(self, authors: List[Dict[Literal["name", "email"], str]]) -> None: + for author in authors: + self.authors.append(Author(author["name"], author["email"])) + + @property + def first(self) -> Author | None: + return None if not self.authors else self.authors[0] + + +class InfiniPackage: + source_path: Path + + name: str + version: str + description: str + authors: Authors + license: str + + def __init__(self, path: str | Path = ".") -> None: + self.source_path = Path(path).resolve() + toml_path = self.source_path / "infini.toml" + + data_load = toml.load(toml_path.open("r", encoding="utf-8")) + if "infini" not in data_load.keys(): + raise SyntaxError("配置文件中缺少[infini]项.") + + infini: dict = data_load["infini"] + self.name = infini.get("name") or "" + self.version = infini.get("version") or "" + self.description = infini.get("description") or "" + self.authors = Authors(infini.get("authors") or []) + self.license = infini.get("license") or "MIT" + + @property + def default_name(self) -> str: + return f"{self.name}-{self.version}.ipk" + + @property + def hash_name(self) -> str: + return f"{self.name}-{self.version}.ipk.hash" + + +class InfiniFrozenPackage: + source_path: Path + + name: str + version: str + description: str + authors: Authors + license: str + + def __init__(self, source_path: str | Path, **kwargs) -> None: + self.source_path = Path(source_path).resolve() + + self.name = kwargs.get("name") or "" + self.version = kwargs.get("version") or "" + self.description = kwargs.get("description") or "" + self.authors = Authors(kwargs.get("authors") or []) + self.license = kwargs.get("license") or "MIT" + + @property + def hash_name(self) -> str: + return f"{self.source_path.name}.hash" diff --git a/src/ipm/typing.py b/src/ipm/typing.py new file mode 100644 index 0000000..dbf124a --- /dev/null +++ b/src/ipm/typing.py @@ -0,0 +1,10 @@ +from pathlib import Path +from typing import ( + Literal as Literal, + List as List, + Dict as Dict, + Any as Any, + AnyStr as AnyStr, +) + +StrPath = str | Path diff --git a/src/ipm/utils/__init__.py b/src/ipm/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ipm/utils/_freeze.py b/src/ipm/utils/_freeze.py new file mode 100644 index 0000000..d6d5791 --- /dev/null +++ b/src/ipm/utils/_freeze.py @@ -0,0 +1,17 @@ +from ..logging import logger + +import tarfile +import shutil + + +def create_tar_gz(source_folder: str, output_filepath: str): + shutil.move( + shutil.make_archive(output_filepath + ".build", "gztar", source_folder), + output_filepath, + shutil.copy2, + ) + + +def extract_tar_gz(input_filename: str, output_folder: str): + with tarfile.open(input_filename, "r:gz") as tar: + tar.extractall(output_folder) diff --git a/src/ipm/utils/freeze.py b/src/ipm/utils/freeze.py new file mode 100644 index 0000000..6fd454f --- /dev/null +++ b/src/ipm/utils/freeze.py @@ -0,0 +1,65 @@ +from pathlib import Path +from . import _freeze +from ..exceptions import FileNotFoundError, VerifyFailed +from ..models.ipk import InfiniPackage, InfiniFrozenPackage +from .hash import hash_ifp, verify_ifp +from ..typing import StrPath + +import tempfile +import shutil + + +def build_ipk( + ipk: InfiniPackage, +) -> InfiniFrozenPackage: + build_dir = ipk.source_path / ".build" + src_path = ipk.source_path / "src" + dist_path = ipk.source_path / "dist" + ifp_path = dist_path / ipk.default_name + + if not ipk.source_path.exists(): + raise FileNotFoundError(f"文件或文件夹[{ipk.source_path.resolve()}]不存在!") + if build_dir.exists(): + shutil.rmtree(build_dir, ignore_errors=True) + + dist_path.mkdir(parents=True, exist_ok=True) + build_dir.mkdir(parents=True, exist_ok=True) + + shutil.copytree(src_path, build_dir / "src") + shutil.copy2(ipk.source_path / "infini.toml", build_dir / "infini.toml") + + _freeze.create_tar_gz( + str(build_dir), + str(ifp_path), + ) + + (dist_path / ipk.hash_name).write_bytes(hash_ifp(ifp_path)) + + return InfiniFrozenPackage(source_path=ifp_path, **{"name": ipk.name}) + + +def extract_ipk(source_path: StrPath, dist_path: str | Path) -> InfiniPackage: + ifp_path = Path(source_path).resolve() + dist_path = Path(dist_path).resolve() + hash_path = ifp_path.parent / (ifp_path.name + ".hash") + + if not hash_path.exists(): + raise VerifyFailed("哈希文件不存在!") + + if not verify_ifp(ifp_path, hash_path.read_bytes()): + raise VerifyFailed("文件完整性验证失败!") + + temp_dir = tempfile.TemporaryDirectory() + + temp_path = Path(temp_dir.name).resolve() / "ifp" + _freeze.extract_tar_gz(str(ifp_path), str(temp_path)) + temp_pkg = InfiniPackage(temp_path) + dist_pkg_path = dist_path / temp_pkg.name + + if dist_pkg_path.exists(): + shutil.rmtree(dist_pkg_path) + + shutil.move(temp_path, dist_pkg_path) + + temp_dir.cleanup() + return InfiniPackage(dist_pkg_path) diff --git a/src/ipm/utils/hash.py b/src/ipm/utils/hash.py new file mode 100644 index 0000000..efd66ca --- /dev/null +++ b/src/ipm/utils/hash.py @@ -0,0 +1,15 @@ +from pathlib import Path +import hashlib + + +def hash_ifp(lfp_path: str | Path, block_size=65536) -> bytes: + sha256 = hashlib.sha256() + with Path(lfp_path).resolve().open("rb") as file: + for block in iter(lambda: file.read(block_size), b""): + sha256.update(block) + return sha256.digest() + + +def verify_ifp(lfp_path, expected_hash): + actual_hash = hash_ifp(lfp_path) + return actual_hash == expected_hash -- cgit v1.2.3-70-g09d2