1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
from collections import defaultdict
from itertools import chain
from pathlib import Path
from typing import Dict, List, Optional, Type, Union
from iamai import ConfigModel, Plugin
from iamai.log import logger, error_or_exception
from iamai.utils import ModulePathFinder, get_classes_from_module_name, get_classes_from_dir, get_classes_from_module
from .config import GlobalConfig
from iamai.exceptions import LoadModuleError
from .utils import BasePlugin, RegexPluginBase, CommandPluginBase
import os
# from .core import Rule, RuleLoadType
from HydroRollCore import Rule, RuleLoadType
class HydroRoll(Plugin):
    """Plugin that loads rule classes and answers the basic core commands.

    Loaded rule classes are kept in ``rules_priority_dict``, grouped by the
    ``priority`` attribute of each rule class.
    """

    class Config(ConfigModel):
        """Configuration model of this plugin."""

        __config_name__ = "HydroRoll"

    priority = 0
    # Class-level registry: it is shared by every instance of this plugin.
    rules_priority_dict: Dict[int, List[Type[Rule]]] = defaultdict(list)
    _module_path_finder = ModulePathFinder()

    def _load_rule_class(
        self,
        rule_class: Type[Rule],
        rule_load_type: RuleLoadType,
        rule_file_path: Optional[str],
    ):
        """Load a rule class.

        Args:
            rule_class: The rule class to register.
            rule_load_type: How the rule was loaded; stored on the class as
                ``__rule_load_type__``.
            rule_file_path: File the rule was loaded from, or ``None``;
                stored on the class as ``__rule_file_path__``.
        """
        logger.info(f'Loading rule from class "{rule_class!r}"')
        priority = getattr(rule_class, "priority", None)
        # An exact type check (not isinstance) so that bool priorities are rejected.
        if type(priority) is int and priority >= 0:
            # A name clash only produces a warning; the rule is still registered.
            for _rule in self.rules:
                if _rule.__name__ == rule_class.__name__:
                    logger.warning(
                        f'Already have a same name rule "{_rule.__name__}"'
                    )
            rule_class.__rule_load_type__ = rule_load_type
            rule_class.__rule_file_path__ = rule_file_path
            self.rules_priority_dict[priority].append(rule_class)
            logger.success(
                f'Succeeded to load rule "{rule_class.__name__}" '
                f'from class "{rule_class!r}"'
            )
        else:
            error_or_exception(
                f'Load rule from class "{rule_class!r}" failed:',
                LoadModuleError(
                    f'Rule priority incorrect in the class "{rule_class!r}"'
                ),
                self.bot.config.bot.log.verbose_exception,
            )

    def _load_rules_from_module_name(
        self, module_name: str, rule_load_type: RuleLoadType
    ):
        """Load every rule class found in the module with the given name.

        Args:
            module_name: Dotted module name, in the same format as a Python
                ``import`` statement.
            rule_load_type: Load type recorded on each rule class found.
        """
        logger.info(f'Loading rules from module "{module_name}"')
        try:
            rule_classes = get_classes_from_module_name(module_name, Rule)
        except ImportError as e:
            error_or_exception(
                f'Import module "{module_name}" failed:',
                e,
                self.bot.config.bot.log.verbose_exception,
            )
        else:
            for rule_class, module in rule_classes:
                self._load_rule_class(
                    rule_class,
                    rule_load_type,
                    module.__file__,
                )

    def _resolve_rule_module_name(self, rule_path: Path) -> Optional[str]:
        """Return the importable module name for a rule file, or ``None``.

        The directories known to the module path finder are tried first; the
        current working directory is the fallback.

        Args:
            rule_path: Path of an existing ``.py`` file.

        Returns:
            The dotted module name, or ``None`` when the file is neither
            directly under a known module path nor below the working directory.
        """
        resolved = rule_path.resolve()
        for path in self._module_path_finder.path:
            try:
                if rule_path.stem == "__init__":
                    # A package is named after the directory holding __init__.py.
                    if resolved.parent.parent.samefile(Path(path)):
                        return resolved.parent.name
                elif resolved.parent.samefile(Path(path)):
                    return rule_path.stem
            except OSError:
                # samefile() fails for search paths that do not exist; skip them.
                continue

        try:
            rel_path = resolved.relative_to(Path(".").resolve())
        except ValueError:
            # The file does not live below the working directory.
            return None
        if rel_path.stem == "__init__":
            return ".".join(rel_path.parts[:-1])
        return ".".join(rel_path.parts[:-1] + (rel_path.stem,))

    def _load_rules(
        self,
        *rules: Union[Type[Rule], str, Path],
        rule_load_type: Optional[RuleLoadType] = None,
    ):
        """Load rules.

        An entry that cannot be loaded is logged and skipped; the remaining
        entries are still processed.

        Args:
            *rules: Rule classes, rule module names or rule module file paths.
                Each entry may be a `Type[Rule]`, a `str` or a `pathlib.Path`.
                A `Type[Rule]` is loaded as a rule class.
                A `str` is loaded as a rule module name, in the same format
                    as a Python `import` statement, e.g. `path.of.rule`.
                A `pathlib.Path` is loaded as a rule module file path,
                    e.g. `pathlib.Path("path/of/rule")`.
            rule_load_type: Rule load type. If None it is chosen from the
                kind of each entry, otherwise the given type is used.
        """
        logger.info("Loading rules...")
        for rule_ in rules:
            if isinstance(rule_, type):
                if issubclass(rule_, Rule):
                    self._load_rule_class(
                        rule_, rule_load_type or RuleLoadType.CLASS, None
                    )
                else:
                    logger.error(
                        f'The rule class "{rule_!r}" must be a subclass of Rule'
                    )
            elif isinstance(rule_, str):
                logger.warning(f'Loading rules from module "{rule_}"')
                self._load_rules_from_module_name(
                    rule_, rule_load_type or RuleLoadType.NAME
                )
            elif isinstance(rule_, Path):
                logger.warning(f'Loading rules from path "{rule_}"')
                if not rule_.is_file():
                    logger.error(f'The rule path "{rule_}" must be a file')
                    continue
                if rule_.suffix != ".py":
                    logger.error(f'The path "{rule_}" must endswith ".py"')
                    # Skip only this entry; the other rules must still load.
                    continue
                rule_module_name = self._resolve_rule_module_name(rule_)
                if rule_module_name is None:
                    logger.error(
                        f'The rule path "{rule_}" is neither in a known module '
                        "path nor under the current working directory"
                    )
                    continue
                self._load_rules_from_module_name(
                    rule_module_name, rule_load_type or RuleLoadType.FILE
                )
            else:
                logger.error(f"Type error: {rule_} can not be loaded as plugin")

    def load_rules(self, *rules: Union[Type[Rule], str, Path]):
        """Load rules.

        Args:
            *rules: Rule classes, rule module names or rule module file paths.
                Each entry may be a `Type[Rule]`, a `str` or a `pathlib.Path`.
                A `Type[Rule]` is loaded as a rule class.
                A `str` is loaded as a rule module name, in the same format
                    as a Python `import` statement, e.g. `path.of.rule`.
                A `pathlib.Path` is loaded as a rule module file path,
                    e.g. `pathlib.Path("path/of/rule")`.
        """
        # self._extend_rules.extend(rules)
        return self._load_rules(*rules)

    def _load_rules_from_dirs(self, *dirs: Path):
        """Load rules from directories.

        Rules in modules whose name starts with `_` are not imported.
        Paths may be relative or absolute.

        Args:
            *dirs: Directories holding the modules that contain rules,
                e.g. `pathlib.Path("path/of/rules/")`.
        """
        logger.info("Loading rules from dirs...")
        dir_paths = [str(directory.resolve()) for directory in dirs]
        joined_dirs = ", ".join(dir_paths)
        logger.warning(f'Loading rules from dirs "{joined_dirs}"')
        # Remember the directories so file paths inside them can later be
        # mapped back to module names.
        self._module_path_finder.path.extend(dir_paths)  # type: ignore
        for rule_class, module in get_classes_from_dir(dir_paths, Rule):  # type: ignore
            self._load_rule_class(rule_class, RuleLoadType.DIR, module.__file__)

    def load_rules_from_dirs(self, *dirs: Path):
        """Load rules from directories.

        Rules in modules whose name starts with `_` are not imported.
        Paths may be relative or absolute.

        Args:
            *dirs: Directories holding the modules that contain rules,
                e.g. `pathlib.Path("path/of/rules/")`.
        """
        # self._extend_rule_dirs.extend(dirs)
        self._load_rules_from_dirs(*dirs)

    @property
    def rules(self) -> List[Type[Rule]]:
        """List of the rule classes that are currently loaded."""
        return list(chain.from_iterable(self.rules_priority_dict.values()))

    def __post_init__(self):
        """Reset the global state and load the bundled rules on first set-up."""
        if not self.bot.global_state.get("init", False):
            self.bot.global_state = {}
            self.bot.global_state["init"] = True
            # Guarded by the "init" flag so the class-level rule registry is
            # filled only once. The "rules" directory sits two levels above
            # the directory of this file; pathlib keeps this portable instead
            # of splitting the path on a Windows-only separator.
            rules_dir = Path(__file__).resolve().parent.parent.parent / "rules"
            self._load_rules_from_dirs(rules_dir)  # *self.config.rule['rule_dirs'])
            # self._load_rules(*self.config.rule.rules)

    async def handle(self) -> None:
        """Reply to the ``.core`` and ``.show`` commands.

        @TODO: HydroRollCore should be able to handle all signals and tokens from Psi.
        @BODY: HydroRollCore activates the rule-packages.
        """
        if self.event.message.get_plain_text() == ".core":
            await self.event.reply("HydroRollCore is running.")
        elif self.event.message.startswith(".show"):
            # SECURITY: eval() executes arbitrary Python taken straight from a
            # chat message. Anyone able to message the bot can run code in
            # this process. Restrict this command to trusted users or replace
            # eval() with a whitelist of inspectable values before deploying.
            try:
                await self.event.reply(eval(self.event.message.get_plain_text()[6:]))
            except Exception as e:
                await self.event.reply(f"{e!r}")

    async def rule(self) -> bool:
        """Tell whether the current event should be handled by this plugin.

        @TODO: Psi should be able to handle all message first.
        @BODY: lexer module will return a list of tokens, parser module will parse the tokens into a tree, and executor module will execute the tokens with a stack with a bool return value.

        Returns:
            True for message events whose plain text starts with ".".
        """
        if self.event.type != "message":
            return False
        return self.event.message.get_plain_text().startswith(".")
|