aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/HydroRoll/__init__.py74
-rw-r--r--src/HydroRoll/config.py97
-rw-r--r--src/HydroRoll/exceptions.py0
-rw-r--r--src/HydroRoll/models/__init__.py0
-rw-r--r--src/HydroRoll/models/hola.pklbin0 -> 209 bytes
-rw-r--r--src/HydroRoll/models/utils.py14
-rw-r--r--src/HydroRoll/typing.py34
-rw-r--r--src/HydroRoll/utils.py162
8 files changed, 381 insertions, 0 deletions
diff --git a/src/HydroRoll/__init__.py b/src/HydroRoll/__init__.py
new file mode 100644
index 0000000..b3e673d
--- /dev/null
+++ b/src/HydroRoll/__init__.py
@@ -0,0 +1,74 @@
+"""中间件"""
+from ast import literal_eval
+import os
+from os.path import dirname, join, abspath
+from iamai import ConfigModel, Plugin
+from iamai.log import logger
+from .config import Directory
+from .models.utils import *
+import joblib
+
+BASE_DIR = Directory(_path=dirname(abspath("__file__")))
+HYDRO_DIR = dirname(abspath(__file__))
+
+
+def _init_directory(_prefix: str = ''):
+ """初始化水系目录"""
+ for _ in BASE_DIR.get_dice_dir_list(_prefix):
+ if not os.path.exists(_):
+ os.makedirs(_)
+
+
+def _load_models():
+ models = {}
+ models['hola'] = joblib.load(
+ join(HYDRO_DIR, 'models', 'hola.pkl'))
+ return models
+
+
+def load_model(model):
+ logger.info("loading models...")
+ return _load_models()[model]
+
+
+def init_directory(_prefix: str = 'HydroRoll'):
+ _init_directory(_prefix=_prefix)
+
+
+class HydroRoll(Plugin):
+ """中间件"""
+ class Config(ConfigModel):
+ __config_name__ = "HydroRoll"
+
+ priority = 0
+
+ # TODO: HydroRollCore should be able to handle all signals and tokens from Psi.
+ logger.info("Loading HydroRollCore...")
+
+ async def handle(self) -> None:
+ """
+ @TODO: HydroRollCore should be able to handle all signals and tokens from Psi.
+ @BODY: HydroRollCore actives the rule-packages.
+ """
+
+ if self.event.message.get_plain_text() == ".core":
+ await self.event.reply("HydroRollCore is running.")
+ elif self.event.message.startswith(".test"):
+ try:
+ result = literal_eval(self.event.message.get_plain_text()[5:])
+ await self.event.reply(result)
+ except Exception as error:
+ await self.event.reply(f"{error!r}")
+
+ async def rule(self) -> bool:
+ """
+ @TODO: Psi should be able to handle all message first.
+ @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value.
+ """
+ logger.info("loading psi...")
+ if not self.bot.global_state.get('HydroRoll.dir'):
+ hola = load_model('hola')
+
+ init_directory()
+ self.bot.global_state['HydroRoll.dir'] = True
+ return self.event.adapter.name in ['cqhttp', 'kook', 'console', 'mirai']
diff --git a/src/HydroRoll/config.py b/src/HydroRoll/config.py
new file mode 100644
index 0000000..e0789a0
--- /dev/null
+++ b/src/HydroRoll/config.py
@@ -0,0 +1,97 @@
+import argparse
+import sys
+import platform
+from importlib.metadata import version
+import os
+from typing import Set, Optional
+from iamai import ConfigModel
+
+# 创建全局 ArgumentParser 对象
+global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数")
+
+
+class BasePluginConfig(ConfigModel):
+ __config_name__ = ""
+ handle_all_message: bool = True
+ """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。"""
+ handle_friend_message: bool = True
+ """是否处理好友消息。"""
+ handle_group_message: bool = True
+ """是否处理群消息。"""
+ accept_group: Optional[Set[int]] = None
+ """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。"""
+ message_str: str = "*{user_name} {message}"
+ """最终发送消息的格式。"""
+
+
+class RegexPluginConfig(BasePluginConfig):
+ pass
+
+
+class CommandPluginConfig(RegexPluginConfig):
+ command_prefix: Set[str] = {":", "你妈", "👅", "约瑟夫妥斯妥耶夫斯基戴安那只鸡🐔"}
+ """命令前缀。"""
+ command: Set[str] = {}
+ """命令文本。"""
+ ignore_case: bool = True
+ """忽略大小写。"""
+
+
+# 定义全局配置类
+class GlobalConfig(CommandPluginConfig):
+ _name = "HydroRoll[水系]"
+ _version = "0.1.0"
+ _svn = "2"
+ _author = "简律纯"
+ _iamai_version = version("iamai")
+ _python_ver = sys.version
+ _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3]))
+
+ # 定义系统组件
+ class HydroSystem:
+ def __init__(self):
+ self.parser = argparse.ArgumentParser(
+ description="HydroRoll[水系].system 系统命令参数"
+ )
+ self.subparsers = self.parser.add_subparsers()
+ self.status_parser = self.subparsers.add_parser(
+ "status", aliases=["stat", "state"], help="系统状态"
+ )
+ self.reload_parser = self.subparsers.add_parser(
+ "reload", aliases=["rld"], help="重新加载系统"
+ )
+ self.restart_parser = self.subparsers.add_parser(
+ "restart", aliases=["rst"], help="重启系统"
+ )
+ self.collect_parser = self.subparsers.add_parser(
+ "collect", aliases=["gc"], help="释放 python 内存"
+ )
+ self.help = "\n".join(
+ self.parser.format_help()
+ .replace("\r\n", "\n")
+ .replace("\r", "")
+ .split("\n")[2:-3]
+ )
+
+ class HydroBot:
+ def __init__(self) -> None:
+ self.parser = argparse.ArgumentParser(description="Bot命令")
+
+
+class Directory(object):
+ def __init__(self, _path: str) -> None:
+ self.current_path = _path
+
+ def get_dice_dir_list(self, _prefix: str) -> list:
+
+ return [
+ os.path.join(self.current_path, f'{_prefix}', *dirs)
+ for dirs in [
+ ['config'],
+ ['data'],
+ ['rules'],
+ ['scripts'],
+ ['web', 'frontend'],
+ ['web', 'backend'],
+ ]
+ ]
diff --git a/src/HydroRoll/exceptions.py b/src/HydroRoll/exceptions.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/HydroRoll/exceptions.py
diff --git a/src/HydroRoll/models/__init__.py b/src/HydroRoll/models/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/HydroRoll/models/__init__.py
diff --git a/src/HydroRoll/models/hola.pkl b/src/HydroRoll/models/hola.pkl
new file mode 100644
index 0000000..b9940fa
--- /dev/null
+++ b/src/HydroRoll/models/hola.pkl
Binary files differ
diff --git a/src/HydroRoll/models/utils.py b/src/HydroRoll/models/utils.py
new file mode 100644
index 0000000..73e7ba0
--- /dev/null
+++ b/src/HydroRoll/models/utils.py
@@ -0,0 +1,14 @@
+import difflib
+import pickle
+
+def find_max_similarity(input_string, string_list):
+ max_similarity = 0
+ max_string = ""
+
+ for string in string_list:
+ similarity = difflib.SequenceMatcher(None, input_string, string).quick_ratio()
+ if similarity > max_similarity:
+ max_similarity = similarity
+ max_string = string
+
+ return max_string, max_similarity \ No newline at end of file
diff --git a/src/HydroRoll/typing.py b/src/HydroRoll/typing.py
new file mode 100644
index 0000000..876fa92
--- /dev/null
+++ b/src/HydroRoll/typing.py
@@ -0,0 +1,34 @@
+"""HydroRoll 类型提示支持。
+
+此模块定义了部分 HydroRoll 使用的类型。
+"""
+
+from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable
+
+from iamai.message import T_MS, T_Message, T_MessageSegment
+
+if TYPE_CHECKING:
+ from iamai.bot import Bot # noqa
+ from iamai.event import Event # noqa
+ from iamai.plugin import Plugin # noqa
+ from iamai.config import ConfigModel # noqa
+
+__all__ = [
+ "T_State",
+ "T_Event",
+ "T_Plugin",
+ "T_Config",
+ "T_Message",
+ "T_MessageSegment",
+ "T_MS",
+ "T_BotHook",
+ "T_EventHook",
+]
+
+T_State = TypeVar("T_State")
+T_Event = TypeVar("T_Event", bound="Event")
+T_Plugin = TypeVar("T_Plugin", bound="Plugin")
+T_Config = TypeVar("T_Config", bound="ConfigModel")
+
+T_BotHook = Callable[["Bot"], Awaitable[NoReturn]]
+T_EventHook = Callable[[T_Event], Awaitable[NoReturn]] \ No newline at end of file
diff --git a/src/HydroRoll/utils.py b/src/HydroRoll/utils.py
new file mode 100644
index 0000000..453e691
--- /dev/null
+++ b/src/HydroRoll/utils.py
@@ -0,0 +1,162 @@
+import re
+import time
+import random
+from abc import ABC, abstractmethod
+from typing import Type, Union, Generic, TypeVar
+from iamai import Plugin
+from iamai.typing import T_State
+from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent
+
+from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig
+
+T_Config = TypeVar("T_Config", bound=BasePluginConfig)
+T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig)
+T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig)
+
+
+class BasePlugin(
+ Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config],
+ ABC,
+ Generic[T_State, T_Config],
+):
+ Config: Type[T_Config] = BasePluginConfig
+
+ def format_str(self, format_str: str, message_str: str = "") -> str:
+ return format_str.format(
+ message=message_str,
+ user_name=self.event.sender.nickname,
+ user_id=self.event.sender.user_id,
+ )
+
+ async def rule(self) -> bool:
+ is_bot_off = False
+
+ if self.event.adapter.name != "cqhttp":
+ return False
+ if self.event.type != "message":
+ return False
+ match_str = self.event.message.get_plain_text()
+ if is_bot_off:
+ if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"):
+ match_str = re.sub(
+ rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str
+ )
+ elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"):
+ match_str = re.sub(
+ rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str
+ )
+ else:
+ return False
+ if self.config.handle_all_message:
+ return self.str_match(match_str)
+ elif self.config.handle_friend_message:
+ if self.event.message_type == "private":
+ return self.str_match(match_str)
+ elif self.config.handle_group_message:
+ if self.event.message_type == "group":
+ if (
+ self.config.accept_group is None
+ or self.event.group_id in self.config.accept_group
+ ):
+ return self.str_match(match_str)
+ elif self.config.handle_group_message:
+ if self.event.message_type == "guild":
+ return self.str_match(match_str)
+ return False
+
+ @abstractmethod
+ def str_match(self, msg_str: str) -> bool:
+ raise NotImplemented
+
+
+class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC):
+ msg_match: re.Match
+ re_pattern: re.Pattern
+ Config: Type[T_RegexPluginConfig] = RegexPluginConfig
+
+ def str_match(self, msg_str: str) -> bool:
+ msg_str = msg_str.strip()
+ self.msg_match = self.re_pattern.fullmatch(msg_str)
+ return bool(self.msg_match)
+
+
+class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC):
+ command_match: re.Match
+ command_re_pattern: re.Pattern
+ Config: Type[T_CommandPluginConfig] = CommandPluginConfig
+
+ def str_match(self, msg_str: str) -> bool:
+ if not hasattr(self, "command_re_pattern"):
+ self.command_re_pattern = re.compile(
+ f'({"|".join(self.config.command_prefix)})'
+ f'({"|".join(self.config.command)})'
+ r"\s*(?P<command_args>.*)",
+ flags=re.I if self.config.ignore_case else 0,
+ )
+ msg_str = msg_str.strip()
+ self.command_match = self.command_re_pattern.fullmatch(msg_str)
+ if not self.command_match:
+ return False
+ self.msg_match = self.re_pattern.fullmatch(
+ self.command_match.group("command_args")
+ )
+ return bool(self.msg_match)
+
+
+class PseudoRandomGenerator:
+ """线性同余法随机数生成器"""
+
+ def __init__(self, seed):
+ self.seed = seed
+
+ def generate(self):
+ while True:
+ self.seed = (self.seed * 1103515245 + 12345) % (2**31)
+ yield self.seed
+
+
+class HydroDice:
+ """水系掷骰组件
+
+ 一些 API 相关的工具函数
+
+ """
+
+ def __init__(self, seed):
+ self.generator = PseudoRandomGenerator(seed)
+
+ def roll_dice(
+ self,
+ _counts: int | str,
+ _sides: int | str,
+ is_reversed: bool = False,
+ streamline: bool = False,
+ threshold: int | str = 5,
+ ) -> str:
+ """普通掷骰
+ Args:
+ _counts (int | str): 掷骰个数.
+ _sides (int | str): 每个骰子的面数.
+ is_reversed (bool, optional): 倒序输出. Defaults to False.
+ streamline (bool, optional): 忽略过程. Defaults to False.
+ threshold (int | str, optional): streamline 的阈值. Defaults to 5.
+
+ Returns:
+ str: 表达式结果.
+ """
+ rolls = []
+ for _ in range(int(_counts)):
+ roll = next(self.generator.generate()) % _sides + 1
+ rolls.append(roll)
+ total = sum(rolls)
+
+ if streamline:
+ return str(total)
+ else:
+ if len(rolls) > int(threshold):
+ return str(total)
+ rolls_str = " + ".join(str(r) for r in rolls)
+ result_str = (
+ f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}"
+ )
+ return result_str