aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/example/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'example/plugins')
-rw-r--r--example/plugins/HydroRoll/__init__.py154
-rw-r--r--example/plugins/HydroRoll/config.py148
-rw-r--r--example/plugins/HydroRoll/exceptions.py0
-rw-r--r--example/plugins/HydroRoll/models/Transformer.py9
-rw-r--r--example/plugins/HydroRoll/models/__init__.py0
-rw-r--r--example/plugins/HydroRoll/models/cos_sim.py100
-rw-r--r--example/plugins/HydroRoll/models/hola.py62
-rw-r--r--example/plugins/HydroRoll/typing.py34
-rw-r--r--example/plugins/HydroRoll/utils.py184
-rw-r--r--example/plugins/_bradge-kook-cqhttp.py25
-rw-r--r--example/plugins/psi.py294
-rw-r--r--example/plugins/r.py10
-rw-r--r--example/plugins/show.py39
-rw-r--r--example/plugins/tf/__init__.py52
-rw-r--r--example/plugins/tf/config.py0
15 files changed, 1111 insertions, 0 deletions
diff --git a/example/plugins/HydroRoll/__init__.py b/example/plugins/HydroRoll/__init__.py
new file mode 100644
index 0000000..7c05d4e
--- /dev/null
+++ b/example/plugins/HydroRoll/__init__.py
@@ -0,0 +1,154 @@
+"""中间件"""
+import json
+import joblib
+import os
+import shutil
+from ast import literal_eval
+from os.path import dirname, join, abspath
+from iamai import ConfigModel, Plugin
+from iamai.log import logger
+from .config import Directory, GlobalConfig, Models
+from .utils import *
+from .models.Transformer import query
+
+BASE_DIR = dirname(abspath("__file__"))
+HYDRO_DIR = dirname(abspath(__file__))
+APP_DIR = join(BASE_DIR, "HydroRoll")
+
+# logger.info(GlobalConfig._copyright)
+
+
+class HydroRoll(Plugin):
+ """中间件"""
+
+ class Config(ConfigModel):
+ __config_name__ = "HydroRoll"
+
+ priority = 0
+
+ # TODO: HydroRollCore should be able to handle all signals and tokens from Psi.
+ logger.info("Loading HydroRollCore...")
+
+ def __post_init__(self):
+ self.state = {}
+ self.model_path_list = []
+ self.bot.global_state["HydroRoll"] = {}
+ self.model_dict = Models().get_models_dict()
+
+ self.model_path_list.append(join(BASE_DIR, "models"))
+ self.model_path_list.append(join(HYDRO_DIR, "models"))
+ self.model_path_list.append(join(BASE_DIR, "HydroRoll", "models"))
+
+ self.load_models()
+
+ async def handle(self) -> None:
+ """
+ @TODO: HydroRollCore should be able to handle all signals and tokens from Psi.
+ @BODY: HydroRollCore actives the rule-packages.
+ """
+
+ if self.event.message.get_plain_text() == ".core":
+ await self.event.reply("HydroRollCore is running.")
+ if self.event.message.startswith(".set"):
+ arg_list: list = self.event.message.get_plain_text().split()
+ sub_cmd: str = arg_list[1]
+ if sub_cmd == ("censor"):
+ operator: str = arg_list[2]
+ censor_list: list = arg_list[3:]
+ pattern = re.compile(r"[+-](\d?=?)(.*)")
+
+ try:
+ with open(
+ join(APP_DIR, "data", "censor.json"), "r+", encoding="utf8"
+ ) as f:
+ censor_content = json.load(f)
+ except (FileNotFoundError, json.JSONDecodeError):
+ censor_content = {}
+
+ censor_list = list(filter(lambda x: pattern.match(x), censor_list))
+
+ with open(
+ join(APP_DIR, "data", "censor.json"), "w+", encoding="utf8"
+ ) as f:
+ if operator in ["add", "+"]:
+ for item in censor_list:
+ match = pattern.match(item)
+ censor_content[match.group(2)] = int(
+ match.group(1).replace("=", "")
+ )
+ elif operator in ["remove", "rmv", "-"]:
+ for item in censor_list:
+ match = pattern.match(item)
+ keyword = match.group(2)
+ if keyword in censor_content:
+ del censor_content[keyword]
+ elif operator in ["list", "map"]:
+ pass # do something else
+
+ json.dump(censor_content, f, ensure_ascii=False, indent=4)
+ elif self.event.message.startswith(".get"):
+ ...
+
+ elif self.event.message.startswith(".test"):
+ try:
+ result = eval(self.event.message.get_plain_text()[5:]) #literal_eval(self.event.message.get_plain_text()[5:])
+ await self.event.reply(str(result))
+ except Exception as error:
+ await self.event.reply(f"{error!r}")
+
+ async def rule(self) -> bool:
+ """
+ @TODO: Psi should be able to handle all message first.
+ @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value.
+ """
+ logger.info("loading psi...")
+ if (
+ not self.bot.global_state["HydroRoll"].get("hola")
+ and self.event.type == "message"
+ and self.event.message_type == "private"
+ and not os.path.exists(join(BASE_DIR, "HydroRoll"))
+ ):
+ # hola = self.models["hola"]
+ # _, max_similarity = find_max_similarity(
+ # self.event.message.get_plain_text(), hola
+ # )
+ max_similarity = 1
+ if max_similarity > 0.51:
+ self.init_directory()
+ self.bot.global_state["HydroRoll"]["hola"] = True
+ await self.event.reply(f"验证成功√ 正在初始化水系目录...")
+ logger.info(GlobalConfig._copyright)
+ return self.event.adapter.name in ["cqhttp", "kook", "console", "mirai"]
+
+ def _init_directory(self, _prefix: str = ""):
+ """初始化水系目录"""
+ for _ in Directory(BASE_DIR).get_dice_dir_list(_prefix):
+ if not os.path.exists(_):
+ os.makedirs(_)
+
+ def _init_file(self, _prefix: str = ""):
+ """初始化文件"""
+
+ def init_directory(self, _prefix: str = "HydroRoll"):
+ """在指定目录生成水系文件结构"""
+ self._init_directory(_prefix=_prefix)
+
+ def _load_model(self, path: str, model_file: str):
+ if model_file is None:
+ model_file = ""
+ return joblib.load(join(path, f"{model_file}"))
+
+ def _load_models(self, model_path_list: list, model_dict: dict) -> dict:
+ """加载指定模型, 当然也可能是数据集"""
+ models = {}
+ for path in model_path_list:
+ for model_name, model_file in model_dict.items():
+ if os.path.exists(join(path, model_file)):
+ models[model_name] = self._load_model(path, model_file)
+ logger.success(f'Succeeded to load model "{model_name}"')
+ return models
+
+ def load_models(self):
+ self.models = self._load_models(self.model_path_list, self.model_dict)
+
+
diff --git a/example/plugins/HydroRoll/config.py b/example/plugins/HydroRoll/config.py
new file mode 100644
index 0000000..194a956
--- /dev/null
+++ b/example/plugins/HydroRoll/config.py
@@ -0,0 +1,148 @@
+from randomgen import AESCounter
+from numpy.random import Generator
+import argparse
+import sys
+from os.path import dirname, dirname, join, abspath
+import platform
+from importlib.metadata import version
+import os
+from typing import Set, Optional
+from iamai import ConfigModel
+import datetime
+# 创建全局 ArgumentParser 对象
+global_parser = argparse.ArgumentParser(description="HydroRoll[水系] 全局命令参数")
+
+
+class Color:
+ # 定义ANSI转义序列
+ RESET = '\033[0m'
+ BLUE_BASE = '\033[36m'
+ BLUE_DARK = '\033[34m'
+ BLUE_DARKER = '\033[32m'
+ BLUE_DARKEST = '\033[30m'
+
+
+class BasePluginConfig(ConfigModel):
+ __config_name__ = ""
+ handle_all_message: bool = True
+ """是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。"""
+ handle_friend_message: bool = True
+ """是否处理好友消息。"""
+ handle_group_message: bool = True
+ """是否处理群消息。"""
+ accept_group: Optional[Set[int]] = None
+ """处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。"""
+ message_str: str = "*{user_name} {message}"
+ """最终发送消息的格式。"""
+
+
+class RegexPluginConfig(BasePluginConfig):
+ pass
+
+
+class CommandPluginConfig(RegexPluginConfig):
+ command_prefix: Set[str] = {":", "你妈", "👅", "约瑟夫妥斯妥耶夫斯基戴安那只鸡🐔"}
+ """命令前缀。"""
+ command: Set[str] = {}
+ """命令文本。"""
+ ignore_case: bool = True
+ """忽略大小写。"""
+
+
+# 定义全局配置类
+class GlobalConfig(CommandPluginConfig):
+ _name = "HydroRoll[水系]"
+ _version = "0.1.0"
+ _svn = "2"
+ _author = "简律纯"
+ _iamai_version = version("iamai")
+ _python_ver = sys.version
+ _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3]))
+ _base_dir = dirname(abspath("__file__"))
+ _hydro_dir = dirname(abspath(__file__))
+ _copyright = f"""\033[36m
+ _ __ _ _
+ /\ /\_ _ __| |_ __ ___ /__\ ___ | | |
+ / /_/ / | | |/ _` | '__/ _ \ / \/// _ \| | |
+/ __ /| |_| | (_| | | | (_) / _ \ (_) | | |
+\/ /_/ \__, |\__,_|_| \___/\/ \_/\___/|_|_|
+ |___/
+
+\033[4m{_name} [版本 {_version}]\033[0m\033[36m
+(c) HydroRoll-Team contributors, {_author}。
+Github: https://github.com/HydroRoll-Team
+Under the MIT License, see LICENSE for more details.
+"""
+ _rg = Generator(AESCounter())
+ SEED = _rg.random() # 随机数种子
+
+ # 定义系统组件
+
+ class HydroSystem:
+ def __init__(self):
+ self.parser = argparse.ArgumentParser(
+ description="HydroRoll[水系].system 系统命令参数"
+ )
+ self.subparsers = self.parser.add_subparsers()
+ self.status_parser = self.subparsers.add_parser(
+ "status", aliases=["stat", "state"], help="系统状态"
+ )
+ self.reload_parser = self.subparsers.add_parser(
+ "reload", aliases=["rld"], help="重新加载系统"
+ )
+ self.restart_parser = self.subparsers.add_parser(
+ "restart", aliases=["rst"], help="重启系统"
+ )
+ self.collect_parser = self.subparsers.add_parser(
+ "collect", aliases=["gc"], help="释放 python 内存"
+ )
+ self.help = "\n".join(
+ self.parser.format_help()
+ .replace("\r\n", "\n")
+ .replace("\r", "")
+ .split("\n")[2:-3]
+ )
+
+ class HydroBot:
+ def __init__(self) -> None:
+ self.parser = argparse.ArgumentParser(description="Bot命令")
+
+
+class Directory(object):
+ def __init__(self, _path: str) -> None:
+ self.current_path = _path
+
+ def get_dice_dir_list(self, _prefix: str) -> list:
+
+ return [
+ os.path.join(self.current_path, f'{_prefix}', *dirs)
+ for dirs in [
+ ['config'],
+ ['data'],
+ ['rules'],
+ ['scripts'],
+ ['web', 'frontend'],
+ ['web', 'backend'],
+ ]
+ ]
+
+
+class FileManager(object):
+ def __init__(self, _path: str) -> None:
+ self.current_path = _path
+
+ def get_file_list(self, _dir: str):
+ return {
+ 'web;frontend': 'index.html',
+ 'data': 'censor.json',
+ }
+
+
+class Models:
+ """模型管理类"""
+
+ def __init__(self) -> None:
+ self.builtin_models = {'hola': 'hola.pkl'}
+
+ def get_models_dict(self) -> dict:
+ return self.builtin_models
diff --git a/example/plugins/HydroRoll/exceptions.py b/example/plugins/HydroRoll/exceptions.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/example/plugins/HydroRoll/exceptions.py
diff --git a/example/plugins/HydroRoll/models/Transformer.py b/example/plugins/HydroRoll/models/Transformer.py
new file mode 100644
index 0000000..b52422e
--- /dev/null
+++ b/example/plugins/HydroRoll/models/Transformer.py
@@ -0,0 +1,9 @@
+import requests
+
+API_URL = "https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2"
+headers = {"Authorization": "Bearer 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"}
+
+def query(payload):
+ response = requests.post(API_URL, headers=headers, json=payload)
+ return response.json()
+
diff --git a/example/plugins/HydroRoll/models/__init__.py b/example/plugins/HydroRoll/models/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/example/plugins/HydroRoll/models/__init__.py
diff --git a/example/plugins/HydroRoll/models/cos_sim.py b/example/plugins/HydroRoll/models/cos_sim.py
new file mode 100644
index 0000000..24b743d
--- /dev/null
+++ b/example/plugins/HydroRoll/models/cos_sim.py
@@ -0,0 +1,100 @@
+"""余弦相似度比较"""
+
+
+import joblib
+import jieba
+import numpy as np
+
+from sklearn.feature_extraction.text import TfidfTransformer
+from sklearn.feature_extraction.text import CountVectorizer
+from sklearn.metrics.pairwise import cosine_similarity
+
+
+class cosSim:
+ def __init__(self, simple: list = [], test_data: list = []):
+ self.simple = simple
+ self.inputs = test_data
+ self.texts = self.simple
+ self.texts.extend(self.inputs)
+
+ @property
+ def corpuss(self):
+ return [" ".join(jieba.cut(text)) for text in self.simple]
+
+ @property
+ def vocabulary(self):
+ return self.getVocabulary(self.corpuss)
+
+ @property
+ def vectors(self):
+ return self.getVectors(self.corpuss, self.vocabulary)
+
+ @property
+ def input_corpuss(self):
+ return [" ".join(jieba.cut(text)) for text in self.inputs]
+
+ @property
+ def input_vocabulary(self):
+ return self.getVocabulary(self.input_corpuss)
+
+ @property
+ def input_vector(self):
+ return self.getVectors(self.input_corpuss, self.input_vocabulary)
+
+ def append(self, add_test_data: list = []):
+ self.inputs.extend(add_test_data)
+
+ @property
+ def similarities(self):
+ similarities = []
+ corpuss = [" ".join(jieba.cut(text)) for text in self.texts]
+ vocabulary = self.getVocabulary(corpuss)
+ vector = self.getVectors(corpuss, vocabulary)
+ for v in vector[len(self.texts)-1:]:
+ sim = []
+ for v1 in vector[:len(self.simple)+1]:
+ sim.append(self.cos_sim(v1, v))
+ print('sim', sim)
+ similarities.append(max(sim))
+
+ return similarities
+
+ @staticmethod
+ def cos_sim(vector_a, vector_b):
+ """
+ 计算两个向量之间的余弦相似度
+ :param vector_a: 向量 a
+ :param vector_b: 向量 b
+ :return: sim
+ """
+ vector_a = np.array(vector_a).reshape(1, -1)
+ vector_b = np.array(vector_b).reshape(1, -1)
+ return cosine_similarity(vector_a, vector_b)[0][0]
+
+ @staticmethod
+ def getVocabulary(corpuss):
+ vectorizer = CountVectorizer(max_features=500)
+ transformer = TfidfTransformer()
+ tfidf = transformer.fit_transform(vectorizer.fit_transform(corpuss))
+ words = vectorizer.get_feature_names_out()
+ return words
+
+ @staticmethod
+ def getVectors(corpus, vocabulary):
+ vectorizer = CountVectorizer(vocabulary=vocabulary)
+ transformer = TfidfTransformer()
+ tfidf = transformer.fit_transform(vectorizer.fit_transform(corpus))
+ vectors = tfidf.toarray()
+ return vectors
+
+ def save(self, filename):
+ joblib.dump(self, filename)
+
+ @staticmethod
+ def load(filename):
+ return joblib.load(filename)
+
+ def reload(self):
+ self.texts = self.simple
+ self.texts.extend(self.inputs)
+ self.similarities \ No newline at end of file
diff --git a/example/plugins/HydroRoll/models/hola.py b/example/plugins/HydroRoll/models/hola.py
new file mode 100644
index 0000000..255f5dd
--- /dev/null
+++ b/example/plugins/HydroRoll/models/hola.py
@@ -0,0 +1,62 @@
+from cos_sim import cosSim
+import numpy as np
+
+texts = [
+ "你好 HydroRoll",
+ "你好 水系",
+ "水系你好",
+ "HydroRoll hi~",
+ "水系, hola"
+ "你好呀 水系",
+ "hi 水系",
+ "hi HydroRoll",
+ "hello 水系",
+ "hello HydroRoll",
+ "hola 水系",
+ "hola HydroRoll",
+]
+
+# print(model.corpuss)
+
+# print(model.vocabulary)
+
+
+
+model = cosSim(
+ simple=texts,
+ test_data=[
+ # 'Hi! HydroRoll is a roll system.',
+ # 'Hello, this is a system which named HydroRoll',
+ # '短文本匹配技术应用是很广泛的,包括搜索、问答、推荐、计算广告等领域,相关技术也沉淀多年,从无监督方法到有监督方法层出不穷,工业界也是都有应用,短文本匹配算是自然语言处理领域的重要技术了,虽然任务简单,但是想要做好并不是那么容易的事情。',
+ # '短文本匹配技术在搜索、问答、推荐和计算广告等领域有广泛的应用。这项技术已经发展多年,从无监督方法到有监督方法层出不穷。在工业界,短文本匹配技术已经得到了广泛的应用。虽然短文本匹配任务看起来简单,但要做好并不容易。',
+ # '你好~水系。',
+ # 'hola~~~~~~~hola水系!'
+ ]
+)
+
+# print(model.vectors)
+
+# print(model.input_vector)
+
+# print(model.input_vocabulary)
+
+# print(cosSim.cos_sim(vector_a=model.input_vector[4], vector_b=model.input_vector[5]))
+
+
+print(model.similarities)
+
+print(model.inputs)
+
+# model.append(['你好水'])
+
+# model.append(['你好'])
+
+print(model.inputs)
+
+print(model.similarities)
+
+model.reload()
+
+print(model.input_corpuss)
+
+print(model.similarities) \ No newline at end of file
diff --git a/example/plugins/HydroRoll/typing.py b/example/plugins/HydroRoll/typing.py
new file mode 100644
index 0000000..876fa92
--- /dev/null
+++ b/example/plugins/HydroRoll/typing.py
@@ -0,0 +1,34 @@
+"""HydroRoll 类型提示支持。
+
+此模块定义了部分 HydroRoll 使用的类型。
+"""
+
+from typing import TYPE_CHECKING, TypeVar, Callable, NoReturn, Awaitable
+
+from iamai.message import T_MS, T_Message, T_MessageSegment
+
+if TYPE_CHECKING:
+ from iamai.bot import Bot # noqa
+ from iamai.event import Event # noqa
+ from iamai.plugin import Plugin # noqa
+ from iamai.config import ConfigModel # noqa
+
+__all__ = [
+ "T_State",
+ "T_Event",
+ "T_Plugin",
+ "T_Config",
+ "T_Message",
+ "T_MessageSegment",
+ "T_MS",
+ "T_BotHook",
+ "T_EventHook",
+]
+
+T_State = TypeVar("T_State")
+T_Event = TypeVar("T_Event", bound="Event")
+T_Plugin = TypeVar("T_Plugin", bound="Plugin")
+T_Config = TypeVar("T_Config", bound="ConfigModel")
+
+T_BotHook = Callable[["Bot"], Awaitable[NoReturn]]
+T_EventHook = Callable[[T_Event], Awaitable[NoReturn]] \ No newline at end of file
diff --git a/example/plugins/HydroRoll/utils.py b/example/plugins/HydroRoll/utils.py
new file mode 100644
index 0000000..1f5e8bc
--- /dev/null
+++ b/example/plugins/HydroRoll/utils.py
@@ -0,0 +1,184 @@
+import difflib
+import re
+import time
+import random
+from abc import ABC, abstractmethod
+from typing import Type, Union, Generic, TypeVar
+from iamai import Plugin
+from iamai.typing import T_State
+from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent
+
+from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig
+
+T_Config = TypeVar("T_Config", bound=BasePluginConfig)
+T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig)
+T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig)
+
+
+class BasePlugin(
+ Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config],
+ ABC,
+ Generic[T_State, T_Config],
+):
+ Config: Type[T_Config] = BasePluginConfig
+
+ def format_str(self, format_str: str, message_str: str = "") -> str:
+ return format_str.format(
+ message=message_str,
+ user_name=self.event.sender.nickname,
+ user_id=self.event.sender.user_id,
+ )
+
+ async def rule(self) -> bool:
+ is_bot_off = False
+
+ if self.event.adapter.name != "cqhttp":
+ return False
+ if self.event.type != "message":
+ return False
+ match_str = self.event.message.get_plain_text()
+ if is_bot_off:
+ if self.event.message.startswith(f"[CQ:at,qq={self.event.self_id}]"):
+ match_str = re.sub(
+ rf"^\[CQ:at,qq={self.event.self_id}\]", "", match_str
+ )
+ elif self.event.message.startswith(f"[CQ:at,qq={self.event.self_tiny_id}]"):
+ match_str = re.sub(
+ rf"^\[CQ:at,qq={self.event.self_tiny_id}\]", "", match_str
+ )
+ else:
+ return False
+ if self.config.handle_all_message:
+ return self.str_match(match_str)
+ elif self.config.handle_friend_message:
+ if self.event.message_type == "private":
+ return self.str_match(match_str)
+ elif self.config.handle_group_message:
+ if self.event.message_type == "group":
+ if (
+ self.config.accept_group is None
+ or self.event.group_id in self.config.accept_group
+ ):
+ return self.str_match(match_str)
+ elif self.config.handle_group_message:
+ if self.event.message_type == "guild":
+ return self.str_match(match_str)
+ return False
+
+ @abstractmethod
+ def str_match(self, msg_str: str) -> bool:
+ raise NotImplemented
+
+
+class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC):
+ msg_match: re.Match
+ re_pattern: re.Pattern
+ Config: Type[T_RegexPluginConfig] = RegexPluginConfig
+
+ def str_match(self, msg_str: str) -> bool:
+ msg_str = msg_str.strip()
+ self.msg_match = self.re_pattern.fullmatch(msg_str)
+ return bool(self.msg_match)
+
+
+class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC):
+ command_match: re.Match
+ command_re_pattern: re.Pattern
+ Config: Type[T_CommandPluginConfig] = CommandPluginConfig
+
+ def str_match(self, msg_str: str) -> bool:
+ if not hasattr(self, "command_re_pattern"):
+ self.command_re_pattern = re.compile(
+ f'({"|".join(self.config.command_prefix)})'
+ f'({"|".join(self.config.command)})'
+ r"\s*(?P<command_args>.*)",
+ flags=re.I if self.config.ignore_case else 0,
+ )
+ msg_str = msg_str.strip()
+ self.command_match = self.command_re_pattern.fullmatch(msg_str)
+ if not self.command_match:
+ return False
+ self.msg_match = self.re_pattern.fullmatch(
+ self.command_match.group("command_args")
+ )
+ return bool(self.msg_match)
+
+
+class PseudoRandomGenerator:
+ """线性同余法随机数生成器"""
+
+ def __init__(self, seed):
+ self.seed = seed
+
+ def generate(self):
+ while True:
+ self.seed = (self.seed * 1103515245 + 12345) % (2**31)
+ yield self.seed
+
+
+class HydroDice:
+ """水系掷骰组件
+
+ 一些 API 相关的工具函数
+
+ """
+
+ def __init__(self, seed):
+ self.generator = PseudoRandomGenerator(seed)
+
+ def roll_dice(
+ self,
+ _counts: int | str,
+ _sides: int | str,
+ is_reversed: bool = False,
+ streamline: bool = False,
+ threshold: int | str = 5,
+ ) -> str:
+ """普通掷骰
+ Args:
+ _counts (int | str): 掷骰个数.
+ _sides (int | str): 每个骰子的面数.
+ is_reversed (bool, optional): 倒序输出. Defaults to False.
+ streamline (bool, optional): 忽略过程. Defaults to False.
+ threshold (int | str, optional): streamline 的阈值. Defaults to 5.
+
+ Returns:
+ str: 表达式结果.
+ """
+ rolls = []
+ for _ in range(int(_counts)):
+ roll = next(self.generator.generate()) % _sides + 1
+ rolls.append(roll)
+ total = sum(rolls)
+
+ if streamline:
+ return str(total)
+ else:
+ if len(rolls) > int(threshold):
+ return str(total)
+ rolls_str = " + ".join(str(r) for r in rolls)
+ result_str = (
+ f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}"
+ )
+ return result_str
+
+
+def find_max_similarity(input_string, string_list):
+ """寻找最大的相似度"""
+ max_similarity = 0
+ max_string = ""
+
+ for string in string_list:
+ similarity = difflib.SequenceMatcher(None, input_string, string).quick_ratio()
+ if similarity > max_similarity:
+ max_similarity = similarity
+ max_string = string
+
+ return max_string, max_similarity
+
+
+def check_file(filename: str) -> bool:
+ """根据给定参数校验文件夹内文件完整性"""
+
+
+ return False \ No newline at end of file
diff --git a/example/plugins/_bradge-kook-cqhttp.py b/example/plugins/_bradge-kook-cqhttp.py
new file mode 100644
index 0000000..4b2712d
--- /dev/null
+++ b/example/plugins/_bradge-kook-cqhttp.py
@@ -0,0 +1,25 @@
+from iamai import Plugin
+
+
+class Bradge(Plugin):
+ async def handle(self) -> None:
+ if self.event.adapter.name == "kook":
+ await self.bot.get_adapter("cqhttp").call_api(
+ "send_group_msg",
+ group_id=971050440,
+ message=f"[{self.event.adapter.name} - {self.event.extra.author.username}]\n{self.event.message}"
+ )
+ elif self.event.adapter.name == "cqhttp":
+ if self.event.group_id == 971050440:
+ await self.bot.get_adapter("kook").call_api(
+ api="message/create",
+ target_id=1661426334688259,
+ content=f"[{self.event.adapter.name} - {self.event.sender.nickname}]\n{self.event.message}"
+ )
+
+ async def rule(self) -> bool:
+ if self.event.adapter.name not in ["cqhttp","kook"]:
+ return False
+ if self.event.type not in ["message","9",9]:
+ return False
+ return True \ No newline at end of file
diff --git a/example/plugins/psi.py b/example/plugins/psi.py
new file mode 100644
index 0000000..d768115
--- /dev/null
+++ b/example/plugins/psi.py
@@ -0,0 +1,294 @@
+""" SPI - Simple Pascal Interpreter """
+
+###############################################################################
+# #
+# LEXER #
+# #
+###############################################################################
+
+# Token types
+#
+# EOF (end-of-file) token is used to indicate that
+# there is no more input left for lexical analysis
+from iamai import Plugin
+from HydroRoll.utils import HydroDice
+INTEGER, PLUS, MINUS, MUL, DIV, LPAREN, RPAREN, EOF = (
+ "INTEGER",
+ "PLUS",
+ "MINUS",
+ "MUL",
+ "DIV",
+ "(",
+ ")",
+ "EOF",
+)
+
+DICE = "DICE"
+
+
+class Token(object):
+ """A single token from the lexer."""
+
+ def __init__(self, _type, _value):
+ self.type = _type
+ self.value = _value
+
+ def __str__(self):
+ """String representation of the class instance.
+
+ Examples:
+ Token(INTEGER, 3)
+ Token(PLUS, '+')
+ Token(MUL, '*')
+ """
+ return f"Token({self.type}, {repr(self.value)})"
+
+ def __repr__(self):
+ return self.__str__()
+
+
+class Lexer(object):
+ """A lexer for the Psi language."""
+
+ def __init__(self, text):
+ # client string input, e.g. "4 + 2 * 3 - 6 / 2"
+ self.text = text
+ # self.pos is an index into self.text
+ self.pos = 0
+ self.current_char = self.text[self.pos]
+
+ def error(self):
+ """Raise an exception at the current position."""
+ raise ValueError("Invalid character")
+
+ def advance(self):
+ """Advance the `pos` pointer and set the `current_char` variable."""
+ self.pos += 1
+ if self.pos > len(self.text) - 1:
+ self.current_char = None # Indicates end of input
+ else:
+ self.current_char = self.text[self.pos]
+
+ def skip_whitespace(self):
+ while self.current_char is not None and self.current_char.isspace():
+ self.advance()
+
+ def integer(self):
+ """Return a (multidigit) integer consumed from the input."""
+ result = ""
+ while self.current_char is not None and self.current_char.isdigit():
+ result += self.current_char
+ self.advance()
+ return int(result)
+
+ def get_next_token(self):
+ """Lexical analyzer (also known as scanner or tokenizer)"""
+ while self.current_char is not None:
+ if self.current_char.isspace():
+ self.skip_whitespace()
+ continue
+
+ token_type = {
+ "+": PLUS,
+ "-": MINUS,
+ "d": DICE,
+ "*": MUL,
+ "/": DIV,
+ "(": LPAREN,
+ ")": RPAREN,
+ }.get(self.current_char)
+
+ if token_type:
+ self.advance()
+ return Token(token_type, self.current_char)
+
+ if self.current_char.isdigit():
+ return Token(INTEGER, self.integer())
+
+ self.error()
+
+ return Token(EOF, None)
+
+
+###############################################################################
+# #
+# PARSER #
+# #
+###############################################################################
+
+
+class AST(object):
+ pass
+
+
+class BinOp(AST):
+ def __init__(self, left, op, right):
+ self.left = left
+ self.token = self.op = op
+ self.right = right
+
+
+class Num(AST):
+ def __init__(self, token):
+ self.token = token
+ self.value = token.value
+
+
+class UnaryOp(AST):
+ def __init__(self, op, expr):
+ self.token = self.op = op
+ self.expr = expr
+
+
+class Parser(object):
+ def __init__(self, lexer):
+ self.lexer = lexer
+ # set current token to the first token taken from the input
+ self.current_token = self.lexer.get_next_token()
+
+ def error(self):
+ raise Exception("Invalid syntax")
+
+ def eat(self, token_type):
+ # compare the current token type with the passed token
+ # type and if they match then "eat" the current token
+ # and assign the next token to the self.current_token,
+ # otherwise raise an exception.
+ if self.current_token.type == token_type:
+ self.current_token = self.lexer.get_next_token()
+ else:
+ self.error()
+
+ def factor(self):
+ """factor : (PLUS | MINUS | DICE) factor | INTEGER | LPAREN expr RPAREN"""
+ token = self.current_token
+ if token.type == PLUS:
+ self.eat(PLUS)
+ node = UnaryOp(token, self.factor())
+ return node
+ elif token.type == MINUS:
+ self.eat(MINUS)
+ node = UnaryOp(token, self.factor())
+ return node
+ elif token.type == DICE:
+ self.eat(DICE)
+ left = Num(Token(INTEGER, 1)) # 默认骰子个数为1
+ right = self.factor()
+ node = BinOp(left, token, right)
+ return node
+ elif token.type == INTEGER:
+ self.eat(INTEGER)
+ return Num(token)
+ elif token.type == LPAREN:
+ self.eat(LPAREN)
+ node = self.expr()
+ self.eat(RPAREN)
+ return node
+
+ def term(self):
+ """term : factor ((MUL | DIV) factor)*"""
+ node = self.factor()
+
+ while self.current_token.type in (MUL, DIV):
+ token = self.current_token
+ if token.type == MUL:
+ self.eat(MUL)
+ elif token.type == DIV:
+ self.eat(DIV)
+
+ node = BinOp(left=node, op=token, right=self.factor())
+
+ return node
+
+ def expr(self):
+ """
+ expr : term ((PLUS | MINUS) term)*
+ term : factor ((MUL | DIV) factor)*
+ factor : (PLUS | MINUS) factor | INTEGER | LPAREN expr RPAREN
+ """
+ node = self.term()
+
+ while self.current_token.type in (PLUS, MINUS):
+ token = self.current_token
+ if token.type == PLUS:
+ self.eat(PLUS)
+ elif token.type == MINUS:
+ self.eat(MINUS)
+
+ node = BinOp(left=node, op=token, right=self.term())
+
+ return node
+
+ def parse(self):
+ node = self.expr()
+ if self.current_token.type != EOF:
+ self.error()
+ return node
+
+
+###############################################################################
+# #
+# INTERPRETER #
+# #
+###############################################################################
+
+
+class NodeVisitor(object):
+ def visit(self, node):
+ method_name = "visit_" + type(node).__name__
+ visitor = getattr(self, method_name, self.generic_visit)
+ return visitor(node)
+
+ def generic_visit(self, node):
+ raise Exception("No visit_{} method".format(type(node).__name__))
+
+
+class Interpreter(NodeVisitor):
+ def __init__(self, parser):
+ self.parser = parser
+
+ def visit_BinOp(self, node):
+ if node.op.type == PLUS:
+ return self.visit(node.left) + self.visit(node.right)
+ elif node.op.type == MINUS:
+ return self.visit(node.left) - self.visit(node.right)
+ elif node.op.type == DICE:
+ return int(
+ HydroDice(1).roll_dice(
+ _counts=self.visit(node.left),
+ _sides=self.visit(node.right),
+ streamline=True,
+ )
+ )
+ elif node.op.type == MUL:
+ return self.visit(node.left) * self.visit(node.right)
+ elif node.op.type == DIV:
+ return self.visit(node.left) // self.visit(node.right)
+
+ def visit_Num(self, node):
+ return node.value
+
+ def visit_UnaryOp(self, node):
+ op = node.op.type
+ if op == PLUS:
+ return +self.visit(node.expr)
+ elif op == MINUS:
+ return -self.visit(node.expr)
+
+ def interpret(self):
+ tree = self.parser.parse()
+ if tree is None:
+ return ""
+ return self.visit(tree)
+
+
+class Psi(Plugin):
+ async def handle(self) -> None:
+ lexer = Lexer(self.event.message.get_plain_text()[4:])
+ parser = Parser(lexer)
+ interpreter = Interpreter(parser)
+ result = interpreter.interpret()
+ await self.event.reply(str(result))
+
+ async def rule(self) -> bool:
+ return self.event.type == "message" and self.event.message.startswith(".psi")
diff --git a/example/plugins/r.py b/example/plugins/r.py
new file mode 100644
index 0000000..2f05af9
--- /dev/null
+++ b/example/plugins/r.py
@@ -0,0 +1,10 @@
+from iamai import Plugin
+
+
+class Roll(Plugin):
+ async def handle(self) -> None:
+ await self.event.reply("""Attack: 25
+Damage: 9""")
+
+ async def rule(self) -> bool:
+ return self.event.type == "message" and self.event.message.startswith("/r")
diff --git a/example/plugins/show.py b/example/plugins/show.py
new file mode 100644
index 0000000..b99c18c
--- /dev/null
+++ b/example/plugins/show.py
@@ -0,0 +1,39 @@
+from iamai import Plugin
+from numpy.random import Generator
+from randomgen import AESCounter
+rg = Generator(AESCounter(12345, mode="sequence"))
+
+
+class Exec(Plugin):
+
+ priority = 1
+
+ async def handle(self) -> None:
+ try:
+ content = [
+ {
+ "type": "node",
+ "data": {
+ "name": f"{self.event.sender.nickname}",
+ "uin": f"{self.event.sender.user_id}",
+ "content": [
+ {
+ "type": "text",
+ "data": {
+ "text": f"{eval(self.event.message.get_plain_text()[6:])}"
+ }
+ }
+ ]
+ }
+ }
+ ]
+ res = await self.event.adapter.send_group_forward_msg(group_id=int(self.event.group_id), messages=content)
+ except Exception as e:
+ await self.event.reply(f"ERROR!{e!r}")
+
+ async def rule(self) -> bool:
+ return (
+ self.event.type == "message"
+ and
+ self.event.message.get_plain_text().startswith(".show")
+ ) \ No newline at end of file
diff --git a/example/plugins/tf/__init__.py b/example/plugins/tf/__init__.py
new file mode 100644
index 0000000..ce78d97
--- /dev/null
+++ b/example/plugins/tf/__init__.py
@@ -0,0 +1,52 @@
+
+from iamai import Plugin
+from iamai.log import logger as _logger
+from HydroRoll.models.cos_sim import cosSim
+import jieba
+
+logger = _logger
+
+
+texts = [
+ "你好 HydroRoll",
+ "你好 水系",
+ "hi 水系",
+ "hi HydroRoll",
+ "hello 水系",
+ "hello HydroRoll",
+ "hola 水系",
+ "hola HydroRoll",
+]
+
+cos_Sim = cosSim(texts)
+logger.info(f"{cos_Sim.calcuSim('你好')}")
+
+cos_Sim.save('cos.h5')
+
+model = cosSim.load('cos.h5')
+
+logger.info(f"{model}")
+
+# class Sim(Plugin):
+# async def handle(self) -> None:
+# try:
+
+# txt_list = eval(self.event.message.get_plain_text()[5:])
+# if len(txt_list) == 1:
+# await self.event.reply(f"{cos_Sim.CalcuSim(txt_list)}")
+# elif len(txt_list) == 2:
+# corpuss = [" ".join(jieba.cut(text))
+# for text in eval(self.event.message.get_plain_text()[5:])]
+# await self.event.reply(str(corpuss))
+# vocabulary = cos_Sim.getVocabulary(corpuss)
+# v = cos_Sim.getVectors(corpuss, vocabulary=vocabulary)
+# await self.event.reply(f"weight\n=========\n{v}")
+# await self.event.reply(f"相似度\n=========\n{cos_Sim.cos_sim(v[0], v[1])}")
+# except Exception as e:
+# await self.event.reply(f"{e!r}")
+
+# async def rule(self) -> bool:
+# return self.event.type == "message" and self.event.message.startswith(".cos")
+
+
+
diff --git a/example/plugins/tf/config.py b/example/plugins/tf/config.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/example/plugins/tf/config.py