aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/hydro_roll
diff options
context:
space:
mode:
author简律纯 <i@jyunko.cn>2024-02-27 11:46:36 +0800
committer简律纯 <i@jyunko.cn>2024-02-27 11:46:36 +0800
commitcac060a6e8b70e4e4e1b34555d3d4fee8bc6d007 (patch)
treec3777687637c44d38690a364a363a8336424b59a /hydro_roll
parent255c259cbf5ea143927dd988049c4d9ed14ac4f7 (diff)
downloadHydroRoll-cac060a6e8b70e4e4e1b34555d3d4fee8bc6d007.tar.gz
HydroRoll-cac060a6e8b70e4e4e1b34555d3d4fee8bc6d007.zip
refactor!: rename HydroRoll dir as hydro_roll
Diffstat (limited to 'hydro_roll')
-rw-r--r--hydro_roll/__init__.py80
-rw-r--r--hydro_roll/cli.py180
-rw-r--r--hydro_roll/config.py97
-rw-r--r--hydro_roll/exceptions.py0
-rw-r--r--hydro_roll/models/__init__.py0
-rw-r--r--hydro_roll/models/hola.pklbin0 -> 209 bytes
-rw-r--r--hydro_roll/models/utils.py14
-rw-r--r--hydro_roll/typing.py34
-rw-r--r--hydro_roll/utils.py162
9 files changed, 567 insertions, 0 deletions
diff --git a/hydro_roll/__init__.py b/hydro_roll/__init__.py
new file mode 100644
index 0000000..cfaff13
--- /dev/null
+++ b/hydro_roll/__init__.py
@@ -0,0 +1,80 @@
+"""中间件"""
+
+from ast import literal_eval
+import os
+from os.path import dirname, join, abspath
+from iamai import ConfigModel, Plugin
+from iamai.log import logger
+from .config import Directory
+from .models.utils import *
+import joblib
+
+try:
+ from .HydroRoll import sum_as_string
+except ImportError:
+ ...
+
+BASE_DIR = Directory(_path=dirname(abspath("__file__")))
+HYDRO_DIR = dirname(abspath(__file__))
+
+
+def _init_directory(_prefix: str = ""):
+ """初始化水系目录"""
+ for _ in BASE_DIR.get_dice_dir_list(_prefix):
+ if not os.path.exists(_):
+ os.makedirs(_)
+
+
+def _load_models():
+ models = {}
+ models["hola"] = joblib.load(join(HYDRO_DIR, "models", "hola.pkl"))
+ return models
+
+
+def load_model(model):
+ logger.info("loading models...")
+ return _load_models()[model]
+
+
+def init_directory(_prefix: str = "HydroRoll"):
+ _init_directory(_prefix=_prefix)
+
+
+class HydroRoll(Plugin):
+ """中间件"""
+
+ class Config(ConfigModel):
+ __config_name__ = "HydroRoll"
+
+ priority = 0
+
+ # TODO: infini should be able to handle all signals and tokens from Psi.
+ logger.info(f"Loading infini... with {sum_as_string(2,3)}")
+
+ async def handle(self) -> None:
+ """
+ @TODO: infini should be able to handle all signals and tokens from Psi.
+ @BODY: infini actives the rule-packages.
+ """
+
+ if self.event.message.get_plain_text() == ".core":
+ await self.event.reply("infini is running.")
+ elif self.event.message.startswith(".test"):
+ try:
+ result = literal_eval(self.event.message.get_plain_text()[5:])
+ await self.event.reply(result)
+ except Exception as error:
+ await self.event.reply(f"{error!r}")
+
+ async def rule(self) -> bool:
+ """
+ @TODO: Psi should be able to handle all message first.
+ @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value.
+ """
+ logger.info("loading psi...")
+ if not self.bot.global_state.get("HydroRoll.dir"):
+ hola = load_model("hola")
+
+ init_directory()
+ self.bot.global_state["HydroRoll.dir"] = True
+ return self.event.adapter.name in ["cqhttp", "kook", "console", "mirai"]
diff --git a/hydro_roll/cli.py b/hydro_roll/cli.py
new file mode 100644
index 0000000..d3404ad
--- /dev/null
+++ b/hydro_roll/cli.py
@@ -0,0 +1,180 @@
+import argparse
+import os
+import aiohttp
+import asyncio
+import json
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .typing import *
+
+class Cli(object):
+ parser = argparse.ArgumentParser(description="水系终端脚手架")
+
+ def __init__(self):
+ self.parser.add_argument(
+ "-i",
+ "--install",
+ dest="command",
+ help="安装规则包、插件与模型",
+ action="store_const",
+ const="install_package",
+ )
+ self.parser.add_argument(
+ "-T",
+ "--template",
+ dest="command",
+ help="选择模板快速创建Bot实例",
+ action="store_const",
+ const="build_template",
+ )
+ self.parser.add_argument(
+ "-S",
+ "--search",
+ dest="command",
+ help="在指定镜像源查找规则包、插件与模型",
+ action="store_const",
+ const="search_package",
+ )
+ self.parser.add_argument(
+ "-c",
+ "--config",
+ dest="command",
+ help="配置管理",
+ action="store_const",
+ const="config",
+ )
+ self.args = self.parser.parse_args()
+
+ def get_args(self):
+ return self.args
+
+ def get_help(self):
+ return self.parser.format_help()
+
+ async def install_packages(self):
+ package_name = input("请输入要安装的包名:")
+ url = f"https://pypi.org/pypi/{package_name}/json"
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url) as response:
+ if response.status == 200:
+ data = await response.json()
+ await self._extract_package(data, package_name)
+ else:
+ print(f"找不到包:{package_name}")
+
+ async def _extract_package(self, data, package_name):
+ latest_version = data["info"]["version"]
+ download_url = data["releases"][latest_version][0]["url"]
+
+ plugins_dir = "plugins"
+ if not os.path.exists(plugins_dir):
+ os.mkdir(plugins_dir)
+
+ file_name = download_url.split("/")[-1]
+ file_path = os.path.join(plugins_dir, file_name)
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(download_url) as response:
+ if response.status == 200:
+ with open(file_path, "wb") as file:
+ file.write(await response.read())
+ print(f"成功安装包:{package_name}")
+ else:
+ print(f"下载包时出错:{package_name}")
+
+ def build_template(self):
+ template = input("请选择应用模板(输入数字):\n" "1. 创建轻量应用\n" "2. 创建标准应用\n" "3. 创建开发应用\n")
+
+ if template == "1":
+ print("选择了轻量应用模板")
+ elif template == "2":
+ print("选择了标准应用模板")
+ elif template == "3":
+ print("选择了开发应用模板")
+ else:
+ print("无效的模板选择")
+
+ async def search_package(self):
+ search_term = input("请输入要搜索的包名关键字:")
+ url = f"https://pypi.org/search/?q={search_term}"
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url) as response:
+ if response.status == 200:
+ data: dict = response.json() # type: ignore[dict]
+ packages = data.get("results", [])
+
+ for package in packages:
+ name = package["name"]
+ topics = package.get("topics", [])
+
+ if (
+ search_term.lower() in name.lower()
+ and "HydroRoll" in topics
+ ):
+ print(f"包名:{name}")
+ else:
+ print("搜索失败")
+
+ def config(self):
+ config_dir = os.path.expanduser("~/.hydroroll")
+ if not os.path.exists(config_dir):
+ os.makedirs(config_dir)
+
+ config_file = os.path.join(config_dir, "config.json")
+
+ subcommand = input("请输入子命令(add/delete):")
+
+ if subcommand == "add":
+ key = input("请输入要添加的键名:")
+ value = input("请输入要添加的键值:")
+
+ with open(config_file, "r+") as file:
+ try:
+ config_data = json.load(file)
+ except json.JSONDecodeError:
+ config_data = {}
+
+ config_data[key] = value
+ self._extracted_from_config_21(file, config_data)
+ print(f"成功添加配置项:{key}={value}")
+
+ elif subcommand == "delete":
+ key = input("请输入要删除的键名:")
+
+ with open(config_file, "r+") as file:
+ try:
+ config_data = json.load(file)
+ except json.JSONDecodeError:
+ config_data = {}
+
+ if key in config_data:
+ del config_data[key]
+ self._extracted_from_config_21(file, config_data)
+ print(f"成功删除配置项:{key}")
+ else:
+ print(f"配置项不存在:{key}")
+
+ else:
+ print("无效的子命令选择")
+
+ # TODO Rename this here and in `config`
+ def _extracted_from_config_21(self, file, config_data):
+ file.seek(0)
+ json.dump(config_data, file, indent=4)
+ file.truncate()
+
+
+cli = Cli()
+
+if cli.get_args().command == "install_package":
+ asyncio.run(cli.install_packages())
+elif cli.get_args().command == "build_template":
+ cli.build_template()
+elif cli.get_args().command == "search_package":
+ asyncio.run(cli.search_package())
+elif cli.get_args().command == "config":
+ cli.config()
+else:
+ print(cli.get_help())
diff --git a/hydro_roll/config.py b/hydro_roll/config.py
new file mode 100644
index 0000000..e0789a0
--- /dev/null
+++ b/hydro_roll/config.py
@@ -0,0 +1,97 @@
+import argparse
+import sys
+import platform
+from importlib.metadata import version
+import os
+from typing import Set, Optional
+from iamai import ConfigModel
+
+# 创建全局 ArgumentParser 对象
+global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数")
+
+
+class BasePluginConfig(ConfigModel):
+ __config_name__ = ""
+ handle_all_message: bool = True
+ """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。"""
+ handle_friend_message: bool = True
+ """是否处理好友消息。"""
+ handle_group_message: bool = True
+ """是否处理群消息。"""
+ accept_group: Optional[Set[int]] = None
+ """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。"""
+ message_str: str = "*{user_name} {message}"
+ """最终发送消息的格式。"""
+
+
+class RegexPluginConfig(BasePluginConfig):
+ pass
+
+
+class CommandPluginConfig(RegexPluginConfig):
+ command_prefix: Set[str] = {":", "你妈", "👅", "约瑟夫妥斯妥耶夫斯基戴安那只鸡🐔"}
+ """命令前缀。"""
+ command: Set[str] = {}
+ """命令文本。"""
+ ignore_case: bool = True
+ """忽略大小写。"""
+
+
+# 定义全局配置类
+class GlobalConfig(CommandPluginConfig):
+ _name = "HydroRoll[水系]"
+ _version = "0.1.0"
+ _svn = "2"
+ _author = "简律纯"
+ _iamai_version = version("iamai")
+ _python_ver = sys.version
+ _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3]))
+
+ # 定义系统组件
+ class HydroSystem:
+ def __init__(self):
+ self.parser = argparse.ArgumentParser(
+ description="HydroRoll[水系].system 系统命令参数"
+ )
+ self.subparsers = self.parser.add_subparsers()
+ self.status_parser = self.subparsers.add_parser(
+ "status", aliases=["stat", "state"], help="系统状态"
+ )
+ self.reload_parser = self.subparsers.add_parser(
+ "reload", aliases=["rld"], help="重新加载系统"
+ )
+ self.restart_parser = self.subparsers.add_parser(
+ "restart", aliases=["rst"], help="重启系统"
+ )
+ self.collect_parser = self.subparsers.add_parser(
+ "collect", aliases=["gc"], help="释放 python 内存"
+ )
+ self.help = "\n".join(
+ self.parser.format_help()
+ .replace("\r\n", "\n")
+ .replace("\r", "")
+ .split("\n")[2:-3]
+ )
+
+ class HydroBot:
+ def __init__(self) -> None:
+ self.parser = argparse.ArgumentParser(description="Bot命令")
+
+
+class Directory(object):
+ def __init__(self, _path: str) -> None:
+ self.current_path = _path
+
+ def get_dice_dir_list(self, _prefix: str) -> list:
+
+ return [
+ os.path.join(self.current_path, f'{_prefix}', *dirs)
+ for dirs in [
+ ['config'],
+ ['data'],
+ ['rules'],
+ ['scripts'],
+ ['web', 'frontend'],
+ ['web', 'backend'],
+ ]
+ ]
diff --git a/hydro_roll/exceptions.py b/hydro_roll/exceptions.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/hydro_roll/exceptions.py
diff --git a/hydro_roll/models/__init__.py b/hydro_roll/models/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/hydro_roll/models/__init__.py
diff --git a/hydro_roll/models/hola.pkl b/hydro_roll/models/hola.pkl
new file mode 100644
index 0000000..b9940fa
--- /dev/null
+++ b/hydro_roll/models/hola.pkl
Binary files differ
diff --git a/hydro_roll/models/utils.py b/hydro_roll/models/utils.py
new file mode 100644
index 0000000..73e7ba0
--- /dev/null
+++ b/hydro_roll/models/utils.py
@@ -0,0 +1,14 @@
+import difflib
+import pickle
+
+def find_max_similarity(input_string, string_list):
+ max_similarity = 0
+ max_string = ""
+
+ for string in string_list:
+ similarity = difflib.SequenceMatcher(None, input_string, string).quick_ratio()
+ if similarity > max_similarity:
+ max_similarity = similarity
+ max_string = string
+
+ return max_string, max_similarity \ No newline at end of file
diff --git a/hydro_roll/typing.py b/hydro_roll/typing.py
new file mode 100644
index 0000000..876fa92
--- /dev/null
+++ b/hydro_roll/typing.py
@@ -0,0 +1,34 @@
+"""HydroRoll 类型提示支持。
+
+此模块定义了部分 HydroRoll 使用的类型。
+"""
+
+from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable
+
+from iamai.message import T_MS, T_Message, T_MessageSegment
+
+if TYPE_CHECKING:
+ from iamai.bot import Bot # noqa
+ from iamai.event import Event # noqa
+ from iamai.plugin import Plugin # noqa
+ from iamai.config import ConfigModel # noqa
+
+__all__ = [
+ "T_State",
+ "T_Event",
+ "T_Plugin",
+ "T_Config",
+ "T_Message",
+ "T_MessageSegment",
+ "T_MS",
+ "T_BotHook",
+ "T_EventHook",
+]
+
+T_State = TypeVar("T_State")
+T_Event = TypeVar("T_Event", bound="Event")
+T_Plugin = TypeVar("T_Plugin", bound="Plugin")
+T_Config = TypeVar("T_Config", bound="ConfigModel")
+
+T_BotHook = Callable[["Bot"], Awaitable[NoReturn]]
+T_EventHook = Callable[[T_Event], Awaitable[NoReturn]] \ No newline at end of file
diff --git a/hydro_roll/utils.py b/hydro_roll/utils.py
new file mode 100644
index 0000000..453e691
--- /dev/null
+++ b/hydro_roll/utils.py
@@ -0,0 +1,162 @@
+import re
+import time
+import random
+from abc import ABC, abstractmethod
+from typing import Type, Union, Generic, TypeVar
+from iamai import Plugin
+from iamai.typing import T_State
+from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent
+
+from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig
+
+T_Config = TypeVar("T_Config", bound=BasePluginConfig)
+T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig)
+T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig)
+
+
+class BasePlugin(
+ Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config],
+ ABC,
+ Generic[T_State, T_Config],
+):
+ Config: Type[T_Config] = BasePluginConfig
+
+ def format_str(self, format_str: str, message_str: str = "") -> str:
+ return format_str.format(
+ message=message_str,
+ user_name=self.event.sender.nickname,
+ user_id=self.event.sender.user_id,
+ )
+
+ async def rule(self) -> bool:
+ is_bot_off = False
+
+ if self.event.adapter.name != "cqhttp":
+ return False
+ if self.event.type != "message":
+ return False
+ match_str = self.event.message.get_plain_text()
+ if is_bot_off:
+ if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"):
+ match_str = re.sub(
+ rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str
+ )
+ elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"):
+ match_str = re.sub(
+ rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str
+ )
+ else:
+ return False
+ if self.config.handle_all_message:
+ return self.str_match(match_str)
+ elif self.config.handle_friend_message:
+ if self.event.message_type == "private":
+ return self.str_match(match_str)
+ elif self.config.handle_group_message:
+ if self.event.message_type == "group":
+ if (
+ self.config.accept_group is None
+ or self.event.group_id in self.config.accept_group
+ ):
+ return self.str_match(match_str)
+ elif self.config.handle_group_message:
+ if self.event.message_type == "guild":
+ return self.str_match(match_str)
+ return False
+
+ @abstractmethod
+ def str_match(self, msg_str: str) -> bool:
+ raise NotImplemented
+
+
+class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC):
+ msg_match: re.Match
+ re_pattern: re.Pattern
+ Config: Type[T_RegexPluginConfig] = RegexPluginConfig
+
+ def str_match(self, msg_str: str) -> bool:
+ msg_str = msg_str.strip()
+ self.msg_match = self.re_pattern.fullmatch(msg_str)
+ return bool(self.msg_match)
+
+
+class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC):
+ command_match: re.Match
+ command_re_pattern: re.Pattern
+ Config: Type[T_CommandPluginConfig] = CommandPluginConfig
+
+ def str_match(self, msg_str: str) -> bool:
+ if not hasattr(self, "command_re_pattern"):
+ self.command_re_pattern = re.compile(
+ f'({"|".join(self.config.command_prefix)})'
+ f'({"|".join(self.config.command)})'
+ r"\s*(?P<command_args>.*)",
+ flags=re.I if self.config.ignore_case else 0,
+ )
+ msg_str = msg_str.strip()
+ self.command_match = self.command_re_pattern.fullmatch(msg_str)
+ if not self.command_match:
+ return False
+ self.msg_match = self.re_pattern.fullmatch(
+ self.command_match.group("command_args")
+ )
+ return bool(self.msg_match)
+
+
+class PseudoRandomGenerator:
+ """线性同余法随机数生成器"""
+
+ def __init__(self, seed):
+ self.seed = seed
+
+ def generate(self):
+ while True:
+ self.seed = (self.seed * 1103515245 + 12345) % (2**31)
+ yield self.seed
+
+
+class HydroDice:
+ """水系掷骰组件
+
+ 一些 API 相关的工具函数
+
+ """
+
+ def __init__(self, seed):
+ self.generator = PseudoRandomGenerator(seed)
+
+ def roll_dice(
+ self,
+ _counts: int | str,
+ _sides: int | str,
+ is_reversed: bool = False,
+ streamline: bool = False,
+ threshold: int | str = 5,
+ ) -> str:
+ """普通掷骰
+ Args:
+ _counts (int | str): 掷骰个数.
+ _sides (int | str): 每个骰子的面数.
+ is_reversed (bool, optional): 倒序输出. Defaults to False.
+ streamline (bool, optional): 忽略过程. Defaults to False.
+ threshold (int | str, optional): streamline 的阈值. Defaults to 5.
+
+ Returns:
+ str: 表达式结果.
+ """
+ rolls = []
+ for _ in range(int(_counts)):
+ roll = next(self.generator.generate()) % _sides + 1
+ rolls.append(roll)
+ total = sum(rolls)
+
+ if streamline:
+ return str(total)
+ else:
+ if len(rolls) > int(threshold):
+ return str(total)
+ rolls_str = " + ".join(str(r) for r in rolls)
+ result_str = (
+ f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}"
+ )
+ return result_str