 examples/COC7/Character.py                    |  48
 examples/COC7/Wiki.py                         |   7
 examples/COC7/__init__.py                     |  12
 hrc/config.py                                 |  28
 hrc/core.py                                   | 689
 hrc/event.py                                  | 107
 hrc/log.py                                    |  25
 hrc/{rules => rule}/BaseRule/CharacterCard.py |   0
 hrc/{rules => rule}/BaseRule/CustomRule.py    |   0
 hrc/{rules => rule}/BaseRule/Wiki.py          |   0
 hrc/rule/__init__.py                          | 172
 hrc/rules/__init__.py                         |  19
 hrc/rules/config.py                           |   4
 hrc/rules/py.typed                            |   0
 hrc/typing.py                                 |  22
 hrc/utils.py                                  | 299
 pdm.lock                                      |  26
 pyproject.toml                                |   1
 18 files changed, 1396 insertions(+), 63 deletions(-)
diff --git a/examples/COC7/Character.py b/examples/COC7/Character.py
index d2a6b9f..b5f07c5 100644
--- a/examples/COC7/Character.py
+++ b/examples/COC7/Character.py
@@ -1,12 +1,11 @@
# MyRule
import math
-import dataclasses
+from typing import Union
from dataclasses import dataclass
-from typing import Literal, Optional, Union
-from pydantic import Field, BaseModel
-from hrc.rules import aliases, BaseRule
-from hrc.rules.BaseRule import CharacterCard
+
+from hrc.rule import aliases
+from hrc.rule.BaseRule import CharacterCard
@dataclass
@@ -14,28 +13,33 @@ class Attributes(CharacterCard.Attribute):
@property
@aliases(['luck', '运气'], ignore_case=True)
- def LUK(self) -> Union[str, int]: ...
+ def LUK(self) -> Union[str, int, None]: ...
@property
- def DB(self) -> Union[str, int]:
- sum = self.player_card.STR + self.player_card.SIZ
- if sum == 164:
- return math.ceil((sum-164)/80) + "D6"
- elif sum == 124:
- return "1D4"
+ @aliases(['伤害加值', 'DamageBonus'], ignore_case=True)
+ def DB(self) -> Union[str, int, None]:
+ sum = self.STR + self.SIZ
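+        # Thresholds as implemented below: <=64 → -2, 65-84 → -1, 85-124 → 0, 125-164 → 1D4, 165+ → nD6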
+ return (
+ str(math.ceil((sum - 164) / 80)) + "D6" if sum > 164 else
+ "1D4" if sum > 124 else
+ "0" if sum > 84 else
+ "-1" if sum > 64 else
+ "-2" if sum > 0 else
+ None
+ )
@property
@aliases(['年龄', 'age'], ignore_case=True)
- def AGE(self) -> Union[str, int]: ...
+ def AGE(self) -> Union[str, int, None]: ...
@property
@aliases(['HitPoints', '生命值', '生命'], ignore_case=True)
- def HP(self) -> Union[str, int]:
+ def HP(self) -> Union[str, int, None]:
return self.MAX_HP
@property
@aliases(['最大生命值', 'HitPointTotal', '总生命值'], ignore_case=True)
- def MAX_HP(self) -> Union[str, int]:
+ def MAX_HP(self) -> Union[str, int, None]:
if hasattr(self, 'CON') and hasattr(self, 'SIZ'):
return (self.CON + self.SIZ) // 10
else:
@@ -43,17 +47,17 @@ class Attributes(CharacterCard.Attribute):
@property
@aliases(['理智', 'Sanity', 'SanityPoint', '理智值', 'san值'], ignore_case=True)
- def SAN(self) -> Union[str, int]:
+ def SAN(self) -> Union[str, int, None]:
return self.POW
@property
@aliases(['最大理智值', 'MaximumSanity'], ignore_case=True)
- def MAX_SAN(self) -> Union[str, int]:
+ def MAX_SAN(self) -> Union[str, int, None]:
return 99 - self.player_card.CM
@property
@aliases(['魔法', '魔法值', 'MagicPoints'], ignore_case=True)
- def MP(self) -> Union[str, int]:
+ def MP(self) -> Union[str, int, None]:
if hasattr(self, 'POW'):
return math.floor(self.POW / 5)
else:
@@ -89,9 +93,9 @@ class Attributes(CharacterCard.Attribute):
@aliases(['移动速度'], ignore_case=True)
def MOV(self) -> Union[str, int, None]:
mov = 8
- siz = self.SIZ
- str_val = self.STR
- dex = self.DEX
+ siz = self.player_card.SIZ
+ str_val = self.player_card.STR
+ dex = self.player_card.DEX
age = self.AGE
if age >= 40:
@@ -105,7 +109,7 @@ class Attributes(CharacterCard.Attribute):
return mov
@property
- @aliases(['兴趣技能点', 'PersonalInterests'], ignore_case=Ture)
+ @aliases(['兴趣技能点', 'PersonalInterests'], ignore_case=True)
def PI(self) -> Union[str, int, None]:
return self.player_card.INT*2
diff --git a/examples/COC7/Wiki.py b/examples/COC7/Wiki.py
index 060fb9d..2c786e7 100644
--- a/examples/COC7/Wiki.py
+++ b/examples/COC7/Wiki.py
@@ -5,8 +5,5 @@ import dataclasses
from dataclasses import dataclass
from typing import Literal, Optional, Union
from pydantic import Field, BaseModel
-from hrc.rules import aliases, BaseRule
-from hrc.rules.BaseRule import CharacterCard
-
-class Query(Wiki):
- \ No newline at end of file
+from hrc.rule import aliases, BaseRule
+from hrc.rule.BaseRule import CharacterCard \ No newline at end of file
diff --git a/examples/COC7/__init__.py b/examples/COC7/__init__.py
index f3d781f..ecd2091 100644
--- a/examples/COC7/__init__.py
+++ b/examples/COC7/__init__.py
@@ -15,10 +15,10 @@ async def auto_card(_event='T_Event'):
async def overview_card(pc: player_card):
- max_hp = math.floor((pc.get('CON', 0) + pc.get('SIZ', 0) / 10)
- max_san=math.floor(99 - pc.get('CM', 0))
- mp=pc.get('MP', 0)
- mp_show=" mp" + str(mp) + "/" + str(
- math.floor(pc.get('POW', 0) / 5)
- ) if mp and mp != math.floor(pc.get('POW', 0) / 5) else ""
+    max_hp = math.floor((pc.get('CON', 0) + pc.get('SIZ', 0)) / 10)
+ max_san = math.floor(99 - pc.get('CM', 0))
+ mp = pc.get('MP', 0)
+ mp_show = " mp" + str(mp) + "/" + str(
+ math.floor(pc.get('POW', 0) / 5)
+ ) if mp and mp != math.floor(pc.get('POW', 0) / 5) else ""
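+    # Illustrative output (name and numbers made up): "Marion hp10/11 san55/99 mp7/14 DEX60"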
return pc.get('__Name', "") + " hp" + str(pc.get('HP', max_hp)) + "/" + str(max_hp) + " san" + str(pc.get('SAN', "?")) + "/" + str(max_san) + mp_show + " DEX" + str(pc.get('DEX', "?"))
diff --git a/hrc/config.py b/hrc/config.py
index e69de29..7db4efe 100644
--- a/hrc/config.py
+++ b/hrc/config.py
@@ -0,0 +1,28 @@
+from typing import Set, Union
+
+from pydantic import BaseModel, ConfigDict, DirectoryPath, Field
+
+class ConfigModel(BaseModel):
+ model_config = ConfigDict(extra="allow")
+
+ __config_name__: str = ""
+
+
+class LogConfig(ConfigModel):
+ level: Union[str, int] = "DEBUG"
+ verbose_exception: bool = False
+
+
+class CoreConfig(ConfigModel):
+ rules: Set[str] = Field(default_factory=set)
+ rule_dirs: Set[DirectoryPath] = Field(default_factory=set)
+ log: LogConfig = LogConfig()
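+    # These fields map onto the ``[core]`` table of config.toml, e.g. (illustrative sketch):
+    #
+    #   [core]
+    #   rules = ["examples.COC7"]
+    #   rule_dirs = ["rules"]
+    #
+    #   [core.log]
+    #   level = "INFO"
+    #   verbose_exception = true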
+
+
+class RuleConfig(ConfigModel):
+ """rule configuration."""
+
+
+class MainConfig(ConfigModel):
+ core: CoreConfig = CoreConfig()
+ rule: RuleConfig = RuleConfig() \ No newline at end of file
diff --git a/hrc/core.py b/hrc/core.py
index 020ccb9..862b8d1 100644
--- a/hrc/core.py
+++ b/hrc/core.py
@@ -1 +1,688 @@
-class Core: ...
+import asyncio
+import json
+import pkgutil
+import signal
+import sys
+import threading
+import time
+from collections import defaultdict
+from contextlib import AsyncExitStack
+from itertools import chain
+from pathlib import Path
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Type,
+ Union,
+ overload,
+)
+
+from pydantic import ValidationError, create_model
+
+from .config import ConfigModel, MainConfig, RuleConfig
+from .dependencies import solve_dependencies
+from .event import Event
+from .exceptions import (  # assumed to live in hrc.exceptions alongside the other modules
+    GetEventTimeout,
+    LoadModuleError,
+    SkipException,
+    StopException,
+)
+from .log import logger
+from .rule import Rule, RuleLoadType
+from .typing import CoreHook, EventHook, EventT
+from .utils import (
+ ModulePathFinder,
+ get_classes_from_module_name,
+ is_config_class,
+ samefile,
+ wrap_get_func,
+)
+
+if sys.version_info >= (3, 11): # pragma: no cover
+ import tomllib
+else: # pragma: no cover
+ import tomli as tomllib
+
+HANDLED_SIGNALS = (
+ signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
+ signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
+)
+
+
+class Core:
+
+ should_exit: asyncio.Event
+ rules_priority_dict: Dict[int, List[Type[Rule[Any, Any, Any]]]]
+
+    _condition: asyncio.Condition
+ _current_event: Optional[Event[Any]]
+ _restart_flag: bool
+ _module_path_finder: ModulePathFinder
+ _raw_config_dict: Dict[str, Any]
+ _handle_event_tasks: Set[
+ "asyncio.Task[None]"
+    ]  # Event handling tasks, kept here to hold strong references to them
+ # The following properties are not cleared on reboot
+ _config_file: Optional[str] # Configuration file
+ _config_dict: Optional[Dict[str, Any]] # Configuration dictionary
+ _hot_reload: bool # Hot-Reload
+ _extend_rules: List[
+ Union[Type[Rule[Any, Any, Any]], str, Path]
+ ] # A list of rules loaded programmatically using the ``load_rules()`` method
+ _extend_rule_dirs: List[
+ Path
+ ] # List of rule paths loaded programmatically using the ``load_rules_from_dirs()`` method
+ _core_run_hooks: List[CoreHook]
+ _core_exit_hooks: List[CoreHook]
+ _event_pre_processor_hooks: List[EventHook]
+ _event_post_processor_hooks: List[EventHook]
+
+ def __init__(
+ self,
+ *,
+ config_file: Optional[str] = "config.toml",
+ config_dict: Optional[Dict[str, Any]] = None,
+ hot_reload: bool = False,
+ ) -> None:
+ self.config = MainConfig()
+ self.rules_priority_dict = defaultdict(list)
+ self._current_event = None
+ self._restart_flag = False
+ self._module_path_finder = ModulePathFinder()
+ self._raw_config_dict = {}
+ self._handle_event_tasks = set()
+
+ self._config_file = config_file
+ self._config_dict = config_dict
+ self._hot_reload = hot_reload
+
+ self._extend_rules = []
+ self._extend_rule_dirs = []
+ self._core_run_hooks = []
+ self._core_exit_hooks = []
+ self._event_pre_processor_hooks = []
+ self._event_post_processor_hooks = []
+
+ sys.meta_path.insert(0, self._module_path_finder)
+
+ @property
+ def rules(self) -> List[Type[Rule[Any, Any, Any]]]:
+ """List of currently loaded rules."""
+ return list(chain(*self.rules_priority_dict.values()))
+
+ def run(self) -> None:
+ self._restart_flag = True
+ while self._restart_flag:
+ self._restart_flag = False
+ asyncio.run(self._run())
+ if self._restart_flag:
+                self._load_rules_from_dirs(*self._extend_rule_dirs)
+                self._load_rules(*self._extend_rules)
+
+ def restart(self) -> None:
+ logger.info("Restarting...")
+ self._restart_flag = True
+ self.should_exit.set()
+
+ async def _run(self) -> None:
+ self.should_exit = asyncio.Event()
+ self._condition = asyncio.Condition()
+
+ # Monitor and intercept system exit signals to complete some aftermath work before closing the program
+ if threading.current_thread() is threading.main_thread(): # pragma: no cover
+ # Signals can only be processed in the main thread
+ try:
+ loop = asyncio.get_running_loop()
+ for sig in HANDLED_SIGNALS:
+ loop.add_signal_handler(sig, self._handle_exit)
+ except NotImplementedError:
+ # add_signal_handler is only available under Unix, below for Windows
+ for sig in HANDLED_SIGNALS:
+ signal.signal(sig, self._handle_exit)
+
+ # Load configuration file
+ self._reload_config_dict()
+
+        self._load_rules_from_dirs(*self.config.core.rule_dirs)
+        self._load_rules(*self.config.core.rules)
+ self._update_config()
+
+ logger.info("Running...")
+
+ hot_reload_task = None
+ if self._hot_reload: # pragma: no cover
+ hot_reload_task = asyncio.create_task(self._run_hot_reload())
+
+ for core_run_hook_func in self._core_run_hooks:
+ await core_run_hook_func(self)
+
+        await self.should_exit.wait()
+
+        if hot_reload_task is not None:
+            await hot_reload_task
+
+        for core_exit_hook_func in self._core_exit_hooks:
+            await core_exit_hook_func(self)
+
+        self.rules_priority_dict.clear()
+        self._module_path_finder.path.clear()
+
+ def _remove_rule_by_path(
+ self, file: Path
+ ) -> List[Type[Rule[Any, Any, Any]]]: # pragma: no cover
+ removed_rules: List[Type[Rule[Any, Any, Any]]] = []
+        for rules in self.rules_priority_dict.values():
+ _removed_rules = list(
+ filter(
+ lambda x: x.__rule_load_type__ != RuleLoadType.CLASS
+ and x.__rule_file_path__ is not None
+ and samefile(x.__rule_file_path__, file),
+ rules,
+ )
+ )
+ removed_rules.extend(_removed_rules)
+ for rule_ in _removed_rules:
+ rules.remove(rule_)
+ logger.info(
+ "Succeeded to remove rule "
+ f'"{rule_.__name__}" from file "{file}"'
+ )
+ return removed_rules
+
+ async def _run_hot_reload(self) -> None: # pragma: no cover
+ """Hot reload."""
+ try:
+ from watchfiles import Change, awatch
+ except ImportError:
+ logger.warning(
+ 'Hot reload needs to install "watchfiles", try "pip install watchfiles"'
+ )
+ return
+
+ logger.info("Hot reload is working!")
+ async for changes in awatch(
+ *(
+ x.resolve()
+ for x in set(self._extend_rule_dirs)
+ .union(self.config.core.rule_dirs)
+ .union(
+ {Path(self._config_file)}
+ if self._config_dict is None and self._config_file is not None
+ else set()
+ )
+ ),
+ stop_event=self.should_exit,
+ ):
+ # Processed in the order of Change.deleted, Change.modified, Change.added
+ # To ensure that when renaming occurs, deletions are processed first and then additions are processed
+ for change_type, file_ in sorted(changes, key=lambda x: x[0], reverse=True):
+ file = Path(file_)
+ # Change configuration file
+ if (
+ self._config_file is not None
+ and samefile(self._config_file, file)
+ and change_type == change_type.modified
+ ):
+ logger.info(f'Reload config file "{self._config_file}"')
+ old_config = self.config
+ self._reload_config_dict()
+ if (
+                        self.config.core != old_config.core
+ ):
+ self.restart()
+ continue
+
+ # Change rule folder
+ if change_type == Change.deleted:
+ # Special handling for deletion operations
+ if file.suffix != ".py":
+ file = file / "__init__.py"
+ else:
+ if file.is_dir() and (file / "__init__.py").is_file():
+ # When a new directory is added and this directory contains the ``__init__.py`` file
+ # It means that what happens at this time is that a Python package is added, and the ``__init__.py`` file of this package is deemed to be added
+ file = file / "__init__.py"
+ if not (file.is_file() and file.suffix == ".py"):
+ continue
+
+ if change_type == Change.added:
+ logger.info(f"Hot reload: Added file: {file}")
+                    self._load_rules(
+ Path(file), rule_load_type=RuleLoadType.DIR, reload=True
+ )
+ self._update_config()
+ continue
+ if change_type == Change.deleted:
+ logger.info(f"Hot reload: Deleted file: {file}")
+ self._remove_rule_by_path(file)
+ self._update_config()
+ elif change_type == Change.modified:
+ logger.info(f"Hot reload: Modified file: {file}")
+ self._remove_rule_by_path(file)
+                    self._load_rules(
+ Path(file), rule_load_type=RuleLoadType.DIR, reload=True
+ )
+ self._update_config()
+
+ def _update_config(self) -> None:
+ def update_config(
+ source: List[Type[Rule[Any, Any, Any]]],
+ name: str,
+ base: Type[ConfigModel],
+ ) -> Tuple[Type[ConfigModel], ConfigModel]:
+ config_update_dict: Dict[str, Any] = {}
+ for i in source:
+ config_class = getattr(i, "Config", None)
+ if is_config_class(config_class):
+ default_value: Any
+ try:
+ default_value = config_class()
+ except ValidationError:
+ default_value = ...
+ config_update_dict[config_class.__config_name__] = (
+ config_class,
+ default_value,
+ )
+ config_model = create_model(
+ name, **config_update_dict, __base__=base)
+ return config_model, config_model()
+
+ self.config = create_model(
+ "Config",
+ rule=update_config(self.rules, "RuleConfig", RuleConfig),
+ __base__=MainConfig,
+ )(**self._raw_config_dict)
+ # Update the level of logging
+ logger.remove()
+        logger.add(sys.stderr, level=self.config.core.log.level)
+
+ def _reload_config_dict(self) -> None:
+ """Reload the configuration file."""
+ self._raw_config_dict = {}
+
+ if self._config_dict is not None:
+ self._raw_config_dict = self._config_dict
+ elif self._config_file is not None:
+ try:
+ with Path(self._config_file).open("rb") as f:
+ if self._config_file.endswith(".json"):
+ self._raw_config_dict = json.load(f)
+ elif self._config_file.endswith(".toml"):
+ self._raw_config_dict = tomllib.load(f)
+ else:
+ self.error_or_exception(
+ "Read config file failed:",
+ OSError("Unable to determine config file type"),
+ )
+ except OSError as e:
+ self.error_or_exception("Can not open config file:", e)
+ except (ValueError, json.JSONDecodeError, tomllib.TOMLDecodeError) as e:
+ self.error_or_exception("Read config file failed:", e)
+
+ try:
+ self.config = MainConfig(**self._raw_config_dict)
+ except ValidationError as e:
+ self.config = MainConfig()
+ self.error_or_exception("Config dict parse error:", e)
+ self._update_config()
+
+ def reload_rules(self) -> None:
+ self.rules_priority_dict.clear()
+ self._load_rules(*self.config.core.rules)
+ self._load_rules_from_dirs(*self.config.core.rule_dirs)
+ self._load_rules(*self._extend_rules)
+ self._load_rules_from_dirs(*self._extend_rule_dirs)
+ self._update_config()
+
+ def _handle_exit(self, *_args: Any) -> None: # pragma: no cover
+ """When the robot receives the exit signal, it will handle it according to the situation."""
+ logger.info("Stopping...")
+ if self.should_exit.is_set():
+ logger.warning("Force Exit...")
+ sys.exit()
+ else:
+ self.should_exit.set()
+
+ async def handle_event(
+ self,
+ current_event: Event[Any],
+ *,
+ handle_get: bool = True,
+ show_log: bool = True,
+ ) -> None:
+ if show_log:
+ logger.info(
+ f"Rule {current_event.rule.name} received: {current_event!r}"
+ )
+
+ if handle_get:
+ _handle_event_task = asyncio.create_task(self._handle_event())
+ self._handle_event_tasks.add(_handle_event_task)
+ _handle_event_task.add_done_callback(
+ self._handle_event_tasks.discard)
+ await asyncio.sleep(0)
+ async with self._condition:
+ self._current_event = current_event
+ self._condition.notify_all()
+ else:
+ _handle_event_task = asyncio.create_task(
+ self._handle_event(current_event))
+ self._handle_event_tasks.add(_handle_event_task)
+ _handle_event_task.add_done_callback(
+ self._handle_event_tasks.discard)
+
+ async def _handle_event(self, current_event: Optional[Event[Any]] = None) -> None:
+ if current_event is None:
+ async with self._condition:
+ await self._condition.wait()
+ assert self._current_event is not None
+ current_event = self._current_event
+ if current_event.__handled__:
+ return
+
+ for _hook_func in self._event_pre_processor_hooks:
+ await _hook_func(current_event)
+
+ for rule_priority in sorted(self.rules_priority_dict.keys()):
+ logger.debug(
+ f"Checking for matching rules with priority {rule_priority!r}"
+ )
+ stop = False
+ for rule in self.rules_priority_dict[rule_priority]:
+ try:
+ async with AsyncExitStack() as stack:
+ _rule = await solve_dependencies(
+ rule,
+ use_cache=True,
+ stack=stack,
+ dependency_cache={
+ Core: self,
+ Event: current_event,
+ },
+ )
+ if await _rule.rule():
+ logger.info(f"Event will be handled by {_rule!r}")
+ try:
+ await _rule.handle()
+ finally:
+ if _rule.block:
+ stop = True
+ except SkipException:
+                        # The rule asked to be skipped; continue propagating the current event
+ continue
+ except StopException:
+                        # The rule asked to stop propagation of the current event
+ stop = True
+ except Exception as e:
+ self.error_or_exception(f'Exception in rule "{rule}":', e)
+ if stop:
+ break
+
+ for _hook_func in self._event_post_processor_hooks:
+ await _hook_func(current_event)
+
+ logger.info("Event Finished")
+
+ @overload
+ async def get(
+ self,
+ func: Optional[Callable[[Event[Any]],
+ Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: None = None,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> Event[Any]: ...
+
+ @overload
+ async def get(
+ self,
+ func: Optional[Callable[[EventT],
+ Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: None = None,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> EventT: ...
+
+ @overload
+ async def get(
+ self,
+ func: Optional[Callable[[EventT],
+ Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: Type[EventT],
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> EventT: ...
+
+ async def get(
+ self,
+ func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: Optional[Type[Event[Any]]] = None,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> Event[Any]:
+ """Get events that meet the specified conditions. The coroutine will wait until the adapter receives events that meet the conditions, exceeds the maximum number of events, or times out.
+
+ Args:
+ func: Coroutine or function, the function will be automatically packaged as a coroutine for execution.
+ Requires an event to be accepted as a parameter and returns a Boolean value. Returns the current event when the coroutine returns ``True``.
+ When ``None`` is equivalent to the input coroutine returning true for any event, that is, returning the next event received by the adapter.
+ event_type: When specified, only events of the specified type are accepted, taking effect before the func condition. Defaults to ``None``.
+ adapter_type: When specified, only events generated by the specified adapter will be accepted, taking effect before the func condition. Defaults to ``None``.
+ max_try_times: Maximum number of events.
+ timeout: timeout period.
+
+ Returns:
+ Returns events that satisfy the condition of ``func``.
+
+ Raises:
+ GetEventTimeout: Maximum number of events exceeded or timeout.
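+
+        Example (illustrative; ``MessageEvent`` is the base class from ``hrc.event``)::
+
+            event = await core.get(
+                lambda e: e.get_plain_text() == "ok",
+                event_type=MessageEvent,
+                timeout=10,
+            )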
+ """
+ _func = wrap_get_func(func)
+
+ try_times = 0
+ start_time = time.time()
+ while not self.should_exit.is_set():
+ if max_try_times is not None and try_times > max_try_times:
+ break
+ if timeout is not None and time.time() - start_time > timeout:
+ break
+
+ async with self._condition:
+ if timeout is None:
+ await self._condition.wait()
+ else:
+ try:
+ await asyncio.wait_for(
+ self._condition.wait(),
+ timeout=start_time + timeout - time.time(),
+ )
+ except asyncio.TimeoutError:
+ break
+
+ if (
+ self._current_event is not None
+ and not self._current_event.__handled__
+ and (
+ event_type is None
+ or isinstance(self._current_event, event_type)
+ )
+ and await _func(self._current_event)
+ ):
+ self._current_event.__handled__ = True
+ return self._current_event
+
+ try_times += 1
+
+ raise GetEventTimeout
+
+ def _load_rule_class(
+ self,
+ rule_class: Type[Rule[Any, Any, Any]],
+ rule_load_type: RuleLoadType,
+ rule_file_path: Optional[str],
+ ) -> None:
+ """Load a rule class"""
+ priority = getattr(rule_class, "priority", None)
+ if isinstance(priority, int) and priority >= 0:
+ for _rule in self.rules:
+ if _rule.__name__ == rule_class.__name__:
+ logger.warning(
+                    f'Already have a rule with the same name "{_rule.__name__}"'
+ )
+ rule_class.__rule_load_type__ = rule_load_type
+ rule_class.__rule_file_path__ = rule_file_path
+ self.rules_priority_dict[priority].append(rule_class)
+ logger.info(
+ f'Succeeded to load rule "{rule_class.__name__}" '
+ f'from class "{rule_class!r}"'
+ )
+ else:
+ self.error_or_exception(
+ f'Load rule from class "{rule_class!r}" failed:',
+ LoadModuleError(
+                    f'Rule priority incorrect in the class "{rule_class!r}"'
+ ),
+ )
+
+ def _load_rules_from_module_name(
+ self,
+ module_name: str,
+ *,
+ rule_load_type: RuleLoadType,
+ reload: bool = False,
+ ) -> None:
+ """Load rules from the given module."""
+ try:
+ rule_classes = get_classes_from_module_name(
+ module_name, Rule, reload=reload
+ )
+ except ImportError as e:
+ self.error_or_exception(
+ f'Import module "{module_name}" failed:', e)
+ else:
+ for rule_class, module in rule_classes:
+ self._load_rule_class(
+ rule_class, # type: ignore
+ rule_load_type,
+ module.__file__,
+ )
+
+ def _load_rules(
+ self,
+ *rules: Union[Type[Rule[Any, Any, Any]], str, Path],
+ rule_load_type: Optional[RuleLoadType] = None,
+ reload: bool = False,
+ ) -> None:
+ for rule_ in rules:
+ try:
+ if isinstance(rule_, type) and issubclass(rule_, Rule):
+                    self._load_rule_class(
+ rule_, rule_load_type or RuleLoadType.CLASS, None
+ )
+ elif isinstance(rule_, str):
+ logger.info(f'Loading rules from module "{rule_}"')
+ self._load_rules_from_module_name(
+ rule_,
+ rule_load_type=rule_load_type or RuleLoadType.NAME,
+ reload=reload,
+ )
+ elif isinstance(rule_, Path):
+ logger.info(f'Loading rules from path "{rule_}"')
+ if not rule_.is_file():
+ raise LoadModuleError( # noqa: TRY301
+ f'The rule path "{rule_}" must be a file'
+ )
+
+ if rule_.suffix != ".py":
+ raise LoadModuleError( # noqa: TRY301
+ f'The path "{rule_}" must endswith ".py"'
+ )
+
+ rule_module_name = None
+ for path in self._module_path_finder.path:
+ try:
+ if rule_.stem == "__init__":
+ if rule_.resolve().parent.parent.samefile(Path(path)):
+ rule_module_name = rule_.resolve().parent.name
+ break
+ elif rule_.resolve().parent.samefile(Path(path)):
+ rule_module_name = rule_.stem
+ break
+ except OSError:
+ continue
+ if rule_module_name is None:
+ rel_path = rule_.resolve().relative_to(Path().resolve())
+ if rel_path.stem == "__init__":
+ rule_module_name = ".".join(rel_path.parts[:-1])
+ else:
+ rule_module_name = ".".join(
+ rel_path.parts[:-1] + (rel_path.stem,)
+ )
+
+ self._load_rules_from_module_name(
+ rule_module_name,
+ rule_load_type=rule_load_type or RuleLoadType.FILE,
+ reload=reload,
+ )
+ else:
+ raise TypeError( # noqa: TRY301
+ f"{rule_} can not be loaded as rule"
+ )
+ except Exception as e:
+ self.error_or_exception(f'Load rule "{rule_}" failed:', e)
+
+ def load_rules(
+ self, *rules: Union[Type[Rule[Any, Any, Any]], str, Path]
+ ) -> None:
+        self._extend_rules.extend(rules)
+
+        return self._load_rules(*rules)
+
+ def _load_rules_from_dirs(self, *dirs: Path) -> None:
+ dir_list = [str(x.resolve()) for x in dirs]
+        logger.info(f'Loading rules from dirs "{", ".join(map(str, dir_list))}"')
+ self._module_path_finder.path.extend(dir_list)
+ for module_info in pkgutil.iter_modules(dir_list):
+ if not module_info.name.startswith("_"):
+ self._load_rules_from_module_name(
+ module_info.name, rule_load_type=RuleLoadType.DIR
+ )
+
+ def load_rules_from_dirs(self, *dirs: Path) -> None:
+ self._extend_rule_dirs.extend(dirs)
+ self._load_rules_from_dirs(*dirs)
+
+    def get_rule(self, name: str) -> Type[Rule[Any, Any, Any]]:
+        """Get a loaded rule class by its class name."""
+ for _rule in self.rules:
+ if _rule.__name__ == name:
+ return _rule
+ raise LookupError(f'Can not find rule named "{name}"')
+
+ def error_or_exception(
+ self, message: str, exception: Exception
+ ) -> None: # pragma: no cover
+ """Output error or exception logs based on the current Bot configuration.
+
+ Args:
+ message: message.
+ exception: Exception.
+ """
+        if self.config.core.log.verbose_exception:
+ logger.exception(message)
+ else:
+ logger.error(f"{message} {exception!r}")
+
+ def core_run_hook(self, func: CoreHook) -> CoreHook:
+ self._core_run_hooks.append(func)
+ return func
+
+ def core_exit_hook(self, func: CoreHook) -> CoreHook:
+ self._core_exit_hooks.append(func)
+ return func
+
+ def event_pre_processor_hook(self, func: EventHook) -> EventHook:
+        self._event_pre_processor_hooks.append(func)
+ return func
+
+ def event_post_processor_hook(self, func: EventHook) -> EventHook:
+ self._event_post_processor_hooks.append(func)
+ return func
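+
+
+# Typical entry point (illustrative sketch; the rule module name is a placeholder):
+#
+#   from hrc.core import Core
+#
+#   core = Core(config_file="config.toml", hot_reload=True)
+#   core.load_rules("examples.COC7")
+#
+#   @core.core_run_hook
+#   async def on_run(core: Core) -> None:
+#       ...
+#
+#   core.run()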
diff --git a/hrc/event.py b/hrc/event.py
new file mode 100644
index 0000000..7f6fb6d
--- /dev/null
+++ b/hrc/event.py
@@ -0,0 +1,107 @@
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any, Generic, Optional, Union
+from typing_extensions import Self
+
+from pydantic import BaseModel, ConfigDict
+from .typing import RuleT
+
+class Event(ABC, BaseModel, Generic[RuleT]):
+ model_config = ConfigDict(extra="allow")
+
+ if TYPE_CHECKING:
+ rule: RuleT
+ else:
+ rule: Any
+ type: Optional[str]
+ __handled__: bool = False
+
+ def __str__(self) -> str:
+ return f"Event<{self.type}>"
+
+ def __repr__(self) -> str:
+ return self.__str__()
+
+
+class MessageEvent(Event[RuleT], Generic[RuleT]):
+ """Base class for general message event classes."""
+
+ @abstractmethod
+ def get_plain_text(self) -> str:
+ """Get the plain text content of the message.
+
+ Returns:
+ The plain text content of the message.
+ """
+
+ @abstractmethod
+ async def reply(self, message: str) -> Any:
+ """Reply message.
+
+ Args:
+ message: The content of the reply message.
+
+ Returns:
+ The response to the reply message action.
+ """
+
+ @abstractmethod
+ async def is_same_sender(self, other: Self) -> bool:
+ """Determine whether itself and another event are the same sender.
+
+ Args:
+ other: another event.
+
+ Returns:
+ Is it the same sender?
+ """
+
+ async def get(
+ self,
+ *,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> Self:
+ """Get the user's reply message.
+
+        Equivalent to ``get()``, with the condition that the event type and the sender are the same.
+
+ Args:
+ max_try_times: Maximum number of events.
+ timeout: timeout period.
+
+ Returns:
+ Message event that the user replies to.
+
+ Raises:
+ GetEventTimeout: Maximum number of events exceeded or timeout.
+ """
+
+ return await self.rule.get(
+ self.is_same_sender,
+ event_type=type(self),
+ max_try_times=max_try_times,
+ timeout=timeout,
+ )
+
+ async def ask(
+ self,
+ message: str,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> Self:
+ """Ask for news.
+
+ Indicates getting the user's reply after replying to a message.
+ Equivalent to executing ``get()`` after ``reply()``.
+
+ Args:
+ message: The content of the reply message.
+ max_try_times: Maximum number of events.
+ timeout: timeout period.
+
+ Returns:
+ Message event that the user replies to.
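+
+        Example (sketch; assumes this event is being handled inside a rule)::
+
+            answer = await event.ask("Are you sure? (y/n)", timeout=30)
+            if answer.get_plain_text() == "y":
+                ...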
+ """
+
+ await self.reply(message)
+ return await self.get(max_try_times=max_try_times, timeout=timeout) \ No newline at end of file
diff --git a/hrc/log.py b/hrc/log.py
index e69de29..3a84bba 100644
--- a/hrc/log.py
+++ b/hrc/log.py
@@ -0,0 +1,25 @@
+import os
+import sys
+from datetime import datetime
+
+from loguru import logger as _logger
+
+logger = _logger
+
+current_path = os.path.dirname(os.path.abspath(__file__))
+log_path = os.path.join(
+ current_path, "logs", datetime.now().strftime("%Y-%m-%d") + ".log"
+)
+
+
+def error_or_exception(message: str, exception: Exception, verbose: bool):
+ logger.remove()
+ logger.add(
+ sys.stderr,
+ format="<magenta>{time:YYYY-MM-DD HH:mm:ss.SSS}</magenta> <level>[{level}]</level> > <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+ )
+ logger.add(sink=log_path, level="INFO", rotation="10 MB")
+ if verbose:
+ logger.exception(message)
+ else:
+ logger.critical(f"{message} {exception!r}") \ No newline at end of file
diff --git a/hrc/rules/BaseRule/CharacterCard.py b/hrc/rule/BaseRule/CharacterCard.py
index 6d09e5a..6d09e5a 100644
--- a/hrc/rules/BaseRule/CharacterCard.py
+++ b/hrc/rule/BaseRule/CharacterCard.py
diff --git a/hrc/rules/BaseRule/CustomRule.py b/hrc/rule/BaseRule/CustomRule.py
index e69de29..e69de29 100644
--- a/hrc/rules/BaseRule/CustomRule.py
+++ b/hrc/rule/BaseRule/CustomRule.py
diff --git a/hrc/rules/BaseRule/Wiki.py b/hrc/rule/BaseRule/Wiki.py
index e69de29..e69de29 100644
--- a/hrc/rules/BaseRule/Wiki.py
+++ b/hrc/rule/BaseRule/Wiki.py
diff --git a/hrc/rule/__init__.py b/hrc/rule/__init__.py
new file mode 100644
index 0000000..473d143
--- /dev/null
+++ b/hrc/rule/__init__.py
@@ -0,0 +1,172 @@
+"""hrc rule.
+
+The base class of all hrc rules. All user-written rules must inherit from the ``Rule`` class.
+"""
+
+from . import BaseRule  # re-export the bundled BaseRule package
+
+import inspect
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+ Generic,
+ NoReturn,
+ Optional,
+ Tuple,
+ Type,
+ cast,
+ final,
+)
+from typing_extensions import Annotated, get_args, get_origin
+
+from ..config import ConfigModel
+from ..dependencies import Depends
+from ..event import Event
+from ..exceptions import SkipException, StopException
+from ..typing import ConfigT, EventT, StateT
+from ..utils import is_config_class
+
+if TYPE_CHECKING:
+ from ..core import Core
+
+
+class RuleLoadType(Enum):
+ """Rules loaded types."""
+
+ DIR = "dir"
+ NAME = "name"
+ FILE = "file"
+ CLASS = "class"
+
+
+class Rule(ABC, Generic[EventT, StateT, ConfigT]):
+ priority: ClassVar[int] = 0
+ block: ClassVar[bool] = False
+
+ # Cannot use ClassVar because PEP 526 does not allow it
+ Config: Type[ConfigT]
+
+ __rule_load_type__: ClassVar[RuleLoadType]
+ __rule_file_path__: ClassVar[Optional[str]]
+
+ if TYPE_CHECKING:
+ event: EventT
+ else:
+ event = Depends(Event)
+
+ def __init_state__(self) -> Optional[StateT]:
+ """Initialize rule state."""
+
+ def __init_subclass__(
+ cls,
+ config: Optional[Type[ConfigT]] = None,
+ init_state: Optional[StateT] = None,
+ **_kwargs: Any,
+ ) -> None:
+ super().__init_subclass__()
+
+ orig_bases: Tuple[type, ...] = getattr(cls, "__orig_bases__", ())
+ for orig_base in orig_bases:
+ origin_class = get_origin(orig_base)
+ if inspect.isclass(origin_class) and issubclass(origin_class, Rule):
+ try:
+ _event_t, state_t, config_t = cast(
+ Tuple[EventT, StateT, ConfigT], get_args(orig_base)
+ )
+ except ValueError: # pragma: no cover
+ continue
+ if (
+ config is None
+ and inspect.isclass(config_t)
+ and issubclass(config_t, ConfigModel)
+ ):
+ config = config_t # pyright: ignore
+ if (
+ init_state is None
+ and get_origin(state_t) is Annotated
+ and hasattr(state_t, "__metadata__")
+ ):
+ init_state = state_t.__metadata__[0] # pyright: ignore
+
+ if not hasattr(cls, "Config") and config is not None:
+ cls.Config = config
+ if cls.__init_state__ is Rule.__init_state__ and init_state is not None:
+ cls.__init_state__ = lambda _: init_state # type: ignore
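+
+    # A minimal user-defined rule might look like this (illustrative sketch; apart from
+    # ``MessageEvent`` from ``hrc.event``, every name below is made up):
+    #
+    #   class EchoConfig(ConfigModel):
+    #       __config_name__ = "echo"
+    #       prefix: str = "!"
+    #
+    #   class Echo(Rule[MessageEvent[Any], None, EchoConfig]):
+    #       priority = 1
+    #
+    #       async def rule(self) -> bool:
+    #           return self.event.get_plain_text().startswith(self.config.prefix)
+    #
+    #       async def handle(self) -> None:
+    #           await self.event.reply(self.event.get_plain_text())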
+
+ @final
+ @property
+ def name(self) -> str:
+ """rule class name."""
+ return self.__class__.__name__
+
+ @final
+ @property
+ def core(self) -> "Core":
+ """core object."""
+ return self.event.bot # pylint: disable=no-member
+
+ @final
+ @property
+ def config(self) -> ConfigT:
+ """rule configuration."""
+ default: Any = None
+ config_class = getattr(self, "Config", None)
+ if is_config_class(config_class):
+ return getattr(
+ self.core.config.rule,
+ config_class.__config_name__,
+ default,
+ )
+ return default
+
+ @final
+ def stop(self) -> NoReturn:
+ """Stop propagation of current events."""
+ raise StopException
+
+ @final
+ def skip(self) -> NoReturn:
+ """Skips itself and continues propagation of the current event."""
+ raise SkipException
+
+ @property
+ def state(self) -> StateT:
+ """plugin status."""
+ return self.bot.plugin_state[self.name]
+
+ @state.setter
+ @final
+ def state(self, value: StateT) -> None:
+ self.bot.plugin_state[self.name] = value
+
+ @abstractmethod
+ async def handle(self) -> None:
+ """Method to handle events. iamai will call this method when the ``rule()`` method returns ``True``. Each plugin must implement this method."""
+ raise NotImplementedError
+
+ @abstractmethod
+ async def rule(self) -> bool:
+ """Method to match the event. When the event is processed, this method will be called in sequence according to the priority of the plugin. When this method returns ``True``, the event will be handed over to this plugin for processing. Each plugin must implement this method.
+
+ .. note::
+ It is not recommended to implement event processing directly in this method. Please leave the specific processing of events to the ``handle()`` method.
+ """
+ raise NotImplementedError
+
+
+def aliases(names, ignore_case=False):
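+    """Attach alias names to a character-card attribute property.
+
+    Stores ``_aliases`` and ``_ignore_case`` on the decorated function so a rule
+    can resolve the attribute by any of its aliases, e.g. (from the COC7 example)::
+
+        @property
+        @aliases(['luck', '运气'], ignore_case=True)
+        def LUK(self): ...
+    """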
+ def decorator(func):
+ func._aliases = names
+ func._ignore_case = ignore_case
+ return func
+ return decorator
diff --git a/hrc/rules/__init__.py b/hrc/rules/__init__.py
deleted file mode 100644
index ff1c230..0000000
--- a/hrc/rules/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import functools
-from typing import Generic, Any, Type
-
-from abc import ABC
-
-from . import BaseRule
-from ..typing import RulesT
-
-
-class Rules(ABC, Generic[RulesT]):
- ...
-
-
-def aliases(names, ignore_case=False):
- def decorator(func):
- func._aliases = names
- func._ignore_case = ignore_case
- return func
- return decorator
diff --git a/hrc/rules/config.py b/hrc/rules/config.py
deleted file mode 100644
index 2926a03..0000000
--- a/hrc/rules/config.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from pydantic import ConfigDict, BaseModel
-
-class ConfigModel(BaseModel):
- ... \ No newline at end of file
diff --git a/hrc/rules/py.typed b/hrc/rules/py.typed
deleted file mode 100644
index e69de29..0000000
--- a/hrc/rules/py.typed
+++ /dev/null
diff --git a/hrc/typing.py b/hrc/typing.py
index b3ced30..934fc98 100644
--- a/hrc/typing.py
+++ b/hrc/typing.py
@@ -1,7 +1,19 @@
-from typing import TypeVar, Generic, Any, TYPE_CHECKING, Awaitable, Callable, Optional
+# ruff: noqa: TCH001
+from typing import TYPE_CHECKING, Awaitable, Callable, Optional, TypeVar
if TYPE_CHECKING:
- from .rules import Rules
-
-
-RulesT = TypeVar("RulesT", bound="Rules[Any]") \ No newline at end of file
+ from typing import Any
+
+ from .core import Core
+ from .config import ConfigModel
+ from .event import Event
+ from .rule import Rule
+
+
+StateT = TypeVar("StateT")
+EventT = TypeVar("EventT", bound="Event[Any]")
+RuleT = TypeVar("RuleT", bound="Rule[Any, Any, Any]")
+ConfigT = TypeVar("ConfigT", bound=Optional["ConfigModel"])
+
+CoreHook = Callable[["Core"], Awaitable[None]]
+EventHook = Callable[["Event[Any]"], Awaitable[None]] \ No newline at end of file
diff --git a/hrc/utils.py b/hrc/utils.py
index e69de29..d053d4d 100644
--- a/hrc/utils.py
+++ b/hrc/utils.py
@@ -0,0 +1,299 @@
+"""A utility used internally by iamai."""
+
+import asyncio
+import importlib
+import inspect
+import json
+import os
+import os.path
+import sys
+import traceback
+from abc import ABC
+from contextlib import asynccontextmanager
+from functools import partial
+from importlib.abc import MetaPathFinder
+from importlib.machinery import ModuleSpec, PathFinder
+from types import GetSetDescriptorType, ModuleType
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ AsyncGenerator,
+ Awaitable,
+ Callable,
+ ClassVar,
+ ContextManager,
+ Coroutine,
+ Dict,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+)
+from typing_extensions import ParamSpec, TypeAlias, TypeGuard
+
+from pydantic import BaseModel
+
+from .config import ConfigModel
+from .typing import EventT
+
+if TYPE_CHECKING:
+ from os import PathLike
+
+__all__ = [
+ "ModulePathFinder",
+ "is_config_class",
+ "get_classes_from_module",
+ "get_classes_from_module_name",
+ "PydanticEncoder",
+ "samefile",
+ "sync_func_wrapper",
+ "sync_ctx_manager_wrapper",
+ "wrap_get_func",
+ "get_annotations",
+]
+
+_T = TypeVar("_T")
+_P = ParamSpec("_P")
+_R = TypeVar("_R")
+_TypeT = TypeVar("_TypeT", bound=Type[Any])
+
+StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
+
+
+class ModulePathFinder(MetaPathFinder):
+ """Meta path finder for finding iamai components."""
+
+ path: ClassVar[List[str]] = []
+
+ def find_spec(
+ self,
+ fullname: str,
+ path: Optional[Sequence[str]] = None,
+ target: Optional[ModuleType] = None,
+ ) -> Union[ModuleSpec, None]:
+ """Used to find the ``spec`` of a specified module."""
+ if path is None:
+ path = []
+ return PathFinder.find_spec(fullname, self.path + list(path), target)
+
+
+def is_config_class(config_class: Any) -> TypeGuard[Type[ConfigModel]]:
+ """Determine whether an object is a configuration class.
+
+ Args:
+ config_class: The object to be judged.
+
+ Returns:
+ Returns whether it is a configuration class.
+ """
+ return (
+ inspect.isclass(config_class)
+ and issubclass(config_class, ConfigModel)
+ and isinstance(getattr(config_class, "__config_name__", None), str)
+ and ABC not in config_class.__bases__
+ and not inspect.isabstract(config_class)
+ )
+
+
+def get_classes_from_module(module: ModuleType, super_class: _TypeT) -> List[_TypeT]:
+ """Find a class of the specified type from the module.
+
+ Args:
+ module: Python module.
+ super_class: The superclass of the class to be found.
+
+ Returns:
+ Returns a list of classes that meet the criteria.
+ """
+ classes: List[_TypeT] = []
+ for _, module_attr in inspect.getmembers(module, inspect.isclass):
+ if (
+ (inspect.getmodule(module_attr) or module) is module
+ and issubclass(module_attr, super_class)
+ and module_attr != super_class
+ and ABC not in module_attr.__bases__
+ and not inspect.isabstract(module_attr)
+ ):
+ classes.append(cast(_TypeT, module_attr))
+ return classes
+
+
+def get_classes_from_module_name(
+ name: str, super_class: _TypeT, *, reload: bool = False
+) -> List[Tuple[_TypeT, ModuleType]]:
+ """Find a class of the specified type from the module with the specified name.
+
+ Args:
+ name: module name, the format is the same as the Python ``import`` statement.
+ super_class: The superclass of the class to be found.
+ reload: Whether to reload the module.
+
+ Returns:
+ Returns a list of tuples consisting of classes and modules that meet the criteria.
+
+ Raises:
+ ImportError: An error occurred while importing the module.
+ """
+ try:
+ importlib.invalidate_caches()
+ module = importlib.import_module(name)
+ if reload:
+ importlib.reload(module)
+ return [(x, module) for x in get_classes_from_module(module, super_class)]
+ except KeyboardInterrupt:
+ # Do not capture KeyboardInterrupt
+ # Catching KeyboardInterrupt will prevent the user from closing Python when the module being imported is stuck in an infinite loop
+ raise
+ except BaseException as e:
+ raise ImportError(e, traceback.format_exc()) from e
+
+
+class PydanticEncoder(json.JSONEncoder):
+ """``JSONEncoder`` class for parsing ``pydantic.BaseModel``."""
+
+ def default(self, o: Any) -> Any:
+ """Returns a serializable object of ``o``."""
+ if isinstance(o, BaseModel):
+ return o.model_dump(mode="json")
+ return super().default(o)
+
+
+def samefile(path1: StrOrBytesPath, path2: StrOrBytesPath) -> bool:
+ """A simple wrapper around ``os.path.samefile``.
+
+ Args:
+ path1: path1.
+ path2: path 2.
+
+ Returns:
+ If two paths point to the same file or directory.
+ """
+ try:
+ return path1 == path2 or os.path.samefile(path1, path2) # noqa: PTH121
+ except OSError:
+ return False
+
+
+def sync_func_wrapper(
+ func: Callable[_P, _R], *, to_thread: bool = False
+) -> Callable[_P, Coroutine[None, None, _R]]:
+ """Wrap a synchronous function as an asynchronous function.
+
+ Args:
+ func: synchronous function to be packaged.
+ to_thread: Whether to run the synchronization function in a separate thread. Defaults to ``False``.
+
+ Returns:
+ Asynchronous functions.
+ """
+ if to_thread:
+
+ async def _wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+ loop = asyncio.get_running_loop()
+ func_call = partial(func, *args, **kwargs)
+ return await loop.run_in_executor(None, func_call)
+
+ else:
+
+ async def _wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+ return func(*args, **kwargs)
+
+ return _wrapper
+
+
+@asynccontextmanager
+async def sync_ctx_manager_wrapper(
+ cm: ContextManager[_T], *, to_thread: bool = False
+) -> AsyncGenerator[_T, None]:
+ """Wrap a synchronous context manager into an asynchronous context manager.
+
+ Args:
+ cm: The synchronization context manager to be wrapped.
+ to_thread: Whether to run the synchronization function in a separate thread. Defaults to ``False``.
+
+ Returns:
+ Asynchronous context manager.
+ """
+ try:
+ yield await sync_func_wrapper(cm.__enter__, to_thread=to_thread)()
+ except Exception as e:
+ if not await sync_func_wrapper(cm.__exit__, to_thread=to_thread)(
+ type(e), e, e.__traceback__
+ ):
+ raise
+ else:
+ await sync_func_wrapper(cm.__exit__, to_thread=to_thread)(None, None, None)
+
+
+def wrap_get_func(
+ func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]],
+) -> Callable[[EventT], Awaitable[bool]]:
+ """Wrap the parameters accepted by the ``get()`` function into an asynchronous function.
+
+ Args:
+ func: The parameters accepted by the ``get()`` function.
+
+ Returns:
+ Asynchronous functions.
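+
+    Example (illustrative)::
+
+        checker = wrap_get_func(lambda event: event.type == "message")
+        # ``checker`` is now an async callable that returns a bool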
+ """
+ if func is None:
+ return sync_func_wrapper(lambda _: True)
+ if not asyncio.iscoroutinefunction(func):
+ return sync_func_wrapper(func) # type: ignore
+ return func
+
+
+if sys.version_info >= (3, 10): # pragma: no cover
+ from inspect import get_annotations
+else: # pragma: no cover
+
+ def get_annotations(
+ obj: Union[Callable[..., object], Type[Any], ModuleType],
+ ) -> Dict[str, Any]:
+ """Compute the annotation dictionary of an object.
+
+ Args:
+ obj: A callable object, class, or module.
+
+ Raises:
+ TypeError: ``obj`` is not a callable object, class or module.
+ ValueError: Object's ``__annotations__`` is not a dictionary or ``None``.
+
+ Returns:
+ Annotation dictionary for objects.
+ """
+ ann: Union[Dict[str, Any], None]
+
+ if isinstance(obj, type):
+ # class
+ obj_dict = getattr(obj, "__dict__", None)
+ if obj_dict and hasattr(obj_dict, "get"):
+ ann = obj_dict.get("__annotations__", None)
+ if isinstance(ann, GetSetDescriptorType):
+ ann = None
+ else:
+ ann = None
+ elif isinstance(obj, ModuleType) or callable(obj):
+ # this includes types.ModuleType, types.Function, types.BuiltinFunctionType,
+ # types.BuiltinMethodType, functools.partial, functools.singledispatch,
+ # "class funclike" from Lib/test/test_inspect... on and on it goes.
+ ann = getattr(obj, "__annotations__", None)
+ else:
+ raise TypeError(f"{obj!r} is not a module, class, or callable.")
+
+ if ann is None:
+ return {}
+
+ if not isinstance(ann, dict):
+ raise ValueError( # noqa: TRY004
+ f"{obj!r}.__annotations__ is neither a dict nor None"
+ )
+
+ if not ann:
+ return {}
+
+ return dict(ann) \ No newline at end of file
diff --git a/pdm.lock b/pdm.lock
index 0315732..7ffee45 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
groups = ["default", "lint", "dev", "docs"]
strategy = ["cross_platform"]
lock_version = "4.4.1"
-content_hash = "sha256:01b1c497f1340087f8c5b728e930e69903dce579c47f26951fbe859693efabee"
+content_hash = "sha256:41672d2715aa8f2b55145fe7ef802655a8c3f0dafb1534e62294c9db9c4b03ed"
[[package]]
name = "alabaster"
@@ -422,6 +422,20 @@ files = [
]
[[package]]
+name = "loguru"
+version = "0.7.2"
+requires_python = ">=3.5"
+summary = "Python logging made (stupidly) simple"
+dependencies = [
+ "colorama>=0.3.4; sys_platform == \"win32\"",
+ "win32-setctime>=1.0.0; sys_platform == \"win32\"",
+]
+files = [
+ {file = "loguru-0.7.2-py3-none-any.whl", hash = "sha256:003d71e3d3ed35f0f8984898359d65b79e5b21943f78af86aa5491210429b8eb"},
+ {file = "loguru-0.7.2.tar.gz", hash = "sha256:e671a53522515f34fd406340ee968cb9ecafbc4b36c679da03c18fd8d0bd51ac"},
+]
+
+[[package]]
name = "lxml"
version = "5.2.1"
requires_python = ">=3.6"
@@ -1356,6 +1370,16 @@ files = [
]
[[package]]
+name = "win32-setctime"
+version = "1.1.0"
+requires_python = ">=3.5"
+summary = "A small Python utility to set file creation time on Windows"
+files = [
+ {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
+ {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
+]
+
+[[package]]
name = "zipp"
version = "3.18.1"
requires_python = ">=3.8"
diff --git a/pyproject.toml b/pyproject.toml
index df3b0c8..443f5e3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,6 +10,7 @@ authors = [{ name = "简律纯", email = "leader@hydroroll.team" }]
dependencies = [
"pdfquery>=0.4.3",
"pydantic>=2.7.4",
+ "loguru>=0.7.2",
]
requires-python = ">=3.9"
readme = "README.rst"