From dfecacde300b0a38907fb6a1d4aa22a202b57f0d Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Wed, 15 Nov 2023 10:15:12 +0800 Subject: refactor: embedded BasePluginConfig in utils --- example/plugins/HydroRoll/__init__.py | 20 +----- example/plugins/HydroRoll/command.py | 6 +- example/plugins/HydroRoll/config.py | 92 +++++++----------------- example/plugins/HydroRoll/typing.py | 34 --------- example/plugins/HydroRoll/utils.py | 127 ++++++++++++++++------------------ example/plugins/r.py | 10 --- example/plugins/show.py | 2 +- example/plugins/tf/__init__.py | 52 -------------- example/plugins/tf/config.py | 0 9 files changed, 90 insertions(+), 253 deletions(-) delete mode 100644 example/plugins/HydroRoll/typing.py delete mode 100644 example/plugins/r.py delete mode 100644 example/plugins/tf/__init__.py delete mode 100644 example/plugins/tf/config.py (limited to 'example/plugins') diff --git a/example/plugins/HydroRoll/__init__.py b/example/plugins/HydroRoll/__init__.py index d8c87f5..6739d94 100644 --- a/example/plugins/HydroRoll/__init__.py +++ b/example/plugins/HydroRoll/__init__.py @@ -58,6 +58,7 @@ class HydroRoll(Plugin): logger.info(f"Command {current_cmd} not found with flag {flag}") if args[0] in [".root", ".roots"]: import requests + data = requests.get("https://vercel-hitokoto.vercel.app/api/roots").json() await self.event.reply(data["line"]) else: @@ -80,9 +81,8 @@ class HydroRoll(Plugin): @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value. """ logger.info("loading psi...") - if ( - not self.bot.global_state["HydroRoll"].get("hola") - and not os.path.exists(join(BASE_DIR, "HydroRoll")) + if not self.bot.global_state["HydroRoll"].get("hola") and not os.path.exists( + join(BASE_DIR, "HydroRoll") ): # hola = self.models["hola"] # _, max_similarity = find_max_similarity( @@ -126,17 +126,3 @@ class HydroRoll(Plugin): def load_models(self): self.models = self._load_models(self.model_path_list, self.model_dict) - - # async def ask(self, ask_text: str | None, timeout: int = 10) -> None: - # if ask_text: - # await self.event.reply(ask_text) - # try: - # event = await self.event.adapter.get( - # lambda x: x.type == "message" - # and x.group_id == self.event.group_id - # and x.user_id == self.event.user_id, - # timeout=timeout, - # ) - # return event - # except GetEventTimeout as e: - # raise GetEventTimeout from e diff --git a/example/plugins/HydroRoll/command.py b/example/plugins/HydroRoll/command.py index 5a31534..dcb10b0 100644 --- a/example/plugins/HydroRoll/command.py +++ b/example/plugins/HydroRoll/command.py @@ -1,9 +1,7 @@ -from .utils import CommandPluginBase - -class Get(CommandPluginBase): +class Get: ... -class Set(CommandPluginBase): +class Set: ... \ No newline at end of file diff --git a/example/plugins/HydroRoll/config.py b/example/plugins/HydroRoll/config.py index 788c27a..97591fe 100644 --- a/example/plugins/HydroRoll/config.py +++ b/example/plugins/HydroRoll/config.py @@ -9,30 +9,14 @@ import os from typing import Set, Optional from iamai import ConfigModel import datetime -# 创建全局 ArgumentParser 对象 -global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数") +from typing import Set -class Color: - # 定义ANSI转义序列 - RESET = '\033[0m' - BLUE_BASE = '\033[36m' - BLUE_DARK = '\033[34m' - BLUE_DARKER = '\033[32m' - BLUE_DARKEST = '\033[30m' +from pydantic import Field class BasePluginConfig(ConfigModel): - __config_name__ = "" - handle_all_message: bool = True - """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。""" - handle_friend_message: bool = True - """是否处理好友消息。""" - handle_group_message: bool = True - """是否处理群消息。""" - accept_group: Optional[Set[int]] = None - """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。""" - message_str: str = "*{user_name} {message}" + message_str: str = "{message}" """最终发送消息的格式。""" @@ -41,16 +25,25 @@ class RegexPluginConfig(BasePluginConfig): class CommandPluginConfig(RegexPluginConfig): - command_prefix: Set[str] = {":"} + command_prefix: Set[str] = Field(default_factory=lambda: {".", "。"}) """命令前缀。""" - command: Set[str] = {} + command: Set[str] = Field(default_factory=set) """命令文本。""" ignore_case: bool = True """忽略大小写。""" +class Color: + # 定义ANSI转义序列 + RESET = "\033[0m" + BLUE_BASE = "\033[36m" + BLUE_DARK = "\033[34m" + BLUE_DARKER = "\033[32m" + BLUE_DARKEST = "\033[30m" + + # 定义全局配置类 -class GlobalConfig(CommandPluginConfig): +class GlobalConfig: _name = "HydroRoll[水系]" _version = "0.1.0" _svn = "2" @@ -73,39 +66,6 @@ class GlobalConfig(CommandPluginConfig): Github: https://github.com/HydroRoll-Team Under the MIT License, see LICENSE for more details. """ - _rg = Generator(AESCounter()) - SEED = _rg.random() # 随机数种子 - - # 定义系统组件 - - class HydroSystem: - def __init__(self): - self.parser = argparse.ArgumentParser( - description="HydroRoll[水系].system 系统命令参数" - ) - self.subparsers = self.parser.add_subparsers() - self.status_parser = self.subparsers.add_parser( - "status", aliases=["stat", "state"], help="系统状态" - ) - self.reload_parser = self.subparsers.add_parser( - "reload", aliases=["rld"], help="重新加载系统" - ) - self.restart_parser = self.subparsers.add_parser( - "restart", aliases=["rst"], help="重启系统" - ) - self.collect_parser = self.subparsers.add_parser( - "collect", aliases=["gc"], help="释放 python 内存" - ) - self.help = "\n".join( - self.parser.format_help() - .replace("\r\n", "\n") - .replace("\r", "") - .split("\n")[2:-3] - ) - - class HydroBot: - def __init__(self) -> None: - self.parser = argparse.ArgumentParser(description="Bot命令") class Directory(object): @@ -113,16 +73,15 @@ class Directory(object): self.current_path = _path def get_dice_dir_list(self, _prefix: str) -> list: - return [ - os.path.join(self.current_path, f'{_prefix}', *dirs) + os.path.join(self.current_path, f"{_prefix}", *dirs) for dirs in [ - ['config'], - ['data'], - ['rules'], - ['scripts'], - ['web', 'frontend'], - ['web', 'backend'], + ["config"], + ["data"], + ["rules"], + ["scripts"], + ["web", "frontend"], + ["web", "backend"], ] ] @@ -133,8 +92,8 @@ class FileManager(object): def get_file_list(self, _dir: str): return { - 'web;frontend': 'index.html', - 'data': 'censor.json', + "web;frontend": "index.html", + "data": "censor.json", } @@ -142,8 +101,7 @@ class Models: """模型管理类""" def __init__(self) -> None: - self.builtin_models = {'hola': 'hola.pkl'} + self.builtin_models = {"hola": "hola.pkl"} def get_models_dict(self) -> dict: return self.builtin_models - diff --git a/example/plugins/HydroRoll/typing.py b/example/plugins/HydroRoll/typing.py deleted file mode 100644 index 876fa92..0000000 --- a/example/plugins/HydroRoll/typing.py +++ /dev/null @@ -1,34 +0,0 @@ -"""HydroRoll 类型提示支持。 - -此模块定义了部分 HydroRoll 使用的类型。 -""" - -from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable - -from iamai.message import T_MS, T_Message, T_MessageSegment - -if TYPE_CHECKING: - from iamai.bot import Bot # noqa - from iamai.event import Event # noqa - from iamai.plugin import Plugin # noqa - from iamai.config import ConfigModel # noqa - -__all__ = [ - "T_State", - "T_Event", - "T_Plugin", - "T_Config", - "T_Message", - "T_MessageSegment", - "T_MS", - "T_BotHook", - "T_EventHook", -] - -T_State = TypeVar("T_State") -T_Event = TypeVar("T_Event", bound="Event") -T_Plugin = TypeVar("T_Plugin", bound="Plugin") -T_Config = TypeVar("T_Config", bound="ConfigModel") - -T_BotHook = Callable[["Bot"], Awaitable[NoReturn]] -T_EventHook = Callable[[T_Event], Awaitable[NoReturn]] \ No newline at end of file diff --git a/example/plugins/HydroRoll/utils.py b/example/plugins/HydroRoll/utils.py index 2665580..2b4c7bf 100644 --- a/example/plugins/HydroRoll/utils.py +++ b/example/plugins/HydroRoll/utils.py @@ -5,100 +5,94 @@ import random from abc import ABC, abstractmethod from typing import Type, Union, Generic, TypeVar from iamai import Plugin -from iamai.typing import T_State -from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent -from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig +import re +from abc import ABC, abstractmethod +from typing import Any, Generic, TypeVar + +from iamai import MessageEvent, Plugin +from iamai.typing import StateT -T_Config = TypeVar("T_Config", bound=BasePluginConfig) -T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig) -T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig) +from .config import BasePluginConfig, CommandPluginConfig, RegexPluginConfig + +ConfigT = TypeVar("ConfigT", bound=BasePluginConfig) +RegexPluginConfigT = TypeVar("RegexPluginConfigT", bound=RegexPluginConfig) +CommandPluginConfigT = TypeVar("CommandPluginConfigT", bound=CommandPluginConfig) class BasePlugin( - Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config], + Plugin[MessageEvent[Any], StateT, ConfigT], ABC, - Generic[T_State, T_Config], + Generic[StateT, ConfigT], ): - Config: Type[T_Config] = BasePluginConfig # type: ignore - def format_str(self, format_str: str, message_str: str = "") -> str: return format_str.format( message=message_str, - user_name=self.event.sender.nickname, - user_id=self.event.sender.user_id, + user_name=self.get_event_sender_name(), + user_id=self.get_event_sender_id(), ) - async def rule(self) -> bool: - is_bot_off = False + def get_event_sender_name(self) -> str: + from iamai.adapter.onebot11.event import MessageEvent as OneBotMessageEvent - if self.event.adapter.name != "cqhttp": - return False - if self.event.type != "message": - return False - match_str = self.event.message.get_plain_text() - if is_bot_off: - if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"): - match_str = re.sub( - rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str - ) - elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"): - match_str = re.sub( - rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str - ) - else: - return False - if self.config.handle_all_message: - return self.str_match(match_str) - elif self.config.handle_friend_message: - if self.event.message_type == "private": - return self.str_match(match_str) - elif self.config.handle_group_message: - if self.event.message_type == "group": - if ( - self.config.accept_group is None - or self.event.group_id in self.config.accept_group - ): - return self.str_match(match_str) - return False + if isinstance(self.event, OneBotMessageEvent): + return self.event.sender.nickname or "" + return "" + + def get_event_sender_id(self) -> str: + from iamai.adapter.onebot11.event import MessageEvent as OneBotMessageEvent + + if isinstance(self.event, OneBotMessageEvent): + if self.event.sender.user_id is not None: + return str(self.event.sender.user_id) + return "" + return "" + + async def rule(self) -> bool: + return isinstance(self.event, MessageEvent) and self.str_match( + self.event.get_plain_text() + ) @abstractmethod def str_match(self, msg_str: str) -> bool: - raise NotImplemented + raise NotImplementedError -class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC): - msg_match: re.Match - re_pattern: re.Pattern - Config: Type[T_RegexPluginConfig] = RegexPluginConfig +class RegexPluginBase(BasePlugin[StateT, RegexPluginConfigT], ABC): + msg_match: re.Match[str] + re_pattern: re.Pattern[str] def str_match(self, msg_str: str) -> bool: msg_str = msg_str.strip() - self.msg_match = self.re_pattern.fullmatch(msg_str) + msg_match = self.re_pattern.fullmatch(msg_str) + if msg_match is None: + return False + self.msg_match = msg_match return bool(self.msg_match) -class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC): - command_match: re.Match - command_re_pattern: re.Pattern - Config: Type[T_CommandPluginConfig] = CommandPluginConfig +class CommandPluginBase(RegexPluginBase[StateT, CommandPluginConfigT], ABC): + command_match: re.Match[str] + command_re_pattern: re.Pattern[str] def str_match(self, msg_str: str) -> bool: - if not hasattr(self, "command_re_pattern"): - self.command_re_pattern = re.compile( - f'({"|".join(self.config.command_prefix)})' + if not hasattr(self, "re_pattern"): + self.re_pattern = re.compile( + f'[{"".join(self.config.command_prefix)}]' f'({"|".join(self.config.command)})' r"\s*(?P.*)", flags=re.I if self.config.ignore_case else 0, ) msg_str = msg_str.strip() - self.command_match = self.command_re_pattern.fullmatch(msg_str) - if not self.command_match: + msg_match = self.re_pattern.fullmatch(msg_str) + if not msg_match: return False - self.msg_match = self.re_pattern.fullmatch( - self.command_match.group("command_args") - ) - return bool(self.msg_match) + self.msg_match = msg_match + command_match = self.re_pattern.fullmatch(self.msg_match.group("command_args")) + if not command_match: + return False + self.command_match = command_match + return True class PseudoRandomGenerator: @@ -153,9 +147,7 @@ class HydroDice: if len(rolls) > int(threshold): return str(total) rolls_str = " + ".join(str(r) for r in rolls) - return ( - f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}" - ) + return f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}" def find_max_similarity(input_string, string_list): @@ -174,6 +166,5 @@ def find_max_similarity(input_string, string_list): def check_file(filename: str) -> bool: """根据给定参数校验文件夹内文件完整性""" - - - return False \ No newline at end of file + + return False diff --git a/example/plugins/r.py b/example/plugins/r.py deleted file mode 100644 index 2f05af9..0000000 --- a/example/plugins/r.py +++ /dev/null @@ -1,10 +0,0 @@ -from iamai import Plugin - - -class Roll(Plugin): - async def handle(self) -> None: - await self.event.reply("""Attack: 25 -Damage: 9""") - - async def rule(self) -> bool: - return self.event.type == "message" and self.event.message.startswith("/r") diff --git a/example/plugins/show.py b/example/plugins/show.py index e43a241..a9cf446 100644 --- a/example/plugins/show.py +++ b/example/plugins/show.py @@ -1,6 +1,6 @@ from iamai import Plugin from numpy.random import Generator -from iamai.adapter.cqhttp.message import CQHTTPMessage, CQHTTPMessageSegment +from iamai.adapter.onebot11.message import CQHTTPMessage, CQHTTPMessageSegment ms = CQHTTPMessageSegment diff --git a/example/plugins/tf/__init__.py b/example/plugins/tf/__init__.py deleted file mode 100644 index ce78d97..0000000 --- a/example/plugins/tf/__init__.py +++ /dev/null @@ -1,52 +0,0 @@ - -from iamai import Plugin -from iamai.log import logger as _logger -from HydroRoll.models.cos_sim import cosSim -import jieba - -logger = _logger - - -texts = [ - "你好 HydroRoll", - "你好 水系", - "hi 水系", - "hi HydroRoll", - "hello 水系", - "hello HydroRoll", - "hola 水系", - "hola HydroRoll", -] - -cos_Sim = cosSim(texts) -logger.info(f"{cos_Sim.calcuSim('你好')}") - -cos_Sim.save('cos.h5') - -model = cosSim.load('cos.h5') - -logger.info(f"{model}") - -# class Sim(Plugin): -# async def handle(self) -> None: -# try: - -# txt_list = eval(self.event.message.get_plain_text()[5:]) -# if len(txt_list) == 1: -# await self.event.reply(f"{cos_Sim.CalcuSim(txt_list)}") -# elif len(txt_list) == 2: -# corpuss = [" ".join(jieba.cut(text)) -# for text in eval(self.event.message.get_plain_text()[5:])] -# await self.event.reply(str(corpuss)) -# vocabulary = cos_Sim.getVocabulary(corpuss) -# v = cos_Sim.getVectors(corpuss, vocabulary=vocabulary) -# await self.event.reply(f"weight\n=========\n{v}") -# await self.event.reply(f"相似度\n=========\n{cos_Sim.cos_sim(v[0], v[1])}") -# except Exception as e: -# await self.event.reply(f"{e!r}") - -# async def rule(self) -> bool: -# return self.event.type == "message" and self.event.message.startswith(".cos") - - - diff --git a/example/plugins/tf/config.py b/example/plugins/tf/config.py deleted file mode 100644 index e69de29..0000000 -- cgit v1.2.3-70-g09d2