diff options
| author | 2023-11-03 03:28:35 +0800 | |
|---|---|---|
| committer | 2023-11-03 03:28:35 +0800 | |
| commit | 5a2033860a328c4116f0ede2874915315e7487b0 (patch) | |
| tree | f7b6d88d50ce33bd5fb52fbcfbca906738f412d6 /tests | |
| parent | 4bf6db5200affc2f623aa02301020092c0789d19 (diff) | |
| download | HydroRoll-5a2033860a328c4116f0ede2874915315e7487b0.tar.gz HydroRoll-5a2033860a328c4116f0ede2874915315e7487b0.zip | |
Co-authored-by: HadalFauna <HadalFauna@users.noreply.github.com>
Diffstat (limited to 'tests')
22 files changed, 772 insertions, 379 deletions
diff --git a/tests/plugins/HydroRoll/psi/.gitkeep b/tests/__init__.py index e69de29..e69de29 100644 --- a/tests/plugins/HydroRoll/psi/.gitkeep +++ b/tests/__init__.py diff --git a/tests/config.example.toml b/tests/config.example.toml index d4b4697..68ddef6 100644 --- a/tests/config.example.toml +++ b/tests/config.example.toml @@ -4,7 +4,7 @@ plugin_dirs = ["plugins"] adapters = ["iamai.adapter.cqhttp","iamai.adapter.apscheduler"] [bot.log] -level = "INFO" +level = "DEBUG" verbose_exception = true [adapter.cqhttp] diff --git a/tests/config.toml b/tests/config.toml index f69b51b..19e1e5f 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -9,16 +9,17 @@ adapters = ["iamai.adapter.cqhttp","iamai.adapter.apscheduler"] level = "INFO" verbose_exception = true -[plugin.HydroRoll.rule] -rules = [] -rule_dirs = ["rules"] - [adapter.cqhttp] adapter_type = "reverse-ws" host = "127.0.0.1" -port = 15800 +port = 8080 url = "/cqhttp/ws" show_raw = false [adapter.apscheduler] -scheduler_config = { "apscheduler.timezone" = "Asia/Shanghai" }
\ No newline at end of file +scheduler_config = { "apscheduler.timezone" = "Asia/Shanghai" } + +[plugin.HydroRoll] +uid = '' +rules = [] +rule_dirs = ["rules"]
\ No newline at end of file diff --git a/tests/config/userConf.dat b/tests/config/userConf.dat Binary files differdeleted file mode 100644 index ea150d9..0000000 --- a/tests/config/userConf.dat +++ /dev/null diff --git a/tests/cos.h5 b/tests/cos.h5 Binary files differnew file mode 100644 index 0000000..4d43b53 --- /dev/null +++ b/tests/cos.h5 diff --git a/tests/plugins/HydroRoll/__init__.py b/tests/plugins/HydroRoll/__init__.py index b064bca..e1252f0 100644 --- a/tests/plugins/HydroRoll/__init__.py +++ b/tests/plugins/HydroRoll/__init__.py @@ -1,221 +1,102 @@ -from collections import defaultdict -from itertools import chain -from pathlib import Path -from typing import Dict, List, Optional, Type, Union -from iamai import ConfigModel, Plugin -from iamai.log import logger, error_or_exception -from iamai.utils import ModulePathFinder, get_classes_from_module_name, get_classes_from_dir, get_classes_from_module -from .config import GlobalConfig -from iamai.exceptions import LoadModuleError -from .utils import BasePlugin, RegexPluginBase, CommandPluginBase +"""中间件""" +import joblib import os -# from .core import Rule, RuleLoadType -from HydroRollCore import Rule, RuleLoadType +import shutil +from ast import literal_eval +from os.path import dirname, join, abspath +from iamai import ConfigModel, Plugin +from iamai.log import logger +from .config import Directory, GlobalConfig, Models +from .utils import * + +BASE_DIR = dirname(abspath("__file__")) +HYDRO_DIR = dirname(abspath(__file__)) + +# logger.info(GlobalConfig._copyright) + class HydroRoll(Plugin): - + """中间件""" class Config(ConfigModel): __config_name__ = "HydroRoll" - + priority = 0 - rules_priority_dict: Dict[int, List[Type[Rule]]] = defaultdict(list) - _module_path_finder = ModulePathFinder() - - - def _load_rule_class( - self, - rule_class: Type[Rule], - rule_load_type: RuleLoadType, - rule_file_path: Optional[str], - ): - logger.info(f'Loading rule from class "{rule_class!r}"') - """加载规则类。""" - priority = getattr(rule_class, "priority", None) - if type(priority) is int and priority >= 0: - for _rule in self.rules: - if _rule.__name__ == rule_class.__name__: - logger.warning( - f'Already have a same name rule "{_rule.__name__}"' - ) - rule_class.__rule_load_type__ = rule_load_type - rule_class.__rule_file_path__ = rule_file_path - self.rules_priority_dict[priority].append(rule_class) - logger.success( - f'Succeeded to load rule "{rule_class.__name__}" ' - f'from class "{rule_class!r}"' - ) - else: - error_or_exception( - f'Load rule from class "{rule_class!r}" failed:', - LoadModuleError( - f'Rule priority incorrect in the class "{rule_class!r}"' - ), - self.bot.config.bot.log.verbose_exception, - ) - - def _load_rules_from_module_name( - self, module_name: str, rule_load_type: RuleLoadType - ): - logger.info(f'Loading rules from module "{module_name}"') - """从模块名称中规则包模块。""" - try: - rule_classes = get_classes_from_module_name(module_name, Rule) - except ImportError as e: - error_or_exception( - f'Import module "{module_name}" failed:', - e, - self.bot.config.bot.log.verbose_exception, - ) - else: - for rule_class, module in rule_classes: - self._load_rule_class( - rule_class, - rule_load_type, - module.__file__, - ) - - def _load_rules( - self, - *rules: Union[Type[Plugin], str, Path], - rule_load_type: Optional[RuleLoadType] = None, - ): - """加载规则包。 - - Args: - *rules: 规则类、规则包模块名称或者规则包模块文件路径。类型可以是 `Type[Rule]`, `str` 或 `pathlib.Path`。 - 如果为 `Type[Rule]` 类型时,将作为规则类进行加载。 - 如果为 `str` 类型时,将作为规则包模块名称进行加载,格式和 Python `import` 语句相同。 - 例如:`path.of.rule`。 - 如果为 `pathlib.Path` 类型时,将作为规则包模块文件路径进行加载。 - 例如:`pathlib.Path("path/of/rule")`。 - rule_load_type: 规则加载类型,如果为 None 则自动判断,否则使用指定的类型。 - """ - logger.info("Loading rules...") - for rule_ in rules: - if isinstance(rule_, type): - if issubclass(rule_, Rule): - self._load_rule_class( - rule_, rule_load_type or RuleLoadType.CLASS, None - ) - else: - logger.error( - f'The rule class "{rule_!r}" must be a subclass of Rule' - ) - elif isinstance(rule_, str): - logger.warning(f'Loading rules from module "{rule_}"') - self._load_rules_from_module_name( - rule_, rule_load_type or RuleLoadType.NAME - ) - elif isinstance(rule_, Path): - logger.warning(f'Loading rules from path "{rule_}"') - if rule_.is_file(): - if rule_.suffix != ".py": - logger.error(f'The path "{rule_}" must endswith ".py"') - return - - rule_module_name = None - for path in self._module_path_finder.path: - try: - if rule_.stem == "__init__": - if rule_.resolve().parent.parent.samefile(Path(path)): - rule_module_name = rule_.resolve().parent.name - break - elif rule_.resolve().parent.samefile(Path(path)): - rule_module_name = rule_.stem - break - except OSError: - continue - if rule_module_name is None: - rel_path = rule_.resolve().relative_to(Path(".").resolve()) - if rel_path.stem == "__init__": - rule_module_name = ".".join(rel_path.parts[:-1]) - else: - rule_module_name = ".".join( - rel_path.parts[:-1] + (rel_path.stem,) - ) - - self._load_rules_from_module_name( - rule_module_name, rule_load_type or RuleLoadType.FILE - ) - else: - logger.error(f'The rule path "{rule_}" must be a file') - else: - logger.error(f"Type error: {rule_} can not be loaded as plugin") - - def load_rules(self, *rules: Union[Type[Rule], str, Path]): - """加载规则。 - - Args: - *rules: 规则类、规则包x模块名称或者规则包模块文件路径。类型可以是 `Type[Rule]`, `str` 或 `pathlib.Path`。 - 如果为 `Type[Rule]` 类型时,将作为规则类进行加载。 - 如果为 `str` 类型时,将作为规则包模块名称进行加载,格式和 Python `import` 语句相同。 - 例如:`path.of.rule`。 - 如果为 `pathlib.Path` 类型时,将作为规则包模块文件路径进行加载。 - 例如:`pathlib.Path("path/of/rule")`。 - """ - # self._extend_rules.extend(rules) - return self._load_rules(*rules) - - def _load_rules_from_dirs(self, *dirs: Path): - """从目录中加载规则包,以 `_` 开头的模块中的规则不会被导入。路径可以是相对路径或绝对路径。 - - Args: - *dirs: 储存包含规则的模块的模块路径。 - 例如:`pathlib.Path("path/of/rules/")` 。 - """ - logger.info("Loading rules from dirs...") - dirs = list(map(lambda x: str(x.resolve()), dirs)) # type: ignore maybe remove? - logger.warning(f'Loading rules from dirs "{", ".join(map(str, dirs))}"') - self._module_path_finder.path.extend(dirs) # type: ignore - # type: ignore - for rule_class, module in get_classes_from_dir(dirs, Rule): # type: ignore - self._load_rule_class(rule_class, RuleLoadType.DIR, module.__file__) - - def load_rules_from_dirs(self, *dirs: Path): - """从目录中加载规则,以 `_` 开头的模块中的规则不会被导入。路径可以是相对路径或绝对路径。 - - Args: - *dirs: 储存包含rule的模块的模块路径。 - 例如:`pathlib.Path("path/of/rules/")` 。 - """ - # self._extend_rule_dirs.extend(dirs) - self._load_rules_from_dirs(*dirs) - - @property - def rules(self) -> List[Type[Plugin]]: - """当前已经加载的规则包的列表。""" - return list(chain(*self.rules_priority_dict.values())) - + + # TODO: HydroRollCore should be able to handle all signals and tokens from Psi. + logger.info("Loading HydroRollCore...") + def __post_init__(self): - if not self.bot.global_state.get('init', False): - self.bot.global_state = dict() - self.bot.global_state['init'] = True - - self._load_rules_from_dirs(Path(os.path.join("\\".join(os.path.dirname(__file__).split('\\')[:-2]),"rules"))) #*self.config.rule['rule_dirs']) - # self._load_rules(*self.config.rule.rules) - - ... - + self.state = {} + self.model_path_list = [] + self.bot.global_state['HydroRoll'] = {} + self.model_dict = Models().get_models_dict() + + self.model_path_list.append(join(BASE_DIR, "models")) + self.model_path_list.append(join(HYDRO_DIR, "models")) + self.model_path_list.append(join(BASE_DIR, "HydroRoll", "models")) + + self.load_models() + async def handle(self) -> None: """ @TODO: HydroRollCore should be able to handle all signals and tokens from Psi. @BODY: HydroRollCore actives the rule-packages. """ - if self.event.message.get_plain_text() == '.core': + if self.event.message.get_plain_text() == ".core": await self.event.reply("HydroRollCore is running.") - elif self.event.message.startswith('.show'): + elif self.event.message.startswith(".test"): try: - await self.event.reply(eval(self.event.message.get_plain_text()[6:])) - except Exception as e: - await self.event.reply(f"{e!r}") - + result = literal_eval(self.event.message.get_plain_text()[5:]) + await self.event.reply(str(result)) + except Exception as error: + await self.event.reply(f"{error!r}") + async def rule(self) -> bool: """ - @TODO: Psi should be able to handle all message first. + @TODO: Psi should be able to handle all message first. @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value. """ - - if self.event.type != "message": - return False - return self.event.message.get_plain_text().startswith(".")
\ No newline at end of file + logger.info("loading psi...") + if not self.bot.global_state['HydroRoll'].get('hola') and self.event.type == "message" and self.event.message_type == "private" and not os.path.exists(join(BASE_DIR, "HydroRoll")): + hola = self.models['hola'] + _, max_similarity = find_max_similarity( + self.event.message.get_plain_text(), hola) + if max_similarity > 0.51: + self.init_directory() + self.bot.global_state['HydroRoll']['hola'] = True + await self.event.reply(f"验证成功√ 正在初始化水系目录...") + logger.info(GlobalConfig._copyright) + return self.event.adapter.name in ['cqhttp', 'kook', 'console', 'mirai'] + + def _init_directory(self, _prefix: str = ''): + """初始化水系目录""" + for _ in Directory(BASE_DIR).get_dice_dir_list(_prefix): + if not os.path.exists(_): + os.makedirs(_) + + def _init_file(self, _prefix: str = ''): + """初始化文件""" + + def init_directory(self, _prefix: str = 'HydroRoll'): + """在指定目录生成水系文件结构""" + self._init_directory(_prefix=_prefix) + + def _load_model(self, path: str, model_file: str): + if model_file is None: + model_file = '' + return joblib.load(join(path, f'{model_file}')) + + def _load_models(self, model_path_list: list, model_dict: dict) -> dict: + """加载指定模型, 当然也可能是数据集""" + models = {} + for path in model_path_list: + for model_name, model_file in model_dict.items(): + if os.path.exists(join(path, model_file)): + models[model_name] = self._load_model(path, model_file) + logger.success( + f'Succeeded to load model "{model_name}"') + return models + + def load_models(self): + self.models = self._load_models(self.model_path_list, self.model_dict) diff --git a/tests/plugins/HydroRoll/config.py b/tests/plugins/HydroRoll/config.py index 7cd7520..daab4d3 100644 --- a/tests/plugins/HydroRoll/config.py +++ b/tests/plugins/HydroRoll/config.py @@ -1,3 +1,5 @@ +from randomgen import AESCounter +from numpy.random import Generator import argparse import sys import platform @@ -5,10 +7,20 @@ from importlib.metadata import version import os from typing import Set, Optional from iamai import ConfigModel - +import datetime # 创建全局 ArgumentParser 对象 global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数") + +class Color: + # 定义ANSI转义序列 + RESET = '\033[0m' + BLUE_BASE = '\033[36m' + BLUE_DARK = '\033[34m' + BLUE_DARKER = '\033[32m' + BLUE_DARKEST = '\033[30m' + + class BasePluginConfig(ConfigModel): __config_name__ = "" handle_all_message: bool = True @@ -45,9 +57,24 @@ class GlobalConfig(CommandPluginConfig): _iamai_version = version("iamai") _python_ver = sys.version _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3])) - current_path = os.path.dirname(os.path.abspath("__file__")) + _copyright = f"""\033[36m + _ __ _ _ + /\ /\_ _ __| |_ __ ___ /__\ ___ | | | + / /_/ / | | |/ _` | '__/ _ \ / \/// _ \| | | +/ __ /| |_| | (_| | | | (_) / _ \ (_) | | | +\/ /_/ \__, |\__,_|_| \___/\/ \_/\___/|_|_| + |___/ + +\033[4m{_name} [版本 {_version}]\033[0m\033[36m +(c) HydroRoll-Team contributors, {_author}。 +Github: https://github.com/HydroRoll-Team +Under the MIT License, see LICENSE for more details. +""" + _rg = Generator(AESCounter()) + SEED = _rg.random() # 随机数种子 # 定义系统组件 + class HydroSystem: def __init__(self): self.parser = argparse.ArgumentParser( @@ -78,3 +105,40 @@ class GlobalConfig(CommandPluginConfig): self.parser = argparse.ArgumentParser(description="Bot命令") +class Directory(object): + def __init__(self, _path: str) -> None: + self.current_path = _path + + def get_dice_dir_list(self, _prefix: str) -> list: + + return [ + os.path.join(self.current_path, f'{_prefix}', *dirs) + for dirs in [ + ['config'], + ['data'], + ['rules'], + ['scripts'], + ['web', 'frontend'], + ['web', 'backend'], + ] + ] + + +class FileManager(object): + def __init__(self, _path: str) -> None: + self.current_path = _path + + def get_file_list(self, _dir: str): + return { + 'web;frontend': 'index.html' + } + + +class Models: + """模型管理类""" + + def __init__(self) -> None: + self.builtin_models = {'hola': 'hola.pkl'} + + def get_models_dict(self) -> dict: + return self.builtin_models diff --git a/tests/plugins/HydroRoll/core/__init__.py b/tests/plugins/HydroRoll/core/__init__.py deleted file mode 100644 index ed02489..0000000 --- a/tests/plugins/HydroRoll/core/__init__.py +++ /dev/null @@ -1,70 +0,0 @@ -from abc import ABC, abstractmethod -import os, os.path -from enum import Enum -from iamai.config import ConfigModel -from iamai.utils import is_config_class -from typing import Generic, NoReturn, Optional, Type, TYPE_CHECKING -from ...HydroRoll.typing import T_Event, T_Config - -if TYPE_CHECKING: - from iamai.bot import Bot - -class RuleLoadType(Enum): - """插件加载类型。""" - - DIR = "dir" - NAME = "name" - FILE = "file" - CLASS = "class" - - -class Rule(ABC, Generic[T_Event, T_Config]): - """所有 iamai 插件的基类。 - - Attributes: - event: 当前正在被此插件处理的事件。 - priority: 插件的优先级,数字越小表示优先级越高,默认为 0。 - block: 插件执行结束后是否阻止事件的传播。True 表示阻止。 - __rule_load_type__: 插件加载类型,由 iamai 自动设置,反映了此插件是如何被加载的。 - __rule_file_path__: 当插件加载类型为 `RuleLoadType.CLASS` 时为 `None`, - 否则为定义插件在的 Python 模块的位置。 - """ - - event: T_Event - priority: int = 0 - Config: Type[ConfigModel] - - __rule_load_type__: RuleLoadType - __rule_file_path__: Optional[str] - - def __init__(self, event: T_Event): - self.event = event - - if not hasattr(self, "priority"): - self.priority = 0 - - self.get = self.bot.get - - self.__post_init__() - - def __post_init__(self): - """用于初始化后处理,被 `__init__()` 方法调用。""" - pass - - @property - def name(self) -> str: - """规则类名称。""" - return self.__class__.__name__ - - @property - def bot(self) -> "Bot": - """机器人对象。""" - return self.event.adapter.bot - - @property - def config(self) -> Optional[T_Config]: - """规则包配置。""" - config_class: ConfigModel = getattr(self, "Rule", None) - if is_config_class(config_class): - return getattr(self.config.rule, config_class.__config_name__, None) - return None
\ No newline at end of file diff --git a/tests/rules/The Pool/config.py b/tests/plugins/HydroRoll/models/__init__.py index e69de29..e69de29 100644 --- a/tests/rules/The Pool/config.py +++ b/tests/plugins/HydroRoll/models/__init__.py diff --git a/tests/plugins/HydroRoll/models/cos_sim.py b/tests/plugins/HydroRoll/models/cos_sim.py new file mode 100644 index 0000000..24b743d --- /dev/null +++ b/tests/plugins/HydroRoll/models/cos_sim.py @@ -0,0 +1,100 @@ +"""余弦相似度比较""" + + +import joblib +import jieba +import numpy as np + +from sklearn.feature_extraction.text import TfidfTransformer +from sklearn.feature_extraction.text import CountVectorizer +from sklearn.metrics.pairwise import cosine_similarity + + +class cosSim: + def __init__(self, simple: list = [], test_data: list = []): + self.simple = simple + self.inputs = test_data + self.texts = self.simple + self.texts.extend(self.inputs) + + @property + def corpuss(self): + return [" ".join(jieba.cut(text)) for text in self.simple] + + @property + def vocabulary(self): + return self.getVocabulary(self.corpuss) + + @property + def vectors(self): + return self.getVectors(self.corpuss, self.vocabulary) + + @property + def input_corpuss(self): + return [" ".join(jieba.cut(text)) for text in self.inputs] + + @property + def input_vocabulary(self): + return self.getVocabulary(self.input_corpuss) + + @property + def input_vector(self): + return self.getVectors(self.input_corpuss, self.input_vocabulary) + + def append(self, add_test_data: list = []): + self.inputs.extend(add_test_data) + + @property + def similarities(self): + similarities = [] + corpuss = [" ".join(jieba.cut(text)) for text in self.texts] + vocabulary = self.getVocabulary(corpuss) + vector = self.getVectors(corpuss, vocabulary) + for v in vector[len(self.texts)-1:]: + sim = [] + for v1 in vector[:len(self.simple)+1]: + sim.append(self.cos_sim(v1, v)) + print('sim', sim) + similarities.append(max(sim)) + + return similarities + + @staticmethod + def cos_sim(vector_a, vector_b): + """ + 计算两个向量之间的余弦相似度 + :param vector_a: 向量 a + :param vector_b: 向量 b + :return: sim + """ + vector_a = np.array(vector_a).reshape(1, -1) + vector_b = np.array(vector_b).reshape(1, -1) + return cosine_similarity(vector_a, vector_b)[0][0] + + @staticmethod + def getVocabulary(corpuss): + vectorizer = CountVectorizer(max_features=500) + transformer = TfidfTransformer() + tfidf = transformer.fit_transform(vectorizer.fit_transform(corpuss)) + words = vectorizer.get_feature_names_out() + return words + + @staticmethod + def getVectors(corpus, vocabulary): + vectorizer = CountVectorizer(vocabulary=vocabulary) + transformer = TfidfTransformer() + tfidf = transformer.fit_transform(vectorizer.fit_transform(corpus)) + vectors = tfidf.toarray() + return vectors + + def save(self, filename): + joblib.dump(self, filename) + + @staticmethod + def load(filename): + return joblib.load(filename) + + def reload(self): + self.texts = self.simple + self.texts.extend(self.inputs) + self.similarities
\ No newline at end of file diff --git a/tests/plugins/HydroRoll/models/hola.py b/tests/plugins/HydroRoll/models/hola.py new file mode 100644 index 0000000..255f5dd --- /dev/null +++ b/tests/plugins/HydroRoll/models/hola.py @@ -0,0 +1,62 @@ +from cos_sim import cosSim +import numpy as np + +texts = [ + "你好 HydroRoll", + "你好 水系", + "水系你好", + "HydroRoll hi~", + "水系, hola" + "你好呀 水系", + "hi 水系", + "hi HydroRoll", + "hello 水系", + "hello HydroRoll", + "hola 水系", + "hola HydroRoll", +] + +# print(model.corpuss) + +# print(model.vocabulary) + + + +model = cosSim( + simple=texts, + test_data=[ + # 'Hi! HydroRoll is a roll system.', + # 'Hello, this is a system which named HydroRoll', + # '短文本匹配技术应用是很广泛的,包括搜索、问答、推荐、计算广告等领域,相关技术也沉淀多年,从无监督方法到有监督方法层出不穷,工业界也是都有应用,短文本匹配算是自然语言处理领域的重要技术了,虽然任务简单,但是想要做好并不是那么容易的事情。', + # '短文本匹配技术在搜索、问答、推荐和计算广告等领域有广泛的应用。这项技术已经发展多年,从无监督方法到有监督方法层出不穷。在工业界,短文本匹配技术已经得到了广泛的应用。虽然短文本匹配任务看起来简单,但要做好并不容易。', + # '你好~水系。', + # 'hola~~~~~~~hola水系!' + ] +) + +# print(model.vectors) + +# print(model.input_vector) + +# print(model.input_vocabulary) + +# print(cosSim.cos_sim(vector_a=model.input_vector[4], vector_b=model.input_vector[5])) + + +print(model.similarities) + +print(model.inputs) + +# model.append(['你好水']) + +# model.append(['你好']) + +print(model.inputs) + +print(model.similarities) + +model.reload() + +print(model.input_corpuss) + +print(model.similarities)
\ No newline at end of file diff --git a/tests/plugins/HydroRoll/utils.py b/tests/plugins/HydroRoll/utils.py index b4dbab6..f871523 100644 --- a/tests/plugins/HydroRoll/utils.py +++ b/tests/plugins/HydroRoll/utils.py @@ -1,4 +1,7 @@ +import difflib import re +import time +import random from abc import ABC, abstractmethod from typing import Type, Union, Generic, TypeVar from iamai import Plugin @@ -9,7 +12,9 @@ from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig T_Config = TypeVar("T_Config", bound=BasePluginConfig) T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig) -T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig) +T_CommandPluginConfig = TypeVar( + "T_CommandPluginConfig", bound=CommandPluginConfig) + class BasePlugin( Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config], @@ -27,17 +32,21 @@ class BasePlugin( async def rule(self) -> bool: is_bot_off = False - + if self.event.adapter.name != "cqhttp": return False if self.event.type != "message": return False match_str = self.event.message.get_plain_text() if is_bot_off: - if self.event.message.startswith(f'[CQ:at,qq={self.event.self_id}]'): - match_str = re.sub(fr'^\[CQ:at,qq={self.event.self_id}\]', '', match_str) - elif self.event.message.startswith(f'[CQ:at,qq={self.event.self_tiny_id}]'): - match_str = re.sub(fr'^\[CQ:at,qq={self.event.self_tiny_id}\]', '', match_str) + if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"): + match_str = re.sub( + rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str + ) + elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"): + match_str = re.sub( + rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str + ) else: return False if self.config.handle_all_message: @@ -94,4 +103,77 @@ class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC): self.command_match.group("command_args") ) return bool(self.msg_match) - + + +class PseudoRandomGenerator: + """线性同余法随机数生成器""" + + def __init__(self, seed): + self.seed = seed + + def generate(self): + while True: + self.seed = (self.seed * 1103515245 + 12345) % (2**31) + yield self.seed + + +class HydroDice: + """水系掷骰组件 + + 一些 API 相关的工具函数 + + """ + + def __init__(self, seed): + self.generator = PseudoRandomGenerator(seed) + + def roll_dice( + self, + _counts: int | str, + _sides: int | str, + is_reversed: bool = False, + streamline: bool = False, + threshold: int | str = 5, + ) -> str: + """普通掷骰 + Args: + _counts (int | str): 掷骰个数. + _sides (int | str): 每个骰子的面数. + is_reversed (bool, optional): 倒序输出. Defaults to False. + streamline (bool, optional): 忽略过程. Defaults to False. + threshold (int | str, optional): streamline 的阈值. Defaults to 5. + + Returns: + str: 表达式结果. + """ + rolls = [] + for _ in range(int(_counts)): + roll = next(self.generator.generate()) % _sides + 1 + rolls.append(roll) + total = sum(rolls) + + if streamline: + return str(total) + else: + if len(rolls) > int(threshold): + return str(total) + rolls_str = " + ".join(str(r) for r in rolls) + result_str = ( + f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}" + ) + return result_str + + +def find_max_similarity(input_string, string_list): + """寻找最大的相似度""" + max_similarity = 0 + max_string = "" + + for string in string_list: + similarity = difflib.SequenceMatcher( + None, input_string, string).quick_ratio() + if similarity > max_similarity: + max_similarity = similarity + max_string = string + + return max_string, max_similarity diff --git a/tests/plugins/psi.py b/tests/plugins/psi.py new file mode 100644 index 0000000..d768115 --- /dev/null +++ b/tests/plugins/psi.py @@ -0,0 +1,294 @@ +""" SPI - Simple Pascal Interpreter """ + +############################################################################### +# # +# LEXER # +# # +############################################################################### + +# Token types +# +# EOF (end-of-file) token is used to indicate that +# there is no more input left for lexical analysis +from iamai import Plugin +from HydroRoll.utils import HydroDice +INTEGER, PLUS, MINUS, MUL, DIV, LPAREN, RPAREN, EOF = ( + "INTEGER", + "PLUS", + "MINUS", + "MUL", + "DIV", + "(", + ")", + "EOF", +) + +DICE = "DICE" + + +class Token(object): + """A single token from the lexer.""" + + def __init__(self, _type, _value): + self.type = _type + self.value = _value + + def __str__(self): + """String representation of the class instance. + + Examples: + Token(INTEGER, 3) + Token(PLUS, '+') + Token(MUL, '*') + """ + return f"Token({self.type}, {repr(self.value)})" + + def __repr__(self): + return self.__str__() + + +class Lexer(object): + """A lexer for the Psi language.""" + + def __init__(self, text): + # client string input, e.g. "4 + 2 * 3 - 6 / 2" + self.text = text + # self.pos is an index into self.text + self.pos = 0 + self.current_char = self.text[self.pos] + + def error(self): + """Raise an exception at the current position.""" + raise ValueError("Invalid character") + + def advance(self): + """Advance the `pos` pointer and set the `current_char` variable.""" + self.pos += 1 + if self.pos > len(self.text) - 1: + self.current_char = None # Indicates end of input + else: + self.current_char = self.text[self.pos] + + def skip_whitespace(self): + while self.current_char is not None and self.current_char.isspace(): + self.advance() + + def integer(self): + """Return a (multidigit) integer consumed from the input.""" + result = "" + while self.current_char is not None and self.current_char.isdigit(): + result += self.current_char + self.advance() + return int(result) + + def get_next_token(self): + """Lexical analyzer (also known as scanner or tokenizer)""" + while self.current_char is not None: + if self.current_char.isspace(): + self.skip_whitespace() + continue + + token_type = { + "+": PLUS, + "-": MINUS, + "d": DICE, + "*": MUL, + "/": DIV, + "(": LPAREN, + ")": RPAREN, + }.get(self.current_char) + + if token_type: + self.advance() + return Token(token_type, self.current_char) + + if self.current_char.isdigit(): + return Token(INTEGER, self.integer()) + + self.error() + + return Token(EOF, None) + + +############################################################################### +# # +# PARSER # +# # +############################################################################### + + +class AST(object): + pass + + +class BinOp(AST): + def __init__(self, left, op, right): + self.left = left + self.token = self.op = op + self.right = right + + +class Num(AST): + def __init__(self, token): + self.token = token + self.value = token.value + + +class UnaryOp(AST): + def __init__(self, op, expr): + self.token = self.op = op + self.expr = expr + + +class Parser(object): + def __init__(self, lexer): + self.lexer = lexer + # set current token to the first token taken from the input + self.current_token = self.lexer.get_next_token() + + def error(self): + raise Exception("Invalid syntax") + + def eat(self, token_type): + # compare the current token type with the passed token + # type and if they match then "eat" the current token + # and assign the next token to the self.current_token, + # otherwise raise an exception. + if self.current_token.type == token_type: + self.current_token = self.lexer.get_next_token() + else: + self.error() + + def factor(self): + """factor : (PLUS | MINUS | DICE) factor | INTEGER | LPAREN expr RPAREN""" + token = self.current_token + if token.type == PLUS: + self.eat(PLUS) + node = UnaryOp(token, self.factor()) + return node + elif token.type == MINUS: + self.eat(MINUS) + node = UnaryOp(token, self.factor()) + return node + elif token.type == DICE: + self.eat(DICE) + left = Num(Token(INTEGER, 1)) # 默认骰子个数为1 + right = self.factor() + node = BinOp(left, token, right) + return node + elif token.type == INTEGER: + self.eat(INTEGER) + return Num(token) + elif token.type == LPAREN: + self.eat(LPAREN) + node = self.expr() + self.eat(RPAREN) + return node + + def term(self): + """term : factor ((MUL | DIV) factor)*""" + node = self.factor() + + while self.current_token.type in (MUL, DIV): + token = self.current_token + if token.type == MUL: + self.eat(MUL) + elif token.type == DIV: + self.eat(DIV) + + node = BinOp(left=node, op=token, right=self.factor()) + + return node + + def expr(self): + """ + expr : term ((PLUS | MINUS) term)* + term : factor ((MUL | DIV) factor)* + factor : (PLUS | MINUS) factor | INTEGER | LPAREN expr RPAREN + """ + node = self.term() + + while self.current_token.type in (PLUS, MINUS): + token = self.current_token + if token.type == PLUS: + self.eat(PLUS) + elif token.type == MINUS: + self.eat(MINUS) + + node = BinOp(left=node, op=token, right=self.term()) + + return node + + def parse(self): + node = self.expr() + if self.current_token.type != EOF: + self.error() + return node + + +############################################################################### +# # +# INTERPRETER # +# # +############################################################################### + + +class NodeVisitor(object): + def visit(self, node): + method_name = "visit_" + type(node).__name__ + visitor = getattr(self, method_name, self.generic_visit) + return visitor(node) + + def generic_visit(self, node): + raise Exception("No visit_{} method".format(type(node).__name__)) + + +class Interpreter(NodeVisitor): + def __init__(self, parser): + self.parser = parser + + def visit_BinOp(self, node): + if node.op.type == PLUS: + return self.visit(node.left) + self.visit(node.right) + elif node.op.type == MINUS: + return self.visit(node.left) - self.visit(node.right) + elif node.op.type == DICE: + return int( + HydroDice(1).roll_dice( + _counts=self.visit(node.left), + _sides=self.visit(node.right), + streamline=True, + ) + ) + elif node.op.type == MUL: + return self.visit(node.left) * self.visit(node.right) + elif node.op.type == DIV: + return self.visit(node.left) // self.visit(node.right) + + def visit_Num(self, node): + return node.value + + def visit_UnaryOp(self, node): + op = node.op.type + if op == PLUS: + return +self.visit(node.expr) + elif op == MINUS: + return -self.visit(node.expr) + + def interpret(self): + tree = self.parser.parse() + if tree is None: + return "" + return self.visit(tree) + + +class Psi(Plugin): + async def handle(self) -> None: + lexer = Lexer(self.event.message.get_plain_text()[4:]) + parser = Parser(lexer) + interpreter = Interpreter(parser) + result = interpreter.interpret() + await self.event.reply(str(result)) + + async def rule(self) -> bool: + return self.event.type == "message" and self.event.message.startswith(".psi") diff --git a/tests/plugins/r.py b/tests/plugins/r.py new file mode 100644 index 0000000..2f05af9 --- /dev/null +++ b/tests/plugins/r.py @@ -0,0 +1,10 @@ +from iamai import Plugin + + +class Roll(Plugin): + async def handle(self) -> None: + await self.event.reply("""Attack: 25 +Damage: 9""") + + async def rule(self) -> bool: + return self.event.type == "message" and self.event.message.startswith("/r") diff --git a/tests/plugins/_show.py b/tests/plugins/show.py index d009760..b99c18c 100644 --- a/tests/plugins/_show.py +++ b/tests/plugins/show.py @@ -1,10 +1,13 @@ from iamai import Plugin -import json +from numpy.random import Generator +from randomgen import AESCounter +rg = Generator(AESCounter(12345, mode="sequence")) + class Exec(Plugin): - + priority = 1 - + async def handle(self) -> None: try: content = [ @@ -15,7 +18,7 @@ class Exec(Plugin): "uin": f"{self.event.sender.user_id}", "content": [ { - "type": "text", + "type": "text", "data": { "text": f"{eval(self.event.message.get_plain_text()[6:])}" } @@ -27,7 +30,7 @@ class Exec(Plugin): res = await self.event.adapter.send_group_forward_msg(group_id=int(self.event.group_id), messages=content) except Exception as e: await self.event.reply(f"ERROR!{e!r}") - + async def rule(self) -> bool: return ( self.event.type == "message" diff --git a/tests/plugins/tf/__init__.py b/tests/plugins/tf/__init__.py new file mode 100644 index 0000000..ce78d97 --- /dev/null +++ b/tests/plugins/tf/__init__.py @@ -0,0 +1,52 @@ + +from iamai import Plugin +from iamai.log import logger as _logger +from HydroRoll.models.cos_sim import cosSim +import jieba + +logger = _logger + + +texts = [ + "你好 HydroRoll", + "你好 水系", + "hi 水系", + "hi HydroRoll", + "hello 水系", + "hello HydroRoll", + "hola 水系", + "hola HydroRoll", +] + +cos_Sim = cosSim(texts) +logger.info(f"{cos_Sim.calcuSim('你好')}") + +cos_Sim.save('cos.h5') + +model = cosSim.load('cos.h5') + +logger.info(f"{model}") + +# class Sim(Plugin): +# async def handle(self) -> None: +# try: + +# txt_list = eval(self.event.message.get_plain_text()[5:]) +# if len(txt_list) == 1: +# await self.event.reply(f"{cos_Sim.CalcuSim(txt_list)}") +# elif len(txt_list) == 2: +# corpuss = [" ".join(jieba.cut(text)) +# for text in eval(self.event.message.get_plain_text()[5:])] +# await self.event.reply(str(corpuss)) +# vocabulary = cos_Sim.getVocabulary(corpuss) +# v = cos_Sim.getVectors(corpuss, vocabulary=vocabulary) +# await self.event.reply(f"weight\n=========\n{v}") +# await self.event.reply(f"相似度\n=========\n{cos_Sim.cos_sim(v[0], v[1])}") +# except Exception as e: +# await self.event.reply(f"{e!r}") + +# async def rule(self) -> bool: +# return self.event.type == "message" and self.event.message.startswith(".cos") + + + diff --git a/tests/plugins/tf/config.py b/tests/plugins/tf/config.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/plugins/tf/config.py diff --git a/tests/rules/The Pool/__init__.py b/tests/rules/The Pool/__init__.py deleted file mode 100644 index 200ac13..0000000 --- a/tests/rules/The Pool/__init__.py +++ /dev/null @@ -1,59 +0,0 @@ -from HydroRollCore import Rule, ConfigModel -# from ...plugins.HydroRoll.core import Rule, RuleLoadType - -class ThePool(Rule): - name: str = "池[The Pool]" - tags = ["The Pool", "池", "合作", "叙事", "d6体系"] - - class Config(ConfigModel): - __config_name__ = "ThePool" - - class DefaultDice: - """默认骰池 - - dice: 骰子数量 - sides: 骰子面数 - """ - - def __init__(self): - self._dice = 15 - self._sides = 6 - - @property - def dice(self): - return self._dice - - @property - def sides(self): - return self._sides - -class Wiki(ThePool): - def __init__(self): - self._intro = """《池》[The Pool]是一款合作叙事向的角色扮演游戏。 - 你可以在游戏里使用任何你喜欢的设定。桌上的一人得成为游戏主持人(或者说 GM)并运作游戏。 - 要玩这款 RPG,你需要许多 d6(有六个面的骰子),里面有一些得是GM 骰,它们的外表应该与其他骰子有所区别。 - 在角色创建开始时,每个玩家在自己的起始骰池中有 15 颗骰子。剩下的骰子会被放到公共骰池中。 - """ - - self._make_char = """创建角色很简单,写一段 50 个单词长的故事 [Story](译者注:约为 80 个汉字)。 - 想象你在写一本书,然后这一段就是对书中主角的介绍。你只有 50 个词,所以最好把重点放到两个地方:这个角色最重要的元素,以及他会如何融入到设定之中。 - 角色的名字不算在词数限制之中。 - """ - - self._char_example = """故事示例: - 这是我为《池》创造的第一个角色。我们所选的设定是一个充斥着黑暗魔法的奇幻世界。 - “受训于隐秘的失落之地教团,达马特是一位元素法师。他爱上了一位年轻的新晋门徒,而当法师试图教授一道她无法控制的法术时,她死了。 - 达马特被逐出教团,如今,他正寻找着复活她的方法。” - """ - - @property - def intro(self): - return self._intro - - @property - def make_char(self): - return self._make_char - - @property - def char_example(self): - return self._char_example
\ No newline at end of file diff --git a/tests/rules/The Pool/池 THE POOL.pdf b/tests/rules/The Pool/池 THE POOL.pdf Binary files differdeleted file mode 100644 index 5057045..0000000 --- a/tests/rules/The Pool/池 THE POOL.pdf +++ /dev/null diff --git a/tests/rules/rule-1.py b/tests/rules/rule-1.py deleted file mode 100644 index aedebdc..0000000 --- a/tests/rules/rule-1.py +++ /dev/null @@ -1,11 +0,0 @@ -from HydroRollCore import Rule - -class Rule_1(Rule): - """规则包示例1 - - """ - - priority = 1 - - async def check(self) -> None: - ...
\ No newline at end of file diff --git a/tests/test.py b/tests/test.py index b3eed87..e69de29 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,3 +0,0 @@ -
-
-
diff --git a/tests/web/server.py b/tests/web/server.py deleted file mode 100644 index 67729fa..0000000 --- a/tests/web/server.py +++ /dev/null @@ -1,13 +0,0 @@ -from flask import Flask, request -import json - -app = Flask(__name__) - -@app.route('/', methods=['POST']) # type: ignore -def handle(): - payload = request.content_type - print(payload) - # return json.loads(payload) - -if __name__ == '__main__': - app.run('127.0.0.1',3000)
\ No newline at end of file |
