diff options
| author | 2023-11-03 03:28:35 +0800 | |
|---|---|---|
| committer | 2023-11-03 03:28:35 +0800 | |
| commit | 5a2033860a328c4116f0ede2874915315e7487b0 (patch) | |
| tree | f7b6d88d50ce33bd5fb52fbcfbca906738f412d6 /src | |
| parent | 4bf6db5200affc2f623aa02301020092c0789d19 (diff) | |
| download | HydroRoll-5a2033860a328c4116f0ede2874915315e7487b0.tar.gz HydroRoll-5a2033860a328c4116f0ede2874915315e7487b0.zip | |
Co-authored-by: HadalFauna <HadalFauna@users.noreply.github.com>
Diffstat (limited to 'src')
| -rw-r--r-- | src/HydroRoll/__init__.py | 74 | ||||
| -rw-r--r-- | src/HydroRoll/config.py | 97 | ||||
| -rw-r--r-- | src/HydroRoll/exceptions.py | 0 | ||||
| -rw-r--r-- | src/HydroRoll/models/__init__.py | 0 | ||||
| -rw-r--r-- | src/HydroRoll/models/hola.pkl | bin | 0 -> 209 bytes | |||
| -rw-r--r-- | src/HydroRoll/models/utils.py | 14 | ||||
| -rw-r--r-- | src/HydroRoll/typing.py | 34 | ||||
| -rw-r--r-- | src/HydroRoll/utils.py | 162 |
8 files changed, 381 insertions, 0 deletions
diff --git a/src/HydroRoll/__init__.py b/src/HydroRoll/__init__.py new file mode 100644 index 0000000..b3e673d --- /dev/null +++ b/src/HydroRoll/__init__.py @@ -0,0 +1,74 @@ +"""中间件""" +from ast import literal_eval +import os +from os.path import dirname, join, abspath +from iamai import ConfigModel, Plugin +from iamai.log import logger +from .config import Directory +from .models.utils import * +import joblib + +BASE_DIR = Directory(_path=dirname(abspath("__file__"))) +HYDRO_DIR = dirname(abspath(__file__)) + + +def _init_directory(_prefix: str = ''): + """初始化水系目录""" + for _ in BASE_DIR.get_dice_dir_list(_prefix): + if not os.path.exists(_): + os.makedirs(_) + + +def _load_models(): + models = {} + models['hola'] = joblib.load( + join(HYDRO_DIR, 'models', 'hola.pkl')) + return models + + +def load_model(model): + logger.info("loading models...") + return _load_models()[model] + + +def init_directory(_prefix: str = 'HydroRoll'): + _init_directory(_prefix=_prefix) + + +class HydroRoll(Plugin): + """中间件""" + class Config(ConfigModel): + __config_name__ = "HydroRoll" + + priority = 0 + + # TODO: HydroRollCore should be able to handle all signals and tokens from Psi. + logger.info("Loading HydroRollCore...") + + async def handle(self) -> None: + """ + @TODO: HydroRollCore should be able to handle all signals and tokens from Psi. + @BODY: HydroRollCore actives the rule-packages. + """ + + if self.event.message.get_plain_text() == ".core": + await self.event.reply("HydroRollCore is running.") + elif self.event.message.startswith(".test"): + try: + result = literal_eval(self.event.message.get_plain_text()[5:]) + await self.event.reply(result) + except Exception as error: + await self.event.reply(f"{error!r}") + + async def rule(self) -> bool: + """ + @TODO: Psi should be able to handle all message first. + @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value. + """ + logger.info("loading psi...") + if not self.bot.global_state.get('HydroRoll.dir'): + hola = load_model('hola') + + init_directory() + self.bot.global_state['HydroRoll.dir'] = True + return self.event.adapter.name in ['cqhttp', 'kook', 'console', 'mirai'] diff --git a/src/HydroRoll/config.py b/src/HydroRoll/config.py new file mode 100644 index 0000000..e0789a0 --- /dev/null +++ b/src/HydroRoll/config.py @@ -0,0 +1,97 @@ +import argparse +import sys +import platform +from importlib.metadata import version +import os +from typing import Set, Optional +from iamai import ConfigModel + +# 创建全局 ArgumentParser 对象 +global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数") + + +class BasePluginConfig(ConfigModel): + __config_name__ = "" + handle_all_message: bool = True + """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。""" + handle_friend_message: bool = True + """是否处理好友消息。""" + handle_group_message: bool = True + """是否处理群消息。""" + accept_group: Optional[Set[int]] = None + """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。""" + message_str: str = "*{user_name} {message}" + """最终发送消息的格式。""" + + +class RegexPluginConfig(BasePluginConfig): + pass + + +class CommandPluginConfig(RegexPluginConfig): + command_prefix: Set[str] = {":", "你妈", "👅", "约瑟夫妥斯妥耶夫斯基戴安那只鸡🐔"} + """命令前缀。""" + command: Set[str] = {} + """命令文本。""" + ignore_case: bool = True + """忽略大小写。""" + + +# 定义全局配置类 +class GlobalConfig(CommandPluginConfig): + _name = "HydroRoll[水系]" + _version = "0.1.0" + _svn = "2" + _author = "简律纯" + _iamai_version = version("iamai") + _python_ver = sys.version + _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3])) + + # 定义系统组件 + class HydroSystem: + def __init__(self): + self.parser = argparse.ArgumentParser( + description="HydroRoll[水系].system 系统命令参数" + ) + self.subparsers = self.parser.add_subparsers() + self.status_parser = self.subparsers.add_parser( + "status", aliases=["stat", "state"], help="系统状态" + ) + self.reload_parser = self.subparsers.add_parser( + "reload", aliases=["rld"], help="重新加载系统" + ) + self.restart_parser = self.subparsers.add_parser( + "restart", aliases=["rst"], help="重启系统" + ) + self.collect_parser = self.subparsers.add_parser( + "collect", aliases=["gc"], help="释放 python 内存" + ) + self.help = "\n".join( + self.parser.format_help() + .replace("\r\n", "\n") + .replace("\r", "") + .split("\n")[2:-3] + ) + + class HydroBot: + def __init__(self) -> None: + self.parser = argparse.ArgumentParser(description="Bot命令") + + +class Directory(object): + def __init__(self, _path: str) -> None: + self.current_path = _path + + def get_dice_dir_list(self, _prefix: str) -> list: + + return [ + os.path.join(self.current_path, f'{_prefix}', *dirs) + for dirs in [ + ['config'], + ['data'], + ['rules'], + ['scripts'], + ['web', 'frontend'], + ['web', 'backend'], + ] + ] diff --git a/src/HydroRoll/exceptions.py b/src/HydroRoll/exceptions.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/HydroRoll/exceptions.py diff --git a/src/HydroRoll/models/__init__.py b/src/HydroRoll/models/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/HydroRoll/models/__init__.py diff --git a/src/HydroRoll/models/hola.pkl b/src/HydroRoll/models/hola.pkl Binary files differnew file mode 100644 index 0000000..b9940fa --- /dev/null +++ b/src/HydroRoll/models/hola.pkl diff --git a/src/HydroRoll/models/utils.py b/src/HydroRoll/models/utils.py new file mode 100644 index 0000000..73e7ba0 --- /dev/null +++ b/src/HydroRoll/models/utils.py @@ -0,0 +1,14 @@ +import difflib +import pickle + +def find_max_similarity(input_string, string_list): + max_similarity = 0 + max_string = "" + + for string in string_list: + similarity = difflib.SequenceMatcher(None, input_string, string).quick_ratio() + if similarity > max_similarity: + max_similarity = similarity + max_string = string + + return max_string, max_similarity
\ No newline at end of file diff --git a/src/HydroRoll/typing.py b/src/HydroRoll/typing.py new file mode 100644 index 0000000..876fa92 --- /dev/null +++ b/src/HydroRoll/typing.py @@ -0,0 +1,34 @@ +"""HydroRoll 类型提示支持。 + +此模块定义了部分 HydroRoll 使用的类型。 +""" + +from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable + +from iamai.message import T_MS, T_Message, T_MessageSegment + +if TYPE_CHECKING: + from iamai.bot import Bot # noqa + from iamai.event import Event # noqa + from iamai.plugin import Plugin # noqa + from iamai.config import ConfigModel # noqa + +__all__ = [ + "T_State", + "T_Event", + "T_Plugin", + "T_Config", + "T_Message", + "T_MessageSegment", + "T_MS", + "T_BotHook", + "T_EventHook", +] + +T_State = TypeVar("T_State") +T_Event = TypeVar("T_Event", bound="Event") +T_Plugin = TypeVar("T_Plugin", bound="Plugin") +T_Config = TypeVar("T_Config", bound="ConfigModel") + +T_BotHook = Callable[["Bot"], Awaitable[NoReturn]] +T_EventHook = Callable[[T_Event], Awaitable[NoReturn]]
\ No newline at end of file diff --git a/src/HydroRoll/utils.py b/src/HydroRoll/utils.py new file mode 100644 index 0000000..453e691 --- /dev/null +++ b/src/HydroRoll/utils.py @@ -0,0 +1,162 @@ +import re +import time +import random +from abc import ABC, abstractmethod +from typing import Type, Union, Generic, TypeVar +from iamai import Plugin +from iamai.typing import T_State +from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent + +from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig + +T_Config = TypeVar("T_Config", bound=BasePluginConfig) +T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig) +T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig) + + +class BasePlugin( + Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config], + ABC, + Generic[T_State, T_Config], +): + Config: Type[T_Config] = BasePluginConfig + + def format_str(self, format_str: str, message_str: str = "") -> str: + return format_str.format( + message=message_str, + user_name=self.event.sender.nickname, + user_id=self.event.sender.user_id, + ) + + async def rule(self) -> bool: + is_bot_off = False + + if self.event.adapter.name != "cqhttp": + return False + if self.event.type != "message": + return False + match_str = self.event.message.get_plain_text() + if is_bot_off: + if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"): + match_str = re.sub( + rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str + ) + elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"): + match_str = re.sub( + rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str + ) + else: + return False + if self.config.handle_all_message: + return self.str_match(match_str) + elif self.config.handle_friend_message: + if self.event.message_type == "private": + return self.str_match(match_str) + elif self.config.handle_group_message: + if self.event.message_type == "group": + if ( + self.config.accept_group is None + or self.event.group_id in self.config.accept_group + ): + return self.str_match(match_str) + elif self.config.handle_group_message: + if self.event.message_type == "guild": + return self.str_match(match_str) + return False + + @abstractmethod + def str_match(self, msg_str: str) -> bool: + raise NotImplemented + + +class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC): + msg_match: re.Match + re_pattern: re.Pattern + Config: Type[T_RegexPluginConfig] = RegexPluginConfig + + def str_match(self, msg_str: str) -> bool: + msg_str = msg_str.strip() + self.msg_match = self.re_pattern.fullmatch(msg_str) + return bool(self.msg_match) + + +class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC): + command_match: re.Match + command_re_pattern: re.Pattern + Config: Type[T_CommandPluginConfig] = CommandPluginConfig + + def str_match(self, msg_str: str) -> bool: + if not hasattr(self, "command_re_pattern"): + self.command_re_pattern = re.compile( + f'({"|".join(self.config.command_prefix)})' + f'({"|".join(self.config.command)})' + r"\s*(?P<command_args>.*)", + flags=re.I if self.config.ignore_case else 0, + ) + msg_str = msg_str.strip() + self.command_match = self.command_re_pattern.fullmatch(msg_str) + if not self.command_match: + return False + self.msg_match = self.re_pattern.fullmatch( + self.command_match.group("command_args") + ) + return bool(self.msg_match) + + +class PseudoRandomGenerator: + """线性同余法随机数生成器""" + + def __init__(self, seed): + self.seed = seed + + def generate(self): + while True: + self.seed = (self.seed * 1103515245 + 12345) % (2**31) + yield self.seed + + +class HydroDice: + """水系掷骰组件 + + 一些 API 相关的工具函数 + + """ + + def __init__(self, seed): + self.generator = PseudoRandomGenerator(seed) + + def roll_dice( + self, + _counts: int | str, + _sides: int | str, + is_reversed: bool = False, + streamline: bool = False, + threshold: int | str = 5, + ) -> str: + """普通掷骰 + Args: + _counts (int | str): 掷骰个数. + _sides (int | str): 每个骰子的面数. + is_reversed (bool, optional): 倒序输出. Defaults to False. + streamline (bool, optional): 忽略过程. Defaults to False. + threshold (int | str, optional): streamline 的阈值. Defaults to 5. + + Returns: + str: 表达式结果. + """ + rolls = [] + for _ in range(int(_counts)): + roll = next(self.generator.generate()) % _sides + 1 + rolls.append(roll) + total = sum(rolls) + + if streamline: + return str(total) + else: + if len(rolls) > int(threshold): + return str(total) + rolls_str = " + ".join(str(r) for r in rolls) + result_str = ( + f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}" + ) + return result_str |
