diff options
| author | 2024-02-27 11:46:36 +0800 | |
|---|---|---|
| committer | 2024-02-27 11:46:36 +0800 | |
| commit | cac060a6e8b70e4e4e1b34555d3d4fee8bc6d007 (patch) | |
| tree | c3777687637c44d38690a364a363a8336424b59a /hydro_roll | |
| parent | 255c259cbf5ea143927dd988049c4d9ed14ac4f7 (diff) | |
| download | HydroRoll-cac060a6e8b70e4e4e1b34555d3d4fee8bc6d007.tar.gz HydroRoll-cac060a6e8b70e4e4e1b34555d3d4fee8bc6d007.zip | |
refactor!: rename HydroRoll dir as hydro_roll
Diffstat (limited to 'hydro_roll')
| -rw-r--r-- | hydro_roll/__init__.py | 80 | ||||
| -rw-r--r-- | hydro_roll/cli.py | 180 | ||||
| -rw-r--r-- | hydro_roll/config.py | 97 | ||||
| -rw-r--r-- | hydro_roll/exceptions.py | 0 | ||||
| -rw-r--r-- | hydro_roll/models/__init__.py | 0 | ||||
| -rw-r--r-- | hydro_roll/models/hola.pkl | bin | 0 -> 209 bytes | |||
| -rw-r--r-- | hydro_roll/models/utils.py | 14 | ||||
| -rw-r--r-- | hydro_roll/typing.py | 34 | ||||
| -rw-r--r-- | hydro_roll/utils.py | 162 |
9 files changed, 567 insertions, 0 deletions
diff --git a/hydro_roll/__init__.py b/hydro_roll/__init__.py new file mode 100644 index 0000000..cfaff13 --- /dev/null +++ b/hydro_roll/__init__.py @@ -0,0 +1,80 @@ +"""中间件""" + +from ast import literal_eval +import os +from os.path import dirname, join, abspath +from iamai import ConfigModel, Plugin +from iamai.log import logger +from .config import Directory +from .models.utils import * +import joblib + +try: + from .HydroRoll import sum_as_string +except ImportError: + ... + +BASE_DIR = Directory(_path=dirname(abspath("__file__"))) +HYDRO_DIR = dirname(abspath(__file__)) + + +def _init_directory(_prefix: str = ""): + """初始化水系目录""" + for _ in BASE_DIR.get_dice_dir_list(_prefix): + if not os.path.exists(_): + os.makedirs(_) + + +def _load_models(): + models = {} + models["hola"] = joblib.load(join(HYDRO_DIR, "models", "hola.pkl")) + return models + + +def load_model(model): + logger.info("loading models...") + return _load_models()[model] + + +def init_directory(_prefix: str = "HydroRoll"): + _init_directory(_prefix=_prefix) + + +class HydroRoll(Plugin): + """中间件""" + + class Config(ConfigModel): + __config_name__ = "HydroRoll" + + priority = 0 + + # TODO: infini should be able to handle all signals and tokens from Psi. + logger.info(f"Loading infini... with {sum_as_string(2,3)}") + + async def handle(self) -> None: + """ + @TODO: infini should be able to handle all signals and tokens from Psi. + @BODY: infini actives the rule-packages. + """ + + if self.event.message.get_plain_text() == ".core": + await self.event.reply("infini is running.") + elif self.event.message.startswith(".test"): + try: + result = literal_eval(self.event.message.get_plain_text()[5:]) + await self.event.reply(result) + except Exception as error: + await self.event.reply(f"{error!r}") + + async def rule(self) -> bool: + """ + @TODO: Psi should be able to handle all message first. + @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value. + """ + logger.info("loading psi...") + if not self.bot.global_state.get("HydroRoll.dir"): + hola = load_model("hola") + + init_directory() + self.bot.global_state["HydroRoll.dir"] = True + return self.event.adapter.name in ["cqhttp", "kook", "console", "mirai"] diff --git a/hydro_roll/cli.py b/hydro_roll/cli.py new file mode 100644 index 0000000..d3404ad --- /dev/null +++ b/hydro_roll/cli.py @@ -0,0 +1,180 @@ +import argparse +import os +import aiohttp +import asyncio +import json +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .typing import * + +class Cli(object): + parser = argparse.ArgumentParser(description="水系终端脚手架") + + def __init__(self): + self.parser.add_argument( + "-i", + "--install", + dest="command", + help="安装规则包、插件与模型", + action="store_const", + const="install_package", + ) + self.parser.add_argument( + "-T", + "--template", + dest="command", + help="选择模板快速创建Bot实例", + action="store_const", + const="build_template", + ) + self.parser.add_argument( + "-S", + "--search", + dest="command", + help="在指定镜像源查找规则包、插件与模型", + action="store_const", + const="search_package", + ) + self.parser.add_argument( + "-c", + "--config", + dest="command", + help="配置管理", + action="store_const", + const="config", + ) + self.args = self.parser.parse_args() + + def get_args(self): + return self.args + + def get_help(self): + return self.parser.format_help() + + async def install_packages(self): + package_name = input("请输入要安装的包名:") + url = f"https://pypi.org/pypi/{package_name}/json" + + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + await self._extract_package(data, package_name) + else: + print(f"找不到包:{package_name}") + + async def _extract_package(self, data, package_name): + latest_version = data["info"]["version"] + download_url = data["releases"][latest_version][0]["url"] + + plugins_dir = "plugins" + if not os.path.exists(plugins_dir): + os.mkdir(plugins_dir) + + file_name = download_url.split("/")[-1] + file_path = os.path.join(plugins_dir, file_name) + + async with aiohttp.ClientSession() as session: + async with session.get(download_url) as response: + if response.status == 200: + with open(file_path, "wb") as file: + file.write(await response.read()) + print(f"成功安装包:{package_name}") + else: + print(f"下载包时出错:{package_name}") + + def build_template(self): + template = input("请选择应用模板(输入数字):\n" "1. 创建轻量应用\n" "2. 创建标准应用\n" "3. 创建开发应用\n") + + if template == "1": + print("选择了轻量应用模板") + elif template == "2": + print("选择了标准应用模板") + elif template == "3": + print("选择了开发应用模板") + else: + print("无效的模板选择") + + async def search_package(self): + search_term = input("请输入要搜索的包名关键字:") + url = f"https://pypi.org/search/?q={search_term}" + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data: dict = response.json() # type: ignore[dict] + packages = data.get("results", []) + + for package in packages: + name = package["name"] + topics = package.get("topics", []) + + if ( + search_term.lower() in name.lower() + and "HydroRoll" in topics + ): + print(f"包名:{name}") + else: + print("搜索失败") + + def config(self): + config_dir = os.path.expanduser("~/.hydroroll") + if not os.path.exists(config_dir): + os.makedirs(config_dir) + + config_file = os.path.join(config_dir, "config.json") + + subcommand = input("请输入子命令(add/delete):") + + if subcommand == "add": + key = input("请输入要添加的键名:") + value = input("请输入要添加的键值:") + + with open(config_file, "r+") as file: + try: + config_data = json.load(file) + except json.JSONDecodeError: + config_data = {} + + config_data[key] = value + self._extracted_from_config_21(file, config_data) + print(f"成功添加配置项:{key}={value}") + + elif subcommand == "delete": + key = input("请输入要删除的键名:") + + with open(config_file, "r+") as file: + try: + config_data = json.load(file) + except json.JSONDecodeError: + config_data = {} + + if key in config_data: + del config_data[key] + self._extracted_from_config_21(file, config_data) + print(f"成功删除配置项:{key}") + else: + print(f"配置项不存在:{key}") + + else: + print("无效的子命令选择") + + # TODO Rename this here and in `config` + def _extracted_from_config_21(self, file, config_data): + file.seek(0) + json.dump(config_data, file, indent=4) + file.truncate() + + +cli = Cli() + +if cli.get_args().command == "install_package": + asyncio.run(cli.install_packages()) +elif cli.get_args().command == "build_template": + cli.build_template() +elif cli.get_args().command == "search_package": + asyncio.run(cli.search_package()) +elif cli.get_args().command == "config": + cli.config() +else: + print(cli.get_help()) diff --git a/hydro_roll/config.py b/hydro_roll/config.py new file mode 100644 index 0000000..e0789a0 --- /dev/null +++ b/hydro_roll/config.py @@ -0,0 +1,97 @@ +import argparse +import sys +import platform +from importlib.metadata import version +import os +from typing import Set, Optional +from iamai import ConfigModel + +# 创建全局 ArgumentParser 对象 +global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数") + + +class BasePluginConfig(ConfigModel): + __config_name__ = "" + handle_all_message: bool = True + """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。""" + handle_friend_message: bool = True + """是否处理好友消息。""" + handle_group_message: bool = True + """是否处理群消息。""" + accept_group: Optional[Set[int]] = None + """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。""" + message_str: str = "*{user_name} {message}" + """最终发送消息的格式。""" + + +class RegexPluginConfig(BasePluginConfig): + pass + + +class CommandPluginConfig(RegexPluginConfig): + command_prefix: Set[str] = {":", "你妈", "👅", "约瑟夫妥斯妥耶夫斯基戴安那只鸡🐔"} + """命令前缀。""" + command: Set[str] = {} + """命令文本。""" + ignore_case: bool = True + """忽略大小写。""" + + +# 定义全局配置类 +class GlobalConfig(CommandPluginConfig): + _name = "HydroRoll[水系]" + _version = "0.1.0" + _svn = "2" + _author = "简律纯" + _iamai_version = version("iamai") + _python_ver = sys.version + _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3])) + + # 定义系统组件 + class HydroSystem: + def __init__(self): + self.parser = argparse.ArgumentParser( + description="HydroRoll[水系].system 系统命令参数" + ) + self.subparsers = self.parser.add_subparsers() + self.status_parser = self.subparsers.add_parser( + "status", aliases=["stat", "state"], help="系统状态" + ) + self.reload_parser = self.subparsers.add_parser( + "reload", aliases=["rld"], help="重新加载系统" + ) + self.restart_parser = self.subparsers.add_parser( + "restart", aliases=["rst"], help="重启系统" + ) + self.collect_parser = self.subparsers.add_parser( + "collect", aliases=["gc"], help="释放 python 内存" + ) + self.help = "\n".join( + self.parser.format_help() + .replace("\r\n", "\n") + .replace("\r", "") + .split("\n")[2:-3] + ) + + class HydroBot: + def __init__(self) -> None: + self.parser = argparse.ArgumentParser(description="Bot命令") + + +class Directory(object): + def __init__(self, _path: str) -> None: + self.current_path = _path + + def get_dice_dir_list(self, _prefix: str) -> list: + + return [ + os.path.join(self.current_path, f'{_prefix}', *dirs) + for dirs in [ + ['config'], + ['data'], + ['rules'], + ['scripts'], + ['web', 'frontend'], + ['web', 'backend'], + ] + ] diff --git a/hydro_roll/exceptions.py b/hydro_roll/exceptions.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/hydro_roll/exceptions.py diff --git a/hydro_roll/models/__init__.py b/hydro_roll/models/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/hydro_roll/models/__init__.py diff --git a/hydro_roll/models/hola.pkl b/hydro_roll/models/hola.pkl Binary files differnew file mode 100644 index 0000000..b9940fa --- /dev/null +++ b/hydro_roll/models/hola.pkl diff --git a/hydro_roll/models/utils.py b/hydro_roll/models/utils.py new file mode 100644 index 0000000..73e7ba0 --- /dev/null +++ b/hydro_roll/models/utils.py @@ -0,0 +1,14 @@ +import difflib +import pickle + +def find_max_similarity(input_string, string_list): + max_similarity = 0 + max_string = "" + + for string in string_list: + similarity = difflib.SequenceMatcher(None, input_string, string).quick_ratio() + if similarity > max_similarity: + max_similarity = similarity + max_string = string + + return max_string, max_similarity
\ No newline at end of file diff --git a/hydro_roll/typing.py b/hydro_roll/typing.py new file mode 100644 index 0000000..876fa92 --- /dev/null +++ b/hydro_roll/typing.py @@ -0,0 +1,34 @@ +"""HydroRoll 类型提示支持。 + +此模块定义了部分 HydroRoll 使用的类型。 +""" + +from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable + +from iamai.message import T_MS, T_Message, T_MessageSegment + +if TYPE_CHECKING: + from iamai.bot import Bot # noqa + from iamai.event import Event # noqa + from iamai.plugin import Plugin # noqa + from iamai.config import ConfigModel # noqa + +__all__ = [ + "T_State", + "T_Event", + "T_Plugin", + "T_Config", + "T_Message", + "T_MessageSegment", + "T_MS", + "T_BotHook", + "T_EventHook", +] + +T_State = TypeVar("T_State") +T_Event = TypeVar("T_Event", bound="Event") +T_Plugin = TypeVar("T_Plugin", bound="Plugin") +T_Config = TypeVar("T_Config", bound="ConfigModel") + +T_BotHook = Callable[["Bot"], Awaitable[NoReturn]] +T_EventHook = Callable[[T_Event], Awaitable[NoReturn]]
\ No newline at end of file diff --git a/hydro_roll/utils.py b/hydro_roll/utils.py new file mode 100644 index 0000000..453e691 --- /dev/null +++ b/hydro_roll/utils.py @@ -0,0 +1,162 @@ +import re +import time +import random +from abc import ABC, abstractmethod +from typing import Type, Union, Generic, TypeVar +from iamai import Plugin +from iamai.typing import T_State +from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent + +from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig + +T_Config = TypeVar("T_Config", bound=BasePluginConfig) +T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig) +T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig) + + +class BasePlugin( + Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config], + ABC, + Generic[T_State, T_Config], +): + Config: Type[T_Config] = BasePluginConfig + + def format_str(self, format_str: str, message_str: str = "") -> str: + return format_str.format( + message=message_str, + user_name=self.event.sender.nickname, + user_id=self.event.sender.user_id, + ) + + async def rule(self) -> bool: + is_bot_off = False + + if self.event.adapter.name != "cqhttp": + return False + if self.event.type != "message": + return False + match_str = self.event.message.get_plain_text() + if is_bot_off: + if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"): + match_str = re.sub( + rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str + ) + elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"): + match_str = re.sub( + rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str + ) + else: + return False + if self.config.handle_all_message: + return self.str_match(match_str) + elif self.config.handle_friend_message: + if self.event.message_type == "private": + return self.str_match(match_str) + elif self.config.handle_group_message: + if self.event.message_type == "group": + if ( + self.config.accept_group is None + or self.event.group_id in self.config.accept_group + ): + return self.str_match(match_str) + elif self.config.handle_group_message: + if self.event.message_type == "guild": + return self.str_match(match_str) + return False + + @abstractmethod + def str_match(self, msg_str: str) -> bool: + raise NotImplemented + + +class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC): + msg_match: re.Match + re_pattern: re.Pattern + Config: Type[T_RegexPluginConfig] = RegexPluginConfig + + def str_match(self, msg_str: str) -> bool: + msg_str = msg_str.strip() + self.msg_match = self.re_pattern.fullmatch(msg_str) + return bool(self.msg_match) + + +class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC): + command_match: re.Match + command_re_pattern: re.Pattern + Config: Type[T_CommandPluginConfig] = CommandPluginConfig + + def str_match(self, msg_str: str) -> bool: + if not hasattr(self, "command_re_pattern"): + self.command_re_pattern = re.compile( + f'({"|".join(self.config.command_prefix)})' + f'({"|".join(self.config.command)})' + r"\s*(?P<command_args>.*)", + flags=re.I if self.config.ignore_case else 0, + ) + msg_str = msg_str.strip() + self.command_match = self.command_re_pattern.fullmatch(msg_str) + if not self.command_match: + return False + self.msg_match = self.re_pattern.fullmatch( + self.command_match.group("command_args") + ) + return bool(self.msg_match) + + +class PseudoRandomGenerator: + """线性同余法随机数生成器""" + + def __init__(self, seed): + self.seed = seed + + def generate(self): + while True: + self.seed = (self.seed * 1103515245 + 12345) % (2**31) + yield self.seed + + +class HydroDice: + """水系掷骰组件 + + 一些 API 相关的工具函数 + + """ + + def __init__(self, seed): + self.generator = PseudoRandomGenerator(seed) + + def roll_dice( + self, + _counts: int | str, + _sides: int | str, + is_reversed: bool = False, + streamline: bool = False, + threshold: int | str = 5, + ) -> str: + """普通掷骰 + Args: + _counts (int | str): 掷骰个数. + _sides (int | str): 每个骰子的面数. + is_reversed (bool, optional): 倒序输出. Defaults to False. + streamline (bool, optional): 忽略过程. Defaults to False. + threshold (int | str, optional): streamline 的阈值. Defaults to 5. + + Returns: + str: 表达式结果. + """ + rolls = [] + for _ in range(int(_counts)): + roll = next(self.generator.generate()) % _sides + 1 + rolls.append(roll) + total = sum(rolls) + + if streamline: + return str(total) + else: + if len(rolls) > int(threshold): + return str(total) + rolls_str = " + ".join(str(r) for r in rolls) + result_str = ( + f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}" + ) + return result_str |
