aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/hrc/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'hrc/core.py')
-rw-r--r--hrc/core.py737
1 files changed, 0 insertions, 737 deletions
diff --git a/hrc/core.py b/hrc/core.py
deleted file mode 100644
index 66ad211..0000000
--- a/hrc/core.py
+++ /dev/null
@@ -1,737 +0,0 @@
-import asyncio
-import json
-import pkgutil
-import signal
-import sys
-import threading
-import time # noqa: F401
-from collections import defaultdict
-from contextlib import AsyncExitStack
-from itertools import chain
-from pathlib import Path
-from typing import (
- Any,
- Awaitable, # noqa: F401
- Callable, # noqa: F401
- Dict,
- List,
- Optional,
- Set, # noqa: F401
- Tuple,
- Type,
- Union,
- overload, # noqa: F401
-)
-
-from pydantic import ValidationError, create_model
-
-from hrc.config import ConfigModel, MainConfig, RuleConfig, ServiceConfig
-from hrc.dependencies import solve_dependencies
-from hrc.log import logger, error_or_exception
-from hrc.rule import Rule, RuleLoadType
-from hrc.event import Event
-from hrc.typing import CoreHook, EventHook, EventT, RuleHook, ServiceT, ServiceHook # noqa: F401
-from hrc.utils import (
- ModulePathFinder,
- get_classes_from_module_name,
- is_config_class,
- samefile,
- wrap_get_func, # noqa: F401
-)
-from hrc.exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401
-from hrc.service import Service
-
-if sys.version_info >= (3, 11): # pragma: no cover
- import tomllib
-else: # pragma: no cover
- import tomli as tomllib
-
-HANDLED_SIGNALS = (
- signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
- signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
-)
-
-class Core:
- config: MainConfig
- _current_event: Optional[Event[Any]]
- _module_path_finder: ModulePathFinder
- _hot_reload: bool
- # pyright: ignore[reportUninitializedInstanceVariable]
- should_exit: asyncio.Event
- _restart_flag: bool # Restart flag
- _extend_services: List[
- Union[Type[Service[Any, Any]], str]
- ] # A list of services loaded programmatically using the ``load_service()`` method
- _extend_rules: List[Union[Type[Rule[Any, Any, Any]], str, Path]]
- _extend_rule_dirs: List[Path]
- rules_priority_dict: Dict[int, List[Type[Rule[Any, Any, Any]]]]
- _config_file: Optional[str] # Configuration file
- _config_dict: Optional[Dict[str, Any]] # Configuration dictionary
-
- _core_run_hooks: List[CoreHook]
- _core_exit_hooks: List[CoreHook]
- _service_startup_hooks: List[ServiceHook]
- _service_run_hooks: List[ServiceHook]
- _service_shutdown_hooks: List[ServiceHook]
- _event_preprocessor_hooks: List[EventHook]
- _event_postprocessor_hooks: List[EventHook]
-
- _service_tasks: Set[
- "asyncio.Task[None]"
- ] # Server task collection, used to hold references to server tasks
- _condition: (
- asyncio.Condition
- ) # Condition used to handle get # pyright: ignore[reportUninitializedInstanceVariable]
- _rule_tasks: Set[
- "asyncio.Task[None]"
- ] # Server task collection, used to hold references to server tasks
- _handle_event_tasks: Set[
- "asyncio.Task[None]"
- ] # Event handling task, used to keep a reference to the service task
-
- def __init__(
- self,
- *,
- config_file: Optional[str] = "config.toml",
- config_dict: Optional[Dict[str, Any]] = None,
- hot_reload: bool = False,
- ) -> None:
- self.config = MainConfig()
-
- self.services = []
- self._current_event = None
- self._config_file = config_file
- self._config_dict = config_dict
- self._hot_reload = hot_reload
- self._restart_flag = False
- self._module_path_finder = ModulePathFinder()
- self.rules_priority_dict = defaultdict(list)
- self._raw_config_dict = {}
- self._service_tasks = set()
- self._rule_tasks = set()
- self._handle_event_tasks = set()
-
- self._extend_services = []
- self._extend_rules = []
- self._extend_rule_dirs = []
- self._core_run_hooks = []
- self._core_exit_hooks = []
- self._service_startup_hooks = []
- self._service_run_hooks = []
- self._service_shutdown_hooks = []
- self._rule_enable_hooks = []
- self._rule_run_hooks = []
- self._rule_disable_hooks = []
- self._event_preprocessor_hooks = []
- self._event_postprocessor_hooks = []
-
- sys.meta_path.insert(0, self._module_path_finder)
-
- @property
- def rules(self) -> List[Type[Rule[Any, Any, Any]]]:
- return list(chain(*self.rules_priority_dict.values()))
-
- def run(self) -> None:
- self._restart_flag = True
- while self._restart_flag:
- self._restart_flag = False
- asyncio.run(self._run())
- if self._restart_flag:
- self._load_rules_from_dirs(*self._extend_rule_dirs)
- self._load_rules(*self._extend_rules)
- self._load_services(*self._extend_services)
-
- def restart(self) -> None:
- logger.info("Restarting...")
- self._restart_flag = True
- self.should_exit.set()
-
- async def _run(self) -> None:
- self.should_exit = asyncio.Event()
- self._condition = asyncio.Condition()
-
- # Monitor and intercept system exit signals to complete some aftermath work before closing the program
- if threading.current_thread() is threading.main_thread(): # pragma: no cover
- # Signals can only be processed in the main thread
- try:
- loop = asyncio.get_running_loop()
- for sig in HANDLED_SIGNALS:
- loop.add_signal_handler(sig, self._handle_exit)
- except NotImplementedError:
- # add_signal_handler is only available under Unix, below for Windows
- for sig in HANDLED_SIGNALS:
- signal.signal(sig, self._handle_exit)
-
- # Load configuration file
- self._reload_config_dict()
-
- self._load_rules_from_dirs(*self.config.core.rule_dirs)
- self._load_rules(*self.config.core.rules)
- self._load_services(*self.config.core.services)
- self._update_config()
-
- logger.info("Running...")
-
- hot_reload_task = None
- if self._hot_reload: # pragma: no cover
- hot_reload_task = asyncio.create_task(self._run_hot_reload())
-
- for core_run_hook_func in self._core_run_hooks:
- await core_run_hook_func(self)
-
- try:
- for _service in self.services:
- for _service_startup_hook_func in self._service_startup_hooks:
- await _service_startup_hook_func(_service)
- try:
- await _service.startup()
- except Exception as e:
- self.error_or_exception(f"Start service {_service!r} failed:", e)
-
- for _service in self.services:
- for _service_run_hook_func in self._service_run_hooks:
- await _service_run_hook_func(_service)
- _service_task = asyncio.create_task(_service.safe_run())
- self._service_tasks.add(_service_task)
- _service_task.add_done_callback(self._service_tasks.discard)
-
- # TODO(简律纯): builtin rule enable hook function in every rules packages.
- # for _rule in self.rules:
- # for rule_enable_hook_func in self._rule_enable_hooks:
- # await rule_enable_hook_func(_rule)
- # try:
- # await _rule.enable()
- # except Exception as e:
- # self.error_or_exception(
- # f"Enable rule {_rule!r} failed:", e)
- # TODO(简律纯): builtin rule run hook function in every rules packages.
- # for _rule in self.rules:
- # for rule_run_hook_func in self._rule_run_hooks:
- # await rule_run_hook_func(_rule)
- # _rule_task = asyncio.create_task(_rule.safe_run())
- # self._rule_tasks.add(_rule_task)
- # _rule_task.add_done_callback(self._rule_tasks.discard)
-
- await self.should_exit.wait()
-
- if hot_reload_task is not None: # pragma: no cover
- await hot_reload_task
- finally:
- for _service in self.services:
- for service_shutdown_hook_func in self._service_shutdown_hooks:
- await service_shutdown_hook_func(_service)
- await _service.shutdown()
-
- # TODO(简律纯): builtin rule disable hook function in every rules packages.
- # for _rule in self.rules:
- # for rule_disable_hook_func in self._rule_disable_hooks:
- # await rule_disable_hook_func(_rule)
- # await _rule.disable()
-
- while self._service_tasks:
- await asyncio.sleep(0)
-
- while self._rule_tasks:
- await asyncio.sleep(0)
-
- for core_exit_hook_func in self._core_exit_hooks:
- await core_exit_hook_func(self)
-
- self.services.clear()
- self.rules.clear()
- self.rules_priority_dict.clear()
- self._module_path_finder.path.clear()
-
- def _remove_rule_by_path(
- self, file: Path
- ) -> List[Type[Rule[Any, Any, Any]]]: # pragma: no cover
- removed_rules: List[Type[Rule[Any, Any, Any]]] = []
- for rules in self.rules_priority_dict.values():
- _removed_rules = list(
- filter(
- lambda x: x.__rule_load_type__ != RuleLoadType.CLASS
- and x.__rule_file_path__ is not None
- and samefile(x.__rule_file_path__, file),
- rules,
- )
- )
- removed_rules.extend(_removed_rules)
- for rule_ in _removed_rules:
- rules.remove(rule_)
- logger.info(
- "Succeeded to remove rule " f'"{rule_.__name__}" from file "{file}"'
- )
- return removed_rules
-
- async def _run_hot_reload(self) -> None: # pragma: no cover
- """Hot reload."""
- try:
- from watchfiles import Change, awatch
- except ImportError:
- logger.warning(
- 'Hot reload needs to install "watchfiles", try "pip install watchfiles"'
- )
- return
-
- logger.info("Hot reload is working!")
- async for changes in awatch(
- *(
- x.resolve()
- for x in set(self._extend_rule_dirs)
- .union(self.config.core.rule_dirs)
- .union(
- {Path(self._config_file)}
- if self._config_dict is None and self._config_file is not None
- else set()
- )
- ),
- stop_event=self.should_exit,
- ):
- # Processed in the order of Change.deleted, Change.modified, Change.added
- # To ensure that when renaming occurs, deletions are processed first and then additions are processed
- for change_type, file_ in sorted(changes, key=lambda x: x[0], reverse=True):
- file = Path(file_)
- # Change configuration file
- if (
- self._config_file is not None
- and samefile(self._config_file, file)
- and change_type == change_type.modified
- ):
- logger.info(f'Reload config file "{self._config_file}"')
- old_config = self.config
- self._reload_config_dict()
- if (
- self.config.core != old_config.core
- or self.config.rule != old_config.rule
- ):
- self.restart()
- continue
-
- # Change rule folder
- if change_type == Change.deleted:
- # Special handling for deletion operations
- if file.suffix != ".py":
- file = file / "__init__.py"
- else:
- if file.is_dir() and (file / "__init__.py").is_file():
- # When a new directory is added and this directory contains the ``__init__.py`` file
- # It means that what happens at this time is that a Python package is added, and the ``__init__.py`` file of this package is deemed to be added
- file = file / "__init__.py"
- if not (file.is_file() and file.suffix == ".py"):
- continue
-
- if change_type == Change.added:
- logger.info(f"Hot reload: Added file: {file}")
- self._load_rules(
- Path(file), rule_load_type=RuleLoadType.DIR, reload=True
- )
- self._update_config()
- continue
- if change_type == Change.deleted:
- logger.info(f"Hot reload: Deleted file: {file}")
- self._remove_rule_by_path(file)
- self._update_config()
- elif change_type == Change.modified:
- logger.info(f"Hot reload: Modified file: {file}")
- self._remove_rule_by_path(file)
- self._load_rules(
- Path(file), rule_load_type=RuleLoadType.DIR, reload=True
- )
- self._update_config()
-
- def _update_config(self) -> None:
- def update_config(
- source: Union[List[Type[Rule[Any, Any, Any]]], List[Service[Any, Any]]],
- name: str,
- base: Type[ConfigModel],
- ) -> Tuple[Type[ConfigModel], ConfigModel]:
- config_update_dict: Dict[str, Any] = {}
- for i in source:
- config_class = getattr(i, "Config", None)
- if is_config_class(config_class):
- default_value: Any
- try:
- default_value = config_class()
- except ValidationError:
- default_value = ...
- config_update_dict[config_class.__config_name__] = (
- config_class,
- default_value,
- )
- config_model = create_model(
- name, **config_update_dict, __base__=base)
- return config_model, config_model()
-
- self.config = create_model(
- "Config",
- rule=update_config(self.rules, "RuleConfig", RuleConfig),
- service=update_config(self.services, "ServiceConfig", ServiceConfig),
- __base__=MainConfig,
- )(**self._raw_config_dict)
- # Update the level of logging
- logger.remove()
- logger.add(sys.stderr, level=self.config.core.log.level)
-
- def _reload_config_dict(self) -> None:
- """Reload the configuration file."""
- self._raw_config_dict = {}
-
- if self._config_dict is not None:
- self._raw_config_dict = self._config_dict
- elif self._config_file is not None:
- try:
- with Path(self._config_file).open("rb") as f:
- if self._config_file.endswith(".json"):
- self._raw_config_dict = json.load(f)
- elif self._config_file.endswith(".toml"):
- self._raw_config_dict = tomllib.load(f)
- else:
- self.error_or_exception(
- "Read config file failed:",
- OSError("Unable to determine config file type"),
- )
- except OSError as e:
- self.error_or_exception("Can not open config file:", e)
- except (ValueError, json.JSONDecodeError, tomllib.TOMLDecodeError) as e:
- self.error_or_exception("Read config file failed:", e)
-
- try:
- self.config = MainConfig(**self._raw_config_dict)
- except ValidationError as e:
- self.config = MainConfig()
- self.error_or_exception("Config dict parse error:", e)
- self._update_config()
-
- def reload_rules(self) -> None:
- """Manually reload all rules."""
- self.rules_priority_dict.clear()
- self._load_rules(*self.config.core.rules)
- self._load_rules_from_dirs(*self.config.core.rule_dirs)
- self._load_rules(*self._extend_rules)
- self._load_rules_from_dirs(*self._extend_rule_dirs)
- self._update_config()
-
- def _handle_exit(self, *_args: Any) -> None: # pragma: no cover
- """When the robot receives the exit signal, it will handle it according to the situation."""
- logger.info("Stopping...")
- if self.should_exit.is_set():
- logger.warning("Force Exit...")
- sys.exit()
- else:
- self.should_exit.set()
-
- async def handle_event(
- self,
- current_event: Event[Any],
- *,
- handle_get: bool = True,
- show_log: bool = True,
- ) -> None:
- if show_log:
- logger.info(
- f"Service {current_event.service.name} received: {current_event!r}")
-
- if handle_get:
- _handle_event_task = asyncio.create_task(self._handle_event())
- self._handle_event_tasks.add(_handle_event_task)
- _handle_event_task.add_done_callback(
- self._handle_event_tasks.discard)
- await asyncio.sleep(0)
- async with self._condition:
- self._current_event = current_event
- self._condition.notify_all()
- else:
- _handle_event_task = asyncio.create_task(
- self._handle_event(current_event))
- self._handle_event_tasks.add(_handle_event_task)
- _handle_event_task.add_done_callback(
- self._handle_event_tasks.discard)
-
- async def _handle_event(self, current_event: Optional[Event[Any]] = None) -> None:
- if current_event is None:
- async with self._condition:
- await self._condition.wait()
- assert self._current_event is not None
- current_event = self._current_event
- if current_event.__handled__:
- return
-
- for _hook_func in self._event_preprocessor_hooks:
- await _hook_func(current_event)
-
- for rule_priority in sorted(self.rules_priority_dict.keys()):
- logger.debug(
- f"Checking for matching rules with priority {rule_priority!r}"
- )
- stop = False
- for rule in self.rules_priority_dict[rule_priority]:
- try:
- async with AsyncExitStack() as stack:
- _rule = await solve_dependencies(
- rule,
- use_cache=True,
- stack=stack,
- dependency_cache={
- Core: self,
- Event: current_event,
- },
- )
- if _rule.name not in self.rule_state:
- rule_state = _rule.__init_state__()
- if rule_state is not None:
- self.rule_state[_rule.name] = rule_state
- # TODO(简律纯): Refactor event handle process with General Rules Package Standard
- if await _rule.rule():
- logger.info(f"Event will be handled by {_rule!r}")
- try:
- await _rule.handle()
- finally:
- if _rule.block:
- stop = True
- except SkipException:
- # The plug-in requires that it skips itself and continues the current event propagation
- continue
- except StopException:
- # rule requires stopping current event propagation
- stop = True
- except Exception as e:
- self.error_or_exception(f'Exception in rule "{rule}":', e)
- if stop:
- break
-
- for _hook_func in self._event_postprocessor_hooks:
- await _hook_func(current_event)
-
- logger.info("Event Finished")
-
- def _load_rule_class(
- self,
- rule_class: Type[Rule[Any, Any, Any]],
- rule_load_type: RuleLoadType,
- rule_file_path: Optional[str],
- ) -> None:
- """Load a rule class"""
- priority = getattr(rule_class, "priority", None)
- if isinstance(priority, int) and priority >= 0:
- for _rule in self.rules:
- if _rule.__name__ == rule_class.__name__:
- logger.warning(
- f'Already have a same name rule "{_rule.__name__}"'
- )
- rule_class.__rule_load_type__ = rule_load_type
- rule_class.__rule_file_path__ = rule_file_path
- self.rules_priority_dict[priority].append(rule_class)
- logger.info(
- f'Succeeded to load rule "{rule_class.__name__}" '
- f'from class "{rule_class!r}"'
- )
- else:
- self.error_or_exception(
- f'Load rule from class "{rule_class!r}" failed:',
- LoadModuleError(
- f'rule priority incorrect in the class "{rule_class!r}"'
- ),
- )
-
- def _load_rules_from_module_name(
- self,
- module_name: str,
- *,
- rule_load_type: RuleLoadType,
- reload: bool = False,
- ) -> None:
- """Load rules from the given module."""
- try:
- rule_classes = get_classes_from_module_name(
- module_name, Rule, reload=reload
- )
- except ImportError as e:
- self.error_or_exception(
- f'Import module "{module_name}" failed:', e)
- else:
- for rule_class, module in rule_classes:
- self._load_rule_class(
- rule_class, # type: ignore
- rule_load_type,
- module.__file__,
- )
-
- def _load_rules(
- self,
- *rules: Union[Type[Rule[Any, Any, Any]], str, Path],
- rule_load_type: Optional[RuleLoadType] = None,
- reload: bool = False,
- ) -> None:
- for rule_ in rules:
- try:
- if isinstance(rule_, type) and issubclass(rule_, Rule):
- self._load_rule_class(
- rule_, rule_load_type or RuleLoadType.CLASS, None
- )
- elif isinstance(rule_, str):
- logger.info(f'Loading rules from module "{rule_}"')
- self._load_rules_from_module_name(
- rule_,
- rule_load_type=rule_load_type or RuleLoadType.NAME,
- reload=reload,
- )
- elif isinstance(rule_, Path):
- logger.info(f'Loading rules from path "{rule_}"')
- if not rule_.is_file():
- raise LoadModuleError( # noqa: TRY301
- f'The rule path "{rule_}" must be a file'
- )
-
- if rule_.suffix != ".py":
- raise LoadModuleError( # noqa: TRY301
- f'The path "{rule_}" must endswith ".py"'
- )
-
- rule_module_name = None
- for path in self._module_path_finder.path:
- try:
- if rule_.stem == "__init__":
- if rule_.resolve().parent.parent.samefile(Path(path)):
- rule_module_name = rule_.resolve().parent.name
- break
- elif rule_.resolve().parent.samefile(Path(path)):
- rule_module_name = rule_.stem
- break
- except OSError:
- continue
- if rule_module_name is None:
- rel_path = rule_.resolve().relative_to(Path().resolve())
- if rel_path.stem == "__init__":
- rule_module_name = ".".join(rel_path.parts[:-1])
- else:
- rule_module_name = ".".join(
- rel_path.parts[:-1] + (rel_path.stem,)
- )
-
- self._load_rules_from_module_name(
- rule_module_name,
- rule_load_type=rule_load_type or RuleLoadType.FILE,
- reload=reload,
- )
- else:
- raise TypeError( # noqa: TRY301
- f"{rule_} can not be loaded as rule"
- )
- except Exception as e:
- self.error_or_exception(f'Load rule "{rule_}" failed:', e)
-
- def load_rules(
- self, *rules: Union[Type[Rule[Any, Any, Any]], str, Path]
- ) -> None:
- self._extend_rules.extend(rules)
-
- return self._load_rules(*rules)
-
- def _load_rules_from_dirs(self, *dirs: Path) -> None:
- dir_list = [str(x.resolve()) for x in dirs]
- logger.info(
- f'Loading rules from dirs "{", ".join(map(str, dir_list))}"')
- self._module_path_finder.path.extend(dir_list)
- for module_info in pkgutil.iter_modules(dir_list):
- if not module_info.name.startswith("_"):
- self._load_rules_from_module_name(
- module_info.name, rule_load_type=RuleLoadType.DIR
- )
-
- def load_rules_from_dirs(self, *dirs: Path) -> None:
- self._extend_rule_dirs.extend(dirs)
- self._load_rules_from_dirs(*dirs)
-
- def _load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None:
- for service_ in services:
- service_object: Service[Any, Any]
- try:
- if isinstance(service_, type) and issubclass(service_, Service):
- service_object = service_(self)
- elif isinstance(service_, str):
- service_classes = get_classes_from_module_name(service_, Service)
- if not service_classes:
- raise LoadModuleError( # noqa: TRY301
- f"Can not find Service class in the {service_} module"
- )
- if len(service_classes) > 1:
- raise LoadModuleError( # noqa: TRY301
- f"More then one Service class in the {service_} module"
- )
- service_object = service_classes[0][0](self) # type: ignore
- else:
- raise TypeError( # noqa: TRY301
- f"{service_} can not be loaded as service"
- )
- except Exception as e:
- self.error_or_exception(f'Load service "{service_}" failed:', e)
- else:
- self.services.append(service_object)
- logger.info(
- f'Succeeded to load service "{service_object.__class__.__name__}" '
- f'from "{service_}"'
- )
-
- def load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None:
- self._extend_services.extend(services)
- self._load_services(*services)
-
- @overload
- def get_service(self, service: str) -> Service[Any, Any]: ...
-
- @overload
- def get_service(self, service: Type[ServiceT]) -> ServiceT: ...
-
- def get_service(
- self, service: Union[str, Type[ServiceT]]
- ) -> Union[Service[Any, Any], ServiceT]:
- for _service in self.services:
- if isinstance(service, str):
- if _service.name == service:
- return _service
- elif isinstance(_service, service):
- return _service
- raise LookupError(f'Can not find service named "{service}"')
-
-
- def get_rule(self, name: str) -> Type[Rule[Any, Any, Any]]:
- for _rule in self.rules:
- if _rule.__name__ == name:
- return _rule
- raise LookupError(f'Can not find rule named "{name}"')
-
- def error_or_exception(
- self, message: str, exception: Exception
- ) -> None: # pragma: no cover
- if self.config.core.log.verbose_exception:
- error_or_exception(message)
- else:
- error_or_exception(message, exception, verbose=False)
-
- def core_run_hook(self, func: CoreHook) -> CoreHook:
- self._core_run_hooks.append(func)
- return func
-
- def core_exit_hook(self, func: CoreHook) -> CoreHook:
- self._core_exit_hooks.append(func)
- return func
-
- def rule_enable_hook(self, func: RuleHook) -> RuleHook:
- self._rule_enable_hooks.append(func)
- return func
-
- def rule_run_hook(self, func: RuleHook) -> RuleHook:
- self._rule_run_hooks.append(func)
- return func
-
- def rule_disable_hook(self, func: RuleHook) -> RuleHook:
- self._rule_disable_hooks.append(func)
- return func
-
- def event_preprocessor_hook(self, func: EventHook) -> EventHook:
- self._event_preprocessor_hooks.append(func)
- return func
-
- def event_postprocessor_hook(self, func: EventHook) -> EventHook:
- self._event_postprocessor_hooks.append(func)
- return func