diff options
Diffstat (limited to 'tests/plugins')
| -rw-r--r-- | tests/plugins/HydroRoll/__init__.py | 221 | ||||
| -rw-r--r-- | tests/plugins/HydroRoll/config.py | 80 | ||||
| -rw-r--r-- | tests/plugins/HydroRoll/core/__init__.py | 70 | ||||
| -rw-r--r-- | tests/plugins/HydroRoll/exceptions.py (renamed from tests/plugins/Webhook/config.py) | 0 | ||||
| -rw-r--r-- | tests/plugins/HydroRoll/psi/.gitkeep | 0 | ||||
| -rw-r--r-- | tests/plugins/HydroRoll/typing.py | 34 | ||||
| -rw-r--r-- | tests/plugins/HydroRoll/utils.py | 97 | ||||
| -rw-r--r-- | tests/plugins/Webhook/__init__.py | 32 | ||||
| -rw-r--r-- | tests/plugins/_show.py | 36 | ||||
| -rw-r--r-- | tests/plugins/e.py | 16 | ||||
| -rw-r--r-- | tests/plugins/test2.py | 15 |
11 files changed, 538 insertions, 63 deletions
diff --git a/tests/plugins/HydroRoll/__init__.py b/tests/plugins/HydroRoll/__init__.py new file mode 100644 index 0000000..b064bca --- /dev/null +++ b/tests/plugins/HydroRoll/__init__.py @@ -0,0 +1,221 @@ +from collections import defaultdict +from itertools import chain +from pathlib import Path +from typing import Dict, List, Optional, Type, Union +from iamai import ConfigModel, Plugin +from iamai.log import logger, error_or_exception +from iamai.utils import ModulePathFinder, get_classes_from_module_name, get_classes_from_dir, get_classes_from_module +from .config import GlobalConfig +from iamai.exceptions import LoadModuleError +from .utils import BasePlugin, RegexPluginBase, CommandPluginBase +import os +# from .core import Rule, RuleLoadType +from HydroRollCore import Rule, RuleLoadType + +class HydroRoll(Plugin): + + class Config(ConfigModel): + __config_name__ = "HydroRoll" + + priority = 0 + rules_priority_dict: Dict[int, List[Type[Rule]]] = defaultdict(list) + _module_path_finder = ModulePathFinder() + + + def _load_rule_class( + self, + rule_class: Type[Rule], + rule_load_type: RuleLoadType, + rule_file_path: Optional[str], + ): + logger.info(f'Loading rule from class "{rule_class!r}"') + """加载规则类。""" + priority = getattr(rule_class, "priority", None) + if type(priority) is int and priority >= 0: + for _rule in self.rules: + if _rule.__name__ == rule_class.__name__: + logger.warning( + f'Already have a same name rule "{_rule.__name__}"' + ) + rule_class.__rule_load_type__ = rule_load_type + rule_class.__rule_file_path__ = rule_file_path + self.rules_priority_dict[priority].append(rule_class) + logger.success( + f'Succeeded to load rule "{rule_class.__name__}" ' + f'from class "{rule_class!r}"' + ) + else: + error_or_exception( + f'Load rule from class "{rule_class!r}" failed:', + LoadModuleError( + f'Rule priority incorrect in the class "{rule_class!r}"' + ), + self.bot.config.bot.log.verbose_exception, + ) + + def _load_rules_from_module_name( + self, module_name: str, rule_load_type: RuleLoadType + ): + logger.info(f'Loading rules from module "{module_name}"') + """从模块名称中规则包模块。""" + try: + rule_classes = get_classes_from_module_name(module_name, Rule) + except ImportError as e: + error_or_exception( + f'Import module "{module_name}" failed:', + e, + self.bot.config.bot.log.verbose_exception, + ) + else: + for rule_class, module in rule_classes: + self._load_rule_class( + rule_class, + rule_load_type, + module.__file__, + ) + + def _load_rules( + self, + *rules: Union[Type[Plugin], str, Path], + rule_load_type: Optional[RuleLoadType] = None, + ): + """加载规则包。 + + Args: + *rules: 规则类、规则包模块名称或者规则包模块文件路径。类型可以是 `Type[Rule]`, `str` 或 `pathlib.Path`。 + 如果为 `Type[Rule]` 类型时,将作为规则类进行加载。 + 如果为 `str` 类型时,将作为规则包模块名称进行加载,格式和 Python `import` 语句相同。 + 例如:`path.of.rule`。 + 如果为 `pathlib.Path` 类型时,将作为规则包模块文件路径进行加载。 + 例如:`pathlib.Path("path/of/rule")`。 + rule_load_type: 规则加载类型,如果为 None 则自动判断,否则使用指定的类型。 + """ + logger.info("Loading rules...") + for rule_ in rules: + if isinstance(rule_, type): + if issubclass(rule_, Rule): + self._load_rule_class( + rule_, rule_load_type or RuleLoadType.CLASS, None + ) + else: + logger.error( + f'The rule class "{rule_!r}" must be a subclass of Rule' + ) + elif isinstance(rule_, str): + logger.warning(f'Loading rules from module "{rule_}"') + self._load_rules_from_module_name( + rule_, rule_load_type or RuleLoadType.NAME + ) + elif isinstance(rule_, Path): + logger.warning(f'Loading rules from path "{rule_}"') + if rule_.is_file(): + if rule_.suffix != ".py": + logger.error(f'The path "{rule_}" must endswith ".py"') + return + + rule_module_name = None + for path in self._module_path_finder.path: + try: + if rule_.stem == "__init__": + if rule_.resolve().parent.parent.samefile(Path(path)): + rule_module_name = rule_.resolve().parent.name + break + elif rule_.resolve().parent.samefile(Path(path)): + rule_module_name = rule_.stem + break + except OSError: + continue + if rule_module_name is None: + rel_path = rule_.resolve().relative_to(Path(".").resolve()) + if rel_path.stem == "__init__": + rule_module_name = ".".join(rel_path.parts[:-1]) + else: + rule_module_name = ".".join( + rel_path.parts[:-1] + (rel_path.stem,) + ) + + self._load_rules_from_module_name( + rule_module_name, rule_load_type or RuleLoadType.FILE + ) + else: + logger.error(f'The rule path "{rule_}" must be a file') + else: + logger.error(f"Type error: {rule_} can not be loaded as plugin") + + def load_rules(self, *rules: Union[Type[Rule], str, Path]): + """加载规则。 + + Args: + *rules: 规则类、规则包x模块名称或者规则包模块文件路径。类型可以是 `Type[Rule]`, `str` 或 `pathlib.Path`。 + 如果为 `Type[Rule]` 类型时,将作为规则类进行加载。 + 如果为 `str` 类型时,将作为规则包模块名称进行加载,格式和 Python `import` 语句相同。 + 例如:`path.of.rule`。 + 如果为 `pathlib.Path` 类型时,将作为规则包模块文件路径进行加载。 + 例如:`pathlib.Path("path/of/rule")`。 + """ + # self._extend_rules.extend(rules) + return self._load_rules(*rules) + + def _load_rules_from_dirs(self, *dirs: Path): + """从目录中加载规则包,以 `_` 开头的模块中的规则不会被导入。路径可以是相对路径或绝对路径。 + + Args: + *dirs: 储存包含规则的模块的模块路径。 + 例如:`pathlib.Path("path/of/rules/")` 。 + """ + logger.info("Loading rules from dirs...") + dirs = list(map(lambda x: str(x.resolve()), dirs)) # type: ignore maybe remove? + logger.warning(f'Loading rules from dirs "{", ".join(map(str, dirs))}"') + self._module_path_finder.path.extend(dirs) # type: ignore + # type: ignore + for rule_class, module in get_classes_from_dir(dirs, Rule): # type: ignore + self._load_rule_class(rule_class, RuleLoadType.DIR, module.__file__) + + def load_rules_from_dirs(self, *dirs: Path): + """从目录中加载规则,以 `_` 开头的模块中的规则不会被导入。路径可以是相对路径或绝对路径。 + + Args: + *dirs: 储存包含rule的模块的模块路径。 + 例如:`pathlib.Path("path/of/rules/")` 。 + """ + # self._extend_rule_dirs.extend(dirs) + self._load_rules_from_dirs(*dirs) + + @property + def rules(self) -> List[Type[Plugin]]: + """当前已经加载的规则包的列表。""" + return list(chain(*self.rules_priority_dict.values())) + + def __post_init__(self): + if not self.bot.global_state.get('init', False): + self.bot.global_state = dict() + self.bot.global_state['init'] = True + + self._load_rules_from_dirs(Path(os.path.join("\\".join(os.path.dirname(__file__).split('\\')[:-2]),"rules"))) #*self.config.rule['rule_dirs']) + # self._load_rules(*self.config.rule.rules) + + ... + + async def handle(self) -> None: + """ + @TODO: HydroRollCore should be able to handle all signals and tokens from Psi. + @BODY: HydroRollCore actives the rule-packages. + """ + + if self.event.message.get_plain_text() == '.core': + await self.event.reply("HydroRollCore is running.") + elif self.event.message.startswith('.show'): + try: + await self.event.reply(eval(self.event.message.get_plain_text()[6:])) + except Exception as e: + await self.event.reply(f"{e!r}") + + async def rule(self) -> bool: + """ + @TODO: Psi should be able to handle all message first. + @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value. + """ + + if self.event.type != "message": + return False + return self.event.message.get_plain_text().startswith(".")
\ No newline at end of file diff --git a/tests/plugins/HydroRoll/config.py b/tests/plugins/HydroRoll/config.py new file mode 100644 index 0000000..7cd7520 --- /dev/null +++ b/tests/plugins/HydroRoll/config.py @@ -0,0 +1,80 @@ +import argparse +import sys +import platform +from importlib.metadata import version +import os +from typing import Set, Optional +from iamai import ConfigModel + +# 创建全局 ArgumentParser 对象 +global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数") + +class BasePluginConfig(ConfigModel): + __config_name__ = "" + handle_all_message: bool = True + """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。""" + handle_friend_message: bool = True + """是否处理好友消息。""" + handle_group_message: bool = True + """是否处理群消息。""" + accept_group: Optional[Set[int]] = None + """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。""" + message_str: str = "*{user_name} {message}" + """最终发送消息的格式。""" + + +class RegexPluginConfig(BasePluginConfig): + pass + + +class CommandPluginConfig(RegexPluginConfig): + command_prefix: Set[str] = {":", "你妈", "👅", "约瑟夫妥斯妥耶夫斯基戴安那只鸡🐔"} + """命令前缀。""" + command: Set[str] = {} + """命令文本。""" + ignore_case: bool = True + """忽略大小写。""" + + +# 定义全局配置类 +class GlobalConfig(CommandPluginConfig): + _name = "HydroRoll[水系]" + _version = "0.1.0" + _svn = "2" + _author = "简律纯" + _iamai_version = version("iamai") + _python_ver = sys.version + _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3])) + current_path = os.path.dirname(os.path.abspath("__file__")) + + # 定义系统组件 + class HydroSystem: + def __init__(self): + self.parser = argparse.ArgumentParser( + description="HydroRoll[水系].system 系统命令参数" + ) + self.subparsers = self.parser.add_subparsers() + self.status_parser = self.subparsers.add_parser( + "status", aliases=["stat", "state"], help="系统状态" + ) + self.reload_parser = self.subparsers.add_parser( + "reload", aliases=["rld"], help="重新加载系统" + ) + self.restart_parser = self.subparsers.add_parser( + "restart", aliases=["rst"], help="重启系统" + ) + self.collect_parser = self.subparsers.add_parser( + "collect", aliases=["gc"], help="释放 python 内存" + ) + self.help = "\n".join( + self.parser.format_help() + .replace("\r\n", "\n") + .replace("\r", "") + .split("\n")[2:-3] + ) + + class HydroBot: + def __init__(self) -> None: + self.parser = argparse.ArgumentParser(description="Bot命令") + + diff --git a/tests/plugins/HydroRoll/core/__init__.py b/tests/plugins/HydroRoll/core/__init__.py new file mode 100644 index 0000000..ed02489 --- /dev/null +++ b/tests/plugins/HydroRoll/core/__init__.py @@ -0,0 +1,70 @@ +from abc import ABC, abstractmethod +import os, os.path +from enum import Enum +from iamai.config import ConfigModel +from iamai.utils import is_config_class +from typing import Generic, NoReturn, Optional, Type, TYPE_CHECKING +from ...HydroRoll.typing import T_Event, T_Config + +if TYPE_CHECKING: + from iamai.bot import Bot + +class RuleLoadType(Enum): + """插件加载类型。""" + + DIR = "dir" + NAME = "name" + FILE = "file" + CLASS = "class" + + +class Rule(ABC, Generic[T_Event, T_Config]): + """所有 iamai 插件的基类。 + + Attributes: + event: 当前正在被此插件处理的事件。 + priority: 插件的优先级,数字越小表示优先级越高,默认为 0。 + block: 插件执行结束后是否阻止事件的传播。True 表示阻止。 + __rule_load_type__: 插件加载类型,由 iamai 自动设置,反映了此插件是如何被加载的。 + __rule_file_path__: 当插件加载类型为 `RuleLoadType.CLASS` 时为 `None`, + 否则为定义插件在的 Python 模块的位置。 + """ + + event: T_Event + priority: int = 0 + Config: Type[ConfigModel] + + __rule_load_type__: RuleLoadType + __rule_file_path__: Optional[str] + + def __init__(self, event: T_Event): + self.event = event + + if not hasattr(self, "priority"): + self.priority = 0 + + self.get = self.bot.get + + self.__post_init__() + + def __post_init__(self): + """用于初始化后处理,被 `__init__()` 方法调用。""" + pass + + @property + def name(self) -> str: + """规则类名称。""" + return self.__class__.__name__ + + @property + def bot(self) -> "Bot": + """机器人对象。""" + return self.event.adapter.bot + + @property + def config(self) -> Optional[T_Config]: + """规则包配置。""" + config_class: ConfigModel = getattr(self, "Rule", None) + if is_config_class(config_class): + return getattr(self.config.rule, config_class.__config_name__, None) + return None
\ No newline at end of file diff --git a/tests/plugins/Webhook/config.py b/tests/plugins/HydroRoll/exceptions.py index e69de29..e69de29 100644 --- a/tests/plugins/Webhook/config.py +++ b/tests/plugins/HydroRoll/exceptions.py diff --git a/tests/plugins/HydroRoll/psi/.gitkeep b/tests/plugins/HydroRoll/psi/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/plugins/HydroRoll/psi/.gitkeep diff --git a/tests/plugins/HydroRoll/typing.py b/tests/plugins/HydroRoll/typing.py new file mode 100644 index 0000000..876fa92 --- /dev/null +++ b/tests/plugins/HydroRoll/typing.py @@ -0,0 +1,34 @@ +"""HydroRoll 类型提示支持。 + +此模块定义了部分 HydroRoll 使用的类型。 +""" + +from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable + +from iamai.message import T_MS, T_Message, T_MessageSegment + +if TYPE_CHECKING: + from iamai.bot import Bot # noqa + from iamai.event import Event # noqa + from iamai.plugin import Plugin # noqa + from iamai.config import ConfigModel # noqa + +__all__ = [ + "T_State", + "T_Event", + "T_Plugin", + "T_Config", + "T_Message", + "T_MessageSegment", + "T_MS", + "T_BotHook", + "T_EventHook", +] + +T_State = TypeVar("T_State") +T_Event = TypeVar("T_Event", bound="Event") +T_Plugin = TypeVar("T_Plugin", bound="Plugin") +T_Config = TypeVar("T_Config", bound="ConfigModel") + +T_BotHook = Callable[["Bot"], Awaitable[NoReturn]] +T_EventHook = Callable[[T_Event], Awaitable[NoReturn]]
\ No newline at end of file diff --git a/tests/plugins/HydroRoll/utils.py b/tests/plugins/HydroRoll/utils.py new file mode 100644 index 0000000..b4dbab6 --- /dev/null +++ b/tests/plugins/HydroRoll/utils.py @@ -0,0 +1,97 @@ +import re +from abc import ABC, abstractmethod +from typing import Type, Union, Generic, TypeVar +from iamai import Plugin +from iamai.typing import T_State +from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent + +from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig + +T_Config = TypeVar("T_Config", bound=BasePluginConfig) +T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig) +T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig) + +class BasePlugin( + Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config], + ABC, + Generic[T_State, T_Config], +): + Config: Type[T_Config] = BasePluginConfig + + def format_str(self, format_str: str, message_str: str = "") -> str: + return format_str.format( + message=message_str, + user_name=self.event.sender.nickname, + user_id=self.event.sender.user_id, + ) + + async def rule(self) -> bool: + is_bot_off = False + + if self.event.adapter.name != "cqhttp": + return False + if self.event.type != "message": + return False + match_str = self.event.message.get_plain_text() + if is_bot_off: + if self.event.message.startswith(f'[CQ:at,qq={self.event.self_id}]'): + match_str = re.sub(fr'^\[CQ:at,qq={self.event.self_id}\]', '', match_str) + elif self.event.message.startswith(f'[CQ:at,qq={self.event.self_tiny_id}]'): + match_str = re.sub(fr'^\[CQ:at,qq={self.event.self_tiny_id}\]', '', match_str) + else: + return False + if self.config.handle_all_message: + return self.str_match(match_str) + elif self.config.handle_friend_message: + if self.event.message_type == "private": + return self.str_match(match_str) + elif self.config.handle_group_message: + if self.event.message_type == "group": + if ( + self.config.accept_group is None + or self.event.group_id in self.config.accept_group + ): + return self.str_match(match_str) + elif self.config.handle_group_message: + if self.event.message_type == "guild": + return self.str_match(match_str) + return False + + @abstractmethod + def str_match(self, msg_str: str) -> bool: + raise NotImplemented + + +class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC): + msg_match: re.Match + re_pattern: re.Pattern + Config: Type[T_RegexPluginConfig] = RegexPluginConfig + + def str_match(self, msg_str: str) -> bool: + msg_str = msg_str.strip() + self.msg_match = self.re_pattern.fullmatch(msg_str) + return bool(self.msg_match) + + +class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC): + command_match: re.Match + command_re_pattern: re.Pattern + Config: Type[T_CommandPluginConfig] = CommandPluginConfig + + def str_match(self, msg_str: str) -> bool: + if not hasattr(self, "command_re_pattern"): + self.command_re_pattern = re.compile( + f'({"|".join(self.config.command_prefix)})' + f'({"|".join(self.config.command)})' + r"\s*(?P<command_args>.*)", + flags=re.I if self.config.ignore_case else 0, + ) + msg_str = msg_str.strip() + self.command_match = self.command_re_pattern.fullmatch(msg_str) + if not self.command_match: + return False + self.msg_match = self.re_pattern.fullmatch( + self.command_match.group("command_args") + ) + return bool(self.msg_match) + diff --git a/tests/plugins/Webhook/__init__.py b/tests/plugins/Webhook/__init__.py deleted file mode 100644 index 23f4c3a..0000000 --- a/tests/plugins/Webhook/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -from iamai import Plugin -from iamai.log import logger as log -import asyncio -import aiohttp - -payload = None - -class Webhook(Plugin): - async def handle(self) -> None: - global payload - if payload: - log.info(payload[:5]) - await self.bot.get_adapter("cqhttp").call_api( - "send_group_msg", - group_id=126211793, - message=payload - ) - - async def rule(self) -> bool: - global payload - async with aiohttp.ClientSession() as session: - try: - async with session.get('http://localhost:3000') as response: - try: - payload = await response.text() - log.info(payload) - return True - except Exception as e: - log.info(f'Failed to fetch payload: {e}') - return False - except Exception as e: - return False
\ No newline at end of file diff --git a/tests/plugins/_show.py b/tests/plugins/_show.py new file mode 100644 index 0000000..d009760 --- /dev/null +++ b/tests/plugins/_show.py @@ -0,0 +1,36 @@ +from iamai import Plugin +import json + +class Exec(Plugin): + + priority = 1 + + async def handle(self) -> None: + try: + content = [ + { + "type": "node", + "data": { + "name": f"{self.event.sender.nickname}", + "uin": f"{self.event.sender.user_id}", + "content": [ + { + "type": "text", + "data": { + "text": f"{eval(self.event.message.get_plain_text()[6:])}" + } + } + ] + } + } + ] + res = await self.event.adapter.send_group_forward_msg(group_id=int(self.event.group_id), messages=content) + except Exception as e: + await self.event.reply(f"ERROR!{e!r}") + + async def rule(self) -> bool: + return ( + self.event.type == "message" + and + self.event.message.get_plain_text().startswith(".show") + )
\ No newline at end of file diff --git a/tests/plugins/e.py b/tests/plugins/e.py deleted file mode 100644 index be28e30..0000000 --- a/tests/plugins/e.py +++ /dev/null @@ -1,16 +0,0 @@ -from iamai import Plugin - -class Exec(Plugin): - async def handle(self) -> None: - try: - await self.event.reply(eval(self.event.raw_message[5:])) - except Exception as e: - await self.event.reply(f"ERROR:\n\t{e}") - - async def rule(self) -> bool: - if self.event.adapter.name != "cqhttp": - return False - try: - return self.event.message.get_plain_text().startswith(".show") - except: - return False
\ No newline at end of file diff --git a/tests/plugins/test2.py b/tests/plugins/test2.py deleted file mode 100644 index 45b37a9..0000000 --- a/tests/plugins/test2.py +++ /dev/null @@ -1,15 +0,0 @@ -from flask import Flask, request - -class Webhooks: - app = Flask(__name__) - requests = request - payload = None - - @app.route('/', method=['POST', 'GET']) - async def handle_webhook(self): - self.payload = await request.get_json() - # 在这里处理接收到的数据 - return 'Webhook received' - - # app.run(host='0.0.0.0',port=3000) - |
