aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--src/ipm/api.py2
-rw-r--r--src/ipm/models/index.py85
2 files changed, 49 insertions, 38 deletions
diff --git a/src/ipm/api.py b/src/ipm/api.py
index deba562..007a0d2 100644
--- a/src/ipm/api.py
+++ b/src/ipm/api.py
@@ -235,7 +235,7 @@ def yggdrasil_add(
success("环境检查完毕.", echo)
update("同步世界树中...", echo)
yggdrasil = Yggdrasil(index)
- # yggdrasil.sync()
+ yggdrasil.sync()
warning("世界树同步模块未实装, 忽略.", echo)
add_yggdrasil(toml_path, name, index)
diff --git a/src/ipm/models/index.py b/src/ipm/models/index.py
index d5b257d..7f179aa 100644
--- a/src/ipm/models/index.py
+++ b/src/ipm/models/index.py
@@ -1,58 +1,69 @@
from pathlib import Path
-from urllib.parse import urlparse
-from ..const import INDEX_PATH
-from ..typing import Storage
-from ..logging import info, success
-from ..exceptions import LockLoadFailed
+from ipm.const import INDEX_PATH
+from ipm.exceptions import LockLoadFailed
+from ipm.models.lock import PackageLock
+import tomlkit
import requests
import tempfile
import shutil
class Yggdrasil:
- source_path: Path
-
- host: str
- uuid: str
- index: str
+ """此对象内所有方法均为示例方法,可以按照需求和开发奇怪进行微调"""
def __init__(self, index: str) -> None:
- INDEX_PATH.mkdir(parents=True, exist_ok=True)
self.index = index.rstrip("/") + "/"
+ self._data = self.read()
+
+ def read(self) -> tomlkit.TOMLDocument:
+ """示例方法,读取一个本地的世界树索引文件"""
+ if not self._source_path.exists():
+ self._source_path.parent.mkdir(parents=True, exist_ok=True)
+ return tomlkit.document()
+ return tomlkit.load(self._source_path.open("r", encoding="utf-8"))
+
+ def dump(self) -> None:
+ """示例方法,将修改保存在本地"""
+ tomlkit.dump(self._data, self._source_path.open("w", encoding="utf-8"))
+
+ @staticmethod
+ def check(source_path: Path):
+ """检查一个索引文件是否合法"""
+ raise NotImplementedError
- # def init(self, source_path: Path):
- # self.source_path = source_path
- # self.lock = PackageLock(self.source_path / "infini.lock")
- # self.uuid = self.lock.metadata["uuid"]
- # self.host = self.lock.metadata.get("host") or urlparse(self.index).netloc
+ def sync(self):
+ """示例方法,下载或同步索引文件"""
+ lock_bytes = requests.get(
+ self.index + "infini.lock"
+ ).content # 索引文件地址,这里的lock仅作示例
- def sync(self, echo: bool = False): ...
+ temp_dir = tempfile.TemporaryDirectory()
+ temp_path = Path(temp_dir.name).resolve()
+ temp_lock_path = temp_path / "infini.lock" # 索引文件地址,这里的lock仅作示例
- # info(f"正在从世界树[{self.index}]同步...", echo)
- # lock_bytes = requests.get(self.index + "infini.lock").content
+ temp_lock_file = temp_lock_path.open("wb")
+ temp_lock_file.write(lock_bytes)
+ temp_lock_file.close()
- # temp_dir = tempfile.TemporaryDirectory()
- # temp_path = Path(temp_dir.name).resolve()
- # temp_lock_path = temp_path / "infini.lock"
+ Yggdrasil.check(temp_lock_path) # 检查索引文件合法性
- # temp_lock_file = temp_lock_path.open("wb")
- # temp_lock_file.write(lock_bytes)
- # temp_lock_file.close()
+ temp_lock = PackageLock(temp_lock_path)
- # temp_lock = PackageLock(temp_lock_path)
+ if "uuid" not in temp_lock.metadata.keys():
+ temp_dir.cleanup()
+ raise LockLoadFailed(f"地址[{self.index}]不是合法的世界树服务器.")
- # if "uuid" not in temp_lock.metadata.keys():
- # temp_dir.cleanup()
- # raise LockLoadFailed(f"地址[{self.index}]不是合法的世界树服务器.")
+ self._source_path = INDEX_PATH / temp_lock.metadata["uuid"]
+ self._source_path.mkdir(parents=True, exist_ok=True)
- # self.source_path = INDEX_PATH / temp_lock.metadata["uuid"]
- # self.source_path.mkdir(parents=True, exist_ok=True)
+ shutil.copy2(temp_lock_path, self._source_path)
+ temp_dir.cleanup()
- # shutil.copy2(temp_lock_path, self.source_path)
- # temp_dir.cleanup()
- # self.init(self.source_path)
- # success(f"成功建立与世界树[{self.host}]的连接.")
+ def get_url(self, name: str, version: str) -> str:
+ """从本地读取规则包下载链接"""
+ raise NotImplementedError
- # def get(self, name: str, version: str | None = None) -> Storage | None:
- # return self.lock.get_particular_storage(name, version)
+ def get_hash(self, name: str, version: str) -> str:
+ """从本地获取规则包哈希值"""
+ raise NotImplementedError