diff options
Diffstat (limited to 'example')
| -rw-r--r-- | example/HydroRoll/data/censor.json | 1 | ||||
| -rw-r--r-- | example/HydroRoll/data/reply.psi | 9 | ||||
| -rw-r--r-- | example/config.example.toml | 18 | ||||
| -rw-r--r-- | example/main.py | 8 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/__init__.py | 136 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/command.py | 7 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/config.py | 110 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/exceptions.py | 0 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/models/Transformer.py | 9 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/models/__init__.py | 0 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/models/cos_sim.py | 100 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/models/hola.py | 62 | ||||
| -rw-r--r-- | example/plugins/HydroRoll/utils.py | 170 | ||||
| -rw-r--r-- | example/plugins/_bradge-kook-cqhttp.py | 25 | ||||
| -rw-r--r-- | example/plugins/psi.py | 294 | ||||
| -rw-r--r-- | example/plugins/r.py | 23 | ||||
| -rw-r--r-- | example/plugins/show.py | 44 |
17 files changed, 0 insertions, 1016 deletions
diff --git a/example/HydroRoll/data/censor.json b/example/HydroRoll/data/censor.json deleted file mode 100644 index 9e26dfe..0000000 --- a/example/HydroRoll/data/censor.json +++ /dev/null @@ -1 +0,0 @@ -{}
\ No newline at end of file diff --git a/example/HydroRoll/data/reply.psi b/example/HydroRoll/data/reply.psi deleted file mode 100644 index fdb5ee6..0000000 --- a/example/HydroRoll/data/reply.psi +++ /dev/null @@ -1,9 +0,0 @@ -""" -{ - ".core": "core", -} -""" - -func main(){ - -}
\ No newline at end of file diff --git a/example/config.example.toml b/example/config.example.toml deleted file mode 100644 index 68ddef6..0000000 --- a/example/config.example.toml +++ /dev/null @@ -1,18 +0,0 @@ -[bot] -plugins = [] -plugin_dirs = ["plugins"] -adapters = ["iamai.adapter.cqhttp","iamai.adapter.apscheduler"] - -[bot.log] -level = "DEBUG" -verbose_exception = true - -[adapter.cqhttp] -adapter_type = "reverse-ws" -host = "127.0.0.1" -port = 15800 -url = "/cqhttp/ws" -show_raw = false - -[adapter.apscheduler] -scheduler_config = { "apscheduler.timezone" = "Asia/Shanghai" }
\ No newline at end of file diff --git a/example/main.py b/example/main.py deleted file mode 100644 index 211b3d3..0000000 --- a/example/main.py +++ /dev/null @@ -1,8 +0,0 @@ -from iamai import Bot - -bot = Bot(hot_reload=True) - -if __name__ == "__main__": - bot.run() - - diff --git a/example/plugins/HydroRoll/__init__.py b/example/plugins/HydroRoll/__init__.py deleted file mode 100644 index 0ff17fa..0000000 --- a/example/plugins/HydroRoll/__init__.py +++ /dev/null @@ -1,136 +0,0 @@ -"""中间件""" -import re -import json -import joblib -import os -import shutil - -from iamai import ConfigModel, Plugin -from iamai.log import logger -from iamai.exceptions import GetEventTimeout -from iamai.event import MessageEvent, Event -from iamai.typing import StateT - -from .config import Directory, GlobalConfig, Models -from .utils import * -from .models.Transformer import query -from .command import Set, Get -from .config import ( - BasePluginConfig, - CommandPluginConfig, - RegexPluginConfig, - GlobalConfig, -) - -from ast import literal_eval -from os.path import dirname, join, abspath -from abc import ABC, abstractmethod -from typing import Any, Generic, TypeVar -from typing_extensions import Annotated - -ConfigT = TypeVar("ConfigT", bound=BasePluginConfig) -RegexPluginConfigT = TypeVar("RegexPluginConfigT", bound=RegexPluginConfig) -CommandPluginConfigT = TypeVar("CommandPluginConfigT", bound=CommandPluginConfig) - - -BASE_DIR = dirname(abspath("__file__")) -HYDRO_DIR = dirname(abspath(__file__)) -APP_DIR = join(BASE_DIR, "HydroRoll") - -# logger.info(GlobalConfig._copyright) - - -class Dice(Plugin[MessageEvent, Annotated[dict, {}], RegexPluginConfig]): - """中间件""" - - priority = 0 - - # TODO: HydroRollCore should be able to handle all signals and tokens from Psi. - logger.info("Loading HydroRollCore...") - - def __post_init__(self): - self.state = {} - self.model_path_list = [] - self.bot.global_state["HydroRoll"] = {} - self.model_dict = Models().get_models_dict() - - self.model_path_list.append(join(BASE_DIR, "models")) - self.model_path_list.append(join(HYDRO_DIR, "models")) - self.model_path_list.append(join(BASE_DIR, "HydroRoll", "models")) - - self.load_models() - - async def handle(self) -> None: - """ - @TODO: HydroRollCore should be able to handle all signals and tokens from Psi. - @BODY: HydroRollCore actives the rule-packages. - """ - global flag - - args = self.event.get_plain_text().split(" ") - command_list = [".root", ".roots", ".core", ".set", ".get", ".test"] - current_cmd = args[0] - flag = True in [cmd.startswith(current_cmd) for cmd in command_list] - logger.info(f"Command {current_cmd} not found with flag {flag}") - if args[0] in [".root", ".roots"]: - try: - import aiohttp - - async with aiohttp.ClientSession() as session: - async with session.get("https://api.hydroroll.team/api/roots") as response: - data = await response.json() - await self.event.reply(data["line"]) - except Exception as e: - await self.event.reply(f"{e!r}") - elif args[0] == ".core": - await self.event.reply(f"{self.state}") - # if args[0].startswith(".set"): - # resolve = Set(args[1:]) # TODO: handle multiple sets - # elif args[0].startswith(".get"): - # resolve = Get(args[1:]) # TODO: handle multiple gets - elif args[0].startswith(".test"): - try: - result = eval(self.event.message.get_plain_text()[5:]) - await self.event.reply(str(result)) - except Exception as error: - await self.event.reply(f"{error!r}") - - async def rule(self) -> bool: - """ - @TODO: Psi should be able to handle all message first. - @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value. - """ - logger.info("loading psi...") - return isinstance(self.event, MessageEvent) - - def _init_directory(self, _prefix: str = ""): - """初始化水系目录""" - for _ in Directory(BASE_DIR).get_dice_dir_list(_prefix): - if not os.path.exists(_): - os.makedirs(_) - - def _init_file(self, _prefix: str = ""): - """初始化文件""" - - def init_directory(self, _prefix: str = "HydroRoll"): - """在指定目录生成水系文件结构""" - self._init_directory(_prefix=_prefix) - - def _load_model(self, path: str, model_file: str): - if model_file is None: - model_file = "" - return joblib.load(join(path, f"{model_file}")) - - def _load_models(self, model_path_list: list, model_dict: dict) -> dict: - """加载指定模型, 当然也可能是数据集""" - models = {} - for path in model_path_list: - for model_name, model_file in model_dict.items(): - if os.path.exists(join(path, model_file)): - models[model_name] = self._load_model(path, model_file) - logger.success(f'Succeeded to load model "{model_name}"') - return models - - def load_models(self): - """我想睡觉, 但我失眠了。""" - self.models = self._load_models(self.model_path_list, self.model_dict) diff --git a/example/plugins/HydroRoll/command.py b/example/plugins/HydroRoll/command.py deleted file mode 100644 index dcb10b0..0000000 --- a/example/plugins/HydroRoll/command.py +++ /dev/null @@ -1,7 +0,0 @@ -class Get: - ... - - - -class Set: - ...
\ No newline at end of file diff --git a/example/plugins/HydroRoll/config.py b/example/plugins/HydroRoll/config.py deleted file mode 100644 index c380a01..0000000 --- a/example/plugins/HydroRoll/config.py +++ /dev/null @@ -1,110 +0,0 @@ -from randomgen import AESCounter -from numpy.random import Generator -import argparse -import sys -from os.path import dirname, dirname, join, abspath -import platform -from importlib.metadata import version -import os -from typing import Set, Optional -from iamai import ConfigModel -import datetime - -from typing import Set - -from pydantic import Field - - -class BasePluginConfig(ConfigModel): - message_str: str = "{message}" - """最终发送消息的格式。""" - - -class RegexPluginConfig(BasePluginConfig): - pass - - -class CommandPluginConfig(RegexPluginConfig): - __config_name__ = "HydroRoll" - command_prefix: Set[str] = Field(default_factory=lambda: {".", "。"}) - """命令前缀。""" - command: Set[str] = Field(default_factory=set) - """命令文本。""" - ignore_case: bool = True - """忽略大小写。""" - - -class Color: - # 定义ANSI转义序列 - RESET = "\033[0m" - BLUE_BASE = "\033[36m" - BLUE_DARK = "\033[34m" - BLUE_DARKER = "\033[32m" - BLUE_DARKEST = "\033[30m" - - -# 定义全局配置类 -class GlobalConfig: - _name = "HydroRoll[水系]" - _version = "0.1.0" - _svn = "2" - _author = "简律纯" - _iamai_version = version("iamai") - _python_ver = sys.version - _python_ver_raw = ".".join(map(str, platform.python_version_tuple()[:3])) - _base_dir = dirname(abspath("__file__")) - _hydro_dir = dirname(abspath(__file__)) - _copyright = f"""\033[36m - _ __ _ _ - /\ /\_ _ __| |_ __ ___ /__\ ___ | | | - / /_/ / | | |/ _` | '__/ _ \ / \/// _ \| | | -/ __ /| |_| | (_| | | | (_) / _ \ (_) | | | -\/ /_/ \__, |\__,_|_| \___/\/ \_/\___/|_|_| - |___/ - -\033[4m{_name} [版本 {_version}]\033[0m\033[36m -(c) HydroRoll-Team contributors, {_author}。 -Github: https://github.com/HydroRoll-Team -Under the MIT License, see LICENSE for more details. -""" - - -class Directory(object): - def __init__(self, _path: str) -> None: - self.current_path = _path - - def get_dice_dir_list(self, _prefix: str) -> list: - return [ - os.path.join(self.current_path, f"{_prefix}", *dirs) - for dirs in [ - ["config"], - ["data"], - ["rules"], - ["scripts", "lua"], - ["scripts", "js"], - ["scripts", "psi"], - ["web", "frontend"], - ["web", "backend"], - ] - ] - - -class FileManager(object): - def __init__(self, _path: str) -> None: - self.current_path = _path - - def get_file_list(self, _dir: str): - return { - "web;frontend": "index.html", - "data": "censor.json", - } - - -class Models: - """模型管理类""" - - def __init__(self) -> None: - self.builtin_models = {"hola": "hola.pkl"} - - def get_models_dict(self) -> dict: - return self.builtin_models diff --git a/example/plugins/HydroRoll/exceptions.py b/example/plugins/HydroRoll/exceptions.py deleted file mode 100644 index e69de29..0000000 --- a/example/plugins/HydroRoll/exceptions.py +++ /dev/null diff --git a/example/plugins/HydroRoll/models/Transformer.py b/example/plugins/HydroRoll/models/Transformer.py deleted file mode 100644 index b52422e..0000000 --- a/example/plugins/HydroRoll/models/Transformer.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests - -API_URL = "https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2" -headers = {"Authorization": "Bearer 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"} - -def query(payload): - response = requests.post(API_URL, headers=headers, json=payload) - return response.json() - diff --git a/example/plugins/HydroRoll/models/__init__.py b/example/plugins/HydroRoll/models/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/example/plugins/HydroRoll/models/__init__.py +++ /dev/null diff --git a/example/plugins/HydroRoll/models/cos_sim.py b/example/plugins/HydroRoll/models/cos_sim.py deleted file mode 100644 index 24b743d..0000000 --- a/example/plugins/HydroRoll/models/cos_sim.py +++ /dev/null @@ -1,100 +0,0 @@ -"""余弦相似度比较""" - - -import joblib -import jieba -import numpy as np - -from sklearn.feature_extraction.text import TfidfTransformer -from sklearn.feature_extraction.text import CountVectorizer -from sklearn.metrics.pairwise import cosine_similarity - - -class cosSim: - def __init__(self, simple: list = [], test_data: list = []): - self.simple = simple - self.inputs = test_data - self.texts = self.simple - self.texts.extend(self.inputs) - - @property - def corpuss(self): - return [" ".join(jieba.cut(text)) for text in self.simple] - - @property - def vocabulary(self): - return self.getVocabulary(self.corpuss) - - @property - def vectors(self): - return self.getVectors(self.corpuss, self.vocabulary) - - @property - def input_corpuss(self): - return [" ".join(jieba.cut(text)) for text in self.inputs] - - @property - def input_vocabulary(self): - return self.getVocabulary(self.input_corpuss) - - @property - def input_vector(self): - return self.getVectors(self.input_corpuss, self.input_vocabulary) - - def append(self, add_test_data: list = []): - self.inputs.extend(add_test_data) - - @property - def similarities(self): - similarities = [] - corpuss = [" ".join(jieba.cut(text)) for text in self.texts] - vocabulary = self.getVocabulary(corpuss) - vector = self.getVectors(corpuss, vocabulary) - for v in vector[len(self.texts)-1:]: - sim = [] - for v1 in vector[:len(self.simple)+1]: - sim.append(self.cos_sim(v1, v)) - print('sim', sim) - similarities.append(max(sim)) - - return similarities - - @staticmethod - def cos_sim(vector_a, vector_b): - """ - 计算两个向量之间的余弦相似度 - :param vector_a: 向量 a - :param vector_b: 向量 b - :return: sim - """ - vector_a = np.array(vector_a).reshape(1, -1) - vector_b = np.array(vector_b).reshape(1, -1) - return cosine_similarity(vector_a, vector_b)[0][0] - - @staticmethod - def getVocabulary(corpuss): - vectorizer = CountVectorizer(max_features=500) - transformer = TfidfTransformer() - tfidf = transformer.fit_transform(vectorizer.fit_transform(corpuss)) - words = vectorizer.get_feature_names_out() - return words - - @staticmethod - def getVectors(corpus, vocabulary): - vectorizer = CountVectorizer(vocabulary=vocabulary) - transformer = TfidfTransformer() - tfidf = transformer.fit_transform(vectorizer.fit_transform(corpus)) - vectors = tfidf.toarray() - return vectors - - def save(self, filename): - joblib.dump(self, filename) - - @staticmethod - def load(filename): - return joblib.load(filename) - - def reload(self): - self.texts = self.simple - self.texts.extend(self.inputs) - self.similarities
\ No newline at end of file diff --git a/example/plugins/HydroRoll/models/hola.py b/example/plugins/HydroRoll/models/hola.py deleted file mode 100644 index 255f5dd..0000000 --- a/example/plugins/HydroRoll/models/hola.py +++ /dev/null @@ -1,62 +0,0 @@ -from cos_sim import cosSim -import numpy as np - -texts = [ - "你好 HydroRoll", - "你好 水系", - "水系你好", - "HydroRoll hi~", - "水系, hola" - "你好呀 水系", - "hi 水系", - "hi HydroRoll", - "hello 水系", - "hello HydroRoll", - "hola 水系", - "hola HydroRoll", -] - -# print(model.corpuss) - -# print(model.vocabulary) - - - -model = cosSim( - simple=texts, - test_data=[ - # 'Hi! HydroRoll is a roll system.', - # 'Hello, this is a system which named HydroRoll', - # '短文本匹配技术应用是很广泛的,包括搜索、问答、推荐、计算广告等领域,相关技术也沉淀多年,从无监督方法到有监督方法层出不穷,工业界也是都有应用,短文本匹配算是自然语言处理领域的重要技术了,虽然任务简单,但是想要做好并不是那么容易的事情。', - # '短文本匹配技术在搜索、问答、推荐和计算广告等领域有广泛的应用。这项技术已经发展多年,从无监督方法到有监督方法层出不穷。在工业界,短文本匹配技术已经得到了广泛的应用。虽然短文本匹配任务看起来简单,但要做好并不容易。', - # '你好~水系。', - # 'hola~~~~~~~hola水系!' - ] -) - -# print(model.vectors) - -# print(model.input_vector) - -# print(model.input_vocabulary) - -# print(cosSim.cos_sim(vector_a=model.input_vector[4], vector_b=model.input_vector[5])) - - -print(model.similarities) - -print(model.inputs) - -# model.append(['你好水']) - -# model.append(['你好']) - -print(model.inputs) - -print(model.similarities) - -model.reload() - -print(model.input_corpuss) - -print(model.similarities)
\ No newline at end of file diff --git a/example/plugins/HydroRoll/utils.py b/example/plugins/HydroRoll/utils.py deleted file mode 100644 index 2b4c7bf..0000000 --- a/example/plugins/HydroRoll/utils.py +++ /dev/null @@ -1,170 +0,0 @@ -import difflib -import re -import time -import random -from abc import ABC, abstractmethod -from typing import Type, Union, Generic, TypeVar -from iamai import Plugin - -import re -from abc import ABC, abstractmethod -from typing import Any, Generic, TypeVar - -from iamai import MessageEvent, Plugin -from iamai.typing import StateT - -from .config import BasePluginConfig, CommandPluginConfig, RegexPluginConfig - -ConfigT = TypeVar("ConfigT", bound=BasePluginConfig) -RegexPluginConfigT = TypeVar("RegexPluginConfigT", bound=RegexPluginConfig) -CommandPluginConfigT = TypeVar("CommandPluginConfigT", bound=CommandPluginConfig) - - -class BasePlugin( - Plugin[MessageEvent[Any], StateT, ConfigT], - ABC, - Generic[StateT, ConfigT], -): - def format_str(self, format_str: str, message_str: str = "") -> str: - return format_str.format( - message=message_str, - user_name=self.get_event_sender_name(), - user_id=self.get_event_sender_id(), - ) - - def get_event_sender_name(self) -> str: - from iamai.adapter.onebot11.event import MessageEvent as OneBotMessageEvent - - if isinstance(self.event, OneBotMessageEvent): - return self.event.sender.nickname or "" - return "" - - def get_event_sender_id(self) -> str: - from iamai.adapter.onebot11.event import MessageEvent as OneBotMessageEvent - - if isinstance(self.event, OneBotMessageEvent): - if self.event.sender.user_id is not None: - return str(self.event.sender.user_id) - return "" - return "" - - async def rule(self) -> bool: - return isinstance(self.event, MessageEvent) and self.str_match( - self.event.get_plain_text() - ) - - @abstractmethod - def str_match(self, msg_str: str) -> bool: - raise NotImplementedError - - -class RegexPluginBase(BasePlugin[StateT, RegexPluginConfigT], ABC): - msg_match: re.Match[str] - re_pattern: re.Pattern[str] - - def str_match(self, msg_str: str) -> bool: - msg_str = msg_str.strip() - msg_match = self.re_pattern.fullmatch(msg_str) - if msg_match is None: - return False - self.msg_match = msg_match - return bool(self.msg_match) - - -class CommandPluginBase(RegexPluginBase[StateT, CommandPluginConfigT], ABC): - command_match: re.Match[str] - command_re_pattern: re.Pattern[str] - - def str_match(self, msg_str: str) -> bool: - if not hasattr(self, "re_pattern"): - self.re_pattern = re.compile( - f'[{"".join(self.config.command_prefix)}]' - f'({"|".join(self.config.command)})' - r"\s*(?P<command_args>.*)", - flags=re.I if self.config.ignore_case else 0, - ) - msg_str = msg_str.strip() - msg_match = self.re_pattern.fullmatch(msg_str) - if not msg_match: - return False - self.msg_match = msg_match - command_match = self.re_pattern.fullmatch(self.msg_match.group("command_args")) - if not command_match: - return False - self.command_match = command_match - return True - - -class PseudoRandomGenerator: - """线性同余法随机数生成器""" - - def __init__(self, seed): - self.seed = seed - - def generate(self): - while True: - self.seed = (self.seed * 1103515245 + 12345) % (2**31) - yield self.seed - - -class HydroDice: - """水系掷骰组件 - - 一些 API 相关的工具函数 - - """ - - def __init__(self, seed): - self.generator = PseudoRandomGenerator(seed) - - def roll_dice( - self, - _counts: int | str, - _sides: int | str, - is_reversed: bool = False, - streamline: bool = False, - threshold: int | str = 5, - ) -> str: - """普通掷骰 - Args: - _counts (int | str): 掷骰个数. - _sides (int | str): 每个骰子的面数. - is_reversed (bool, optional): 倒序输出. Defaults to False. - streamline (bool, optional): 忽略过程. Defaults to False. - threshold (int | str, optional): streamline 的阈值. Defaults to 5. - - Returns: - str: 表达式结果. - """ - rolls = [] - for _ in range(int(_counts)): - roll = next(self.generator.generate()) % _sides + 1 - rolls.append(roll) - total = sum(rolls) - - if streamline: - return str(total) - if len(rolls) > int(threshold): - return str(total) - rolls_str = " + ".join(str(r) for r in rolls) - return f"{total} = {rolls_str}" if is_reversed else f"{rolls_str} = {total}" - - -def find_max_similarity(input_string, string_list): - """寻找最大的相似度""" - max_similarity = 0 - max_string = "" - - for string in string_list: - similarity = difflib.SequenceMatcher(None, input_string, string).quick_ratio() - if similarity > max_similarity: - max_similarity = similarity - max_string = string - - return max_string, max_similarity - - -def check_file(filename: str) -> bool: - """根据给定参数校验文件夹内文件完整性""" - - return False diff --git a/example/plugins/_bradge-kook-cqhttp.py b/example/plugins/_bradge-kook-cqhttp.py deleted file mode 100644 index 4b2712d..0000000 --- a/example/plugins/_bradge-kook-cqhttp.py +++ /dev/null @@ -1,25 +0,0 @@ -from iamai import Plugin - - -class Bradge(Plugin): - async def handle(self) -> None: - if self.event.adapter.name == "kook": - await self.bot.get_adapter("cqhttp").call_api( - "send_group_msg", - group_id=971050440, - message=f"[{self.event.adapter.name} - {self.event.extra.author.username}]\n{self.event.message}" - ) - elif self.event.adapter.name == "cqhttp": - if self.event.group_id == 971050440: - await self.bot.get_adapter("kook").call_api( - api="message/create", - target_id=1661426334688259, - content=f"[{self.event.adapter.name} - {self.event.sender.nickname}]\n{self.event.message}" - ) - - async def rule(self) -> bool: - if self.event.adapter.name not in ["cqhttp","kook"]: - return False - if self.event.type not in ["message","9",9]: - return False - return True
\ No newline at end of file diff --git a/example/plugins/psi.py b/example/plugins/psi.py deleted file mode 100644 index d768115..0000000 --- a/example/plugins/psi.py +++ /dev/null @@ -1,294 +0,0 @@ -""" SPI - Simple Pascal Interpreter """ - -############################################################################### -# # -# LEXER # -# # -############################################################################### - -# Token types -# -# EOF (end-of-file) token is used to indicate that -# there is no more input left for lexical analysis -from iamai import Plugin -from HydroRoll.utils import HydroDice -INTEGER, PLUS, MINUS, MUL, DIV, LPAREN, RPAREN, EOF = ( - "INTEGER", - "PLUS", - "MINUS", - "MUL", - "DIV", - "(", - ")", - "EOF", -) - -DICE = "DICE" - - -class Token(object): - """A single token from the lexer.""" - - def __init__(self, _type, _value): - self.type = _type - self.value = _value - - def __str__(self): - """String representation of the class instance. - - Examples: - Token(INTEGER, 3) - Token(PLUS, '+') - Token(MUL, '*') - """ - return f"Token({self.type}, {repr(self.value)})" - - def __repr__(self): - return self.__str__() - - -class Lexer(object): - """A lexer for the Psi language.""" - - def __init__(self, text): - # client string input, e.g. "4 + 2 * 3 - 6 / 2" - self.text = text - # self.pos is an index into self.text - self.pos = 0 - self.current_char = self.text[self.pos] - - def error(self): - """Raise an exception at the current position.""" - raise ValueError("Invalid character") - - def advance(self): - """Advance the `pos` pointer and set the `current_char` variable.""" - self.pos += 1 - if self.pos > len(self.text) - 1: - self.current_char = None # Indicates end of input - else: - self.current_char = self.text[self.pos] - - def skip_whitespace(self): - while self.current_char is not None and self.current_char.isspace(): - self.advance() - - def integer(self): - """Return a (multidigit) integer consumed from the input.""" - result = "" - while self.current_char is not None and self.current_char.isdigit(): - result += self.current_char - self.advance() - return int(result) - - def get_next_token(self): - """Lexical analyzer (also known as scanner or tokenizer)""" - while self.current_char is not None: - if self.current_char.isspace(): - self.skip_whitespace() - continue - - token_type = { - "+": PLUS, - "-": MINUS, - "d": DICE, - "*": MUL, - "/": DIV, - "(": LPAREN, - ")": RPAREN, - }.get(self.current_char) - - if token_type: - self.advance() - return Token(token_type, self.current_char) - - if self.current_char.isdigit(): - return Token(INTEGER, self.integer()) - - self.error() - - return Token(EOF, None) - - -############################################################################### -# # -# PARSER # -# # -############################################################################### - - -class AST(object): - pass - - -class BinOp(AST): - def __init__(self, left, op, right): - self.left = left - self.token = self.op = op - self.right = right - - -class Num(AST): - def __init__(self, token): - self.token = token - self.value = token.value - - -class UnaryOp(AST): - def __init__(self, op, expr): - self.token = self.op = op - self.expr = expr - - -class Parser(object): - def __init__(self, lexer): - self.lexer = lexer - # set current token to the first token taken from the input - self.current_token = self.lexer.get_next_token() - - def error(self): - raise Exception("Invalid syntax") - - def eat(self, token_type): - # compare the current token type with the passed token - # type and if they match then "eat" the current token - # and assign the next token to the self.current_token, - # otherwise raise an exception. - if self.current_token.type == token_type: - self.current_token = self.lexer.get_next_token() - else: - self.error() - - def factor(self): - """factor : (PLUS | MINUS | DICE) factor | INTEGER | LPAREN expr RPAREN""" - token = self.current_token - if token.type == PLUS: - self.eat(PLUS) - node = UnaryOp(token, self.factor()) - return node - elif token.type == MINUS: - self.eat(MINUS) - node = UnaryOp(token, self.factor()) - return node - elif token.type == DICE: - self.eat(DICE) - left = Num(Token(INTEGER, 1)) # 默认骰子个数为1 - right = self.factor() - node = BinOp(left, token, right) - return node - elif token.type == INTEGER: - self.eat(INTEGER) - return Num(token) - elif token.type == LPAREN: - self.eat(LPAREN) - node = self.expr() - self.eat(RPAREN) - return node - - def term(self): - """term : factor ((MUL | DIV) factor)*""" - node = self.factor() - - while self.current_token.type in (MUL, DIV): - token = self.current_token - if token.type == MUL: - self.eat(MUL) - elif token.type == DIV: - self.eat(DIV) - - node = BinOp(left=node, op=token, right=self.factor()) - - return node - - def expr(self): - """ - expr : term ((PLUS | MINUS) term)* - term : factor ((MUL | DIV) factor)* - factor : (PLUS | MINUS) factor | INTEGER | LPAREN expr RPAREN - """ - node = self.term() - - while self.current_token.type in (PLUS, MINUS): - token = self.current_token - if token.type == PLUS: - self.eat(PLUS) - elif token.type == MINUS: - self.eat(MINUS) - - node = BinOp(left=node, op=token, right=self.term()) - - return node - - def parse(self): - node = self.expr() - if self.current_token.type != EOF: - self.error() - return node - - -############################################################################### -# # -# INTERPRETER # -# # -############################################################################### - - -class NodeVisitor(object): - def visit(self, node): - method_name = "visit_" + type(node).__name__ - visitor = getattr(self, method_name, self.generic_visit) - return visitor(node) - - def generic_visit(self, node): - raise Exception("No visit_{} method".format(type(node).__name__)) - - -class Interpreter(NodeVisitor): - def __init__(self, parser): - self.parser = parser - - def visit_BinOp(self, node): - if node.op.type == PLUS: - return self.visit(node.left) + self.visit(node.right) - elif node.op.type == MINUS: - return self.visit(node.left) - self.visit(node.right) - elif node.op.type == DICE: - return int( - HydroDice(1).roll_dice( - _counts=self.visit(node.left), - _sides=self.visit(node.right), - streamline=True, - ) - ) - elif node.op.type == MUL: - return self.visit(node.left) * self.visit(node.right) - elif node.op.type == DIV: - return self.visit(node.left) // self.visit(node.right) - - def visit_Num(self, node): - return node.value - - def visit_UnaryOp(self, node): - op = node.op.type - if op == PLUS: - return +self.visit(node.expr) - elif op == MINUS: - return -self.visit(node.expr) - - def interpret(self): - tree = self.parser.parse() - if tree is None: - return "" - return self.visit(tree) - - -class Psi(Plugin): - async def handle(self) -> None: - lexer = Lexer(self.event.message.get_plain_text()[4:]) - parser = Parser(lexer) - interpreter = Interpreter(parser) - result = interpreter.interpret() - await self.event.reply(str(result)) - - async def rule(self) -> bool: - return self.event.type == "message" and self.event.message.startswith(".psi") diff --git a/example/plugins/r.py b/example/plugins/r.py deleted file mode 100644 index b2248a4..0000000 --- a/example/plugins/r.py +++ /dev/null @@ -1,23 +0,0 @@ -from iamai import Plugin -from numpy.random import Generator -from iamai.adapter.onebot11.message import CQHTTPMessage, CQHTTPMessageSegment -from iamai.log import logger - -ms = CQHTTPMessageSegment - - -class R(Plugin): - priority = 1 - - async def handle(self) -> None: - try: - await self.event.reply("test") - except Exception as e: - # await self.event.reply(f"ERROR!{e!r}") - logger.info("ERROR with message: {}".format(e)) - - async def rule(self) -> bool: - return ( - self.event.type == "message" - and self.event.message.get_plain_text().startswith(".show") - ) diff --git a/example/plugins/show.py b/example/plugins/show.py deleted file mode 100644 index 21c6722..0000000 --- a/example/plugins/show.py +++ /dev/null @@ -1,44 +0,0 @@ -from iamai import Plugin -from numpy.random import Generator -from iamai.adapter.onebot11.message import CQHTTPMessage, CQHTTPMessageSegment - -ms = CQHTTPMessageSegment - - - - -class Exec(Plugin): - - priority = 1 - - async def handle(self) -> None: - try: - content = [ - { - "type": "node", - "data": { - "name": f"{self.event.sender.nickname}", - "uin": f"{self.event.sender.user_id}", - "content": [ - { - "type": "text", - "data": { - "text": f"{eval(self.event.message.get_plain_text()[6:])}" - } - }, - # eval(self.event.message.get_plain_text()[6:]) - ] - } - } - ] - res = await self.event.adapter.send_group_forward_msg(group_id=int(self.event.group_id), messages=content) - except Exception as e: - # await self.event.reply(f"ERROR!{e!r}") - await self.bot.get_adapter('onebot11').send_guild_channel_msg(f"{eval(self.event.message.get_plain_text()[6:])}") - - async def rule(self) -> bool: - return ( - self.event.type == "message" - and - self.event.message.get_plain_text().startswith(".show") - )
\ No newline at end of file |
