diff options
Diffstat (limited to 'hrc/core.py')
| -rw-r--r-- | hrc/core.py | 135 |
1 files changed, 117 insertions, 18 deletions
diff --git a/hrc/core.py b/hrc/core.py index 86f8927..66ad211 100644 --- a/hrc/core.py +++ b/hrc/core.py @@ -25,21 +25,21 @@ from typing import ( from pydantic import ValidationError, create_model -from .config import ConfigModel, MainConfig, RuleConfig -from .dependencies import solve_dependencies -from .log import logger -from .rule import Rule, RuleLoadType -from .event import Event -from .typing import CoreHook, EventHook, EventT, RuleHook # noqa: F401 -from .utils import ( +from hrc.config import ConfigModel, MainConfig, RuleConfig, ServiceConfig +from hrc.dependencies import solve_dependencies +from hrc.log import logger, error_or_exception +from hrc.rule import Rule, RuleLoadType +from hrc.event import Event +from hrc.typing import CoreHook, EventHook, EventT, RuleHook, ServiceT, ServiceHook # noqa: F401 +from hrc.utils import ( ModulePathFinder, get_classes_from_module_name, is_config_class, samefile, wrap_get_func, # noqa: F401 ) -from .exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401 - +from hrc.exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401 +from hrc.service import Service if sys.version_info >= (3, 11): # pragma: no cover import tomllib @@ -51,7 +51,6 @@ HANDLED_SIGNALS = ( signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`. ) - class Core: config: MainConfig _current_event: Optional[Event[Any]] @@ -60,21 +59,35 @@ class Core: # pyright: ignore[reportUninitializedInstanceVariable] should_exit: asyncio.Event _restart_flag: bool # Restart flag + _extend_services: List[ + Union[Type[Service[Any, Any]], str] + ] # A list of services loaded programmatically using the ``load_service()`` method _extend_rules: List[Union[Type[Rule[Any, Any, Any]], str, Path]] _extend_rule_dirs: List[Path] rules_priority_dict: Dict[int, List[Type[Rule[Any, Any, Any]]]] _config_file: Optional[str] # Configuration file _config_dict: Optional[Dict[str, Any]] # Configuration dictionary - + + _core_run_hooks: List[CoreHook] + _core_exit_hooks: List[CoreHook] + _service_startup_hooks: List[ServiceHook] + _service_run_hooks: List[ServiceHook] + _service_shutdown_hooks: List[ServiceHook] + _event_preprocessor_hooks: List[EventHook] + _event_postprocessor_hooks: List[EventHook] + + _service_tasks: Set[ + "asyncio.Task[None]" + ] # Server task collection, used to hold references to server tasks _condition: ( asyncio.Condition ) # Condition used to handle get # pyright: ignore[reportUninitializedInstanceVariable] _rule_tasks: Set[ "asyncio.Task[None]" - ] # Adapter task collection, used to hold references to adapter tasks + ] # Server task collection, used to hold references to server tasks _handle_event_tasks: Set[ "asyncio.Task[None]" - ] # Event handling task, used to keep a reference to the adapter task + ] # Event handling task, used to keep a reference to the service task def __init__( self, @@ -84,6 +97,8 @@ class Core: hot_reload: bool = False, ) -> None: self.config = MainConfig() + + self.services = [] self._current_event = None self._config_file = config_file self._config_dict = config_dict @@ -92,13 +107,18 @@ class Core: self._module_path_finder = ModulePathFinder() self.rules_priority_dict = defaultdict(list) self._raw_config_dict = {} + self._service_tasks = set() self._rule_tasks = set() self._handle_event_tasks = set() + self._extend_services = [] self._extend_rules = [] self._extend_rule_dirs = [] self._core_run_hooks = [] self._core_exit_hooks = [] + self._service_startup_hooks = [] + self._service_run_hooks = [] + self._service_shutdown_hooks = [] self._rule_enable_hooks = [] self._rule_run_hooks = [] self._rule_disable_hooks = [] @@ -119,6 +139,7 @@ class Core: if self._restart_flag: self._load_rules_from_dirs(*self._extend_rule_dirs) self._load_rules(*self._extend_rules) + self._load_services(*self._extend_services) def restart(self) -> None: logger.info("Restarting...") @@ -146,10 +167,11 @@ class Core: self._load_rules_from_dirs(*self.config.core.rule_dirs) self._load_rules(*self.config.core.rules) + self._load_services(*self.config.core.services) self._update_config() logger.info("Running...") - + hot_reload_task = None if self._hot_reload: # pragma: no cover hot_reload_task = asyncio.create_task(self._run_hot_reload()) @@ -158,6 +180,21 @@ class Core: await core_run_hook_func(self) try: + for _service in self.services: + for _service_startup_hook_func in self._service_startup_hooks: + await _service_startup_hook_func(_service) + try: + await _service.startup() + except Exception as e: + self.error_or_exception(f"Start service {_service!r} failed:", e) + + for _service in self.services: + for _service_run_hook_func in self._service_run_hooks: + await _service_run_hook_func(_service) + _service_task = asyncio.create_task(_service.safe_run()) + self._service_tasks.add(_service_task) + _service_task.add_done_callback(self._service_tasks.discard) + # TODO(简律纯): builtin rule enable hook function in every rules packages. # for _rule in self.rules: # for rule_enable_hook_func in self._rule_enable_hooks: @@ -180,11 +217,19 @@ class Core: if hot_reload_task is not None: # pragma: no cover await hot_reload_task finally: + for _service in self.services: + for service_shutdown_hook_func in self._service_shutdown_hooks: + await service_shutdown_hook_func(_service) + await _service.shutdown() + # TODO(简律纯): builtin rule disable hook function in every rules packages. # for _rule in self.rules: # for rule_disable_hook_func in self._rule_disable_hooks: # await rule_disable_hook_func(_rule) # await _rule.disable() + + while self._service_tasks: + await asyncio.sleep(0) while self._rule_tasks: await asyncio.sleep(0) @@ -192,6 +237,7 @@ class Core: for core_exit_hook_func in self._core_exit_hooks: await core_exit_hook_func(self) + self.services.clear() self.rules.clear() self.rules_priority_dict.clear() self._module_path_finder.path.clear() @@ -295,7 +341,7 @@ class Core: def _update_config(self) -> None: def update_config( - source: List[Type[Rule[Any, Any, Any]]], + source: Union[List[Type[Rule[Any, Any, Any]]], List[Service[Any, Any]]], name: str, base: Type[ConfigModel], ) -> Tuple[Type[ConfigModel], ConfigModel]: @@ -319,6 +365,7 @@ class Core: self.config = create_model( "Config", rule=update_config(self.rules, "RuleConfig", RuleConfig), + service=update_config(self.services, "ServiceConfig", ServiceConfig), __base__=MainConfig, )(**self._raw_config_dict) # Update the level of logging @@ -382,7 +429,7 @@ class Core: ) -> None: if show_log: logger.info( - f"Rule {current_event.rule.name} received: {current_event!r}") + f"Service {current_event.service.name} received: {current_event!r}") if handle_get: _handle_event_task = asyncio.create_task(self._handle_event()) @@ -594,6 +641,58 @@ class Core: def load_rules_from_dirs(self, *dirs: Path) -> None: self._extend_rule_dirs.extend(dirs) self._load_rules_from_dirs(*dirs) + + def _load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None: + for service_ in services: + service_object: Service[Any, Any] + try: + if isinstance(service_, type) and issubclass(service_, Service): + service_object = service_(self) + elif isinstance(service_, str): + service_classes = get_classes_from_module_name(service_, Service) + if not service_classes: + raise LoadModuleError( # noqa: TRY301 + f"Can not find Service class in the {service_} module" + ) + if len(service_classes) > 1: + raise LoadModuleError( # noqa: TRY301 + f"More then one Service class in the {service_} module" + ) + service_object = service_classes[0][0](self) # type: ignore + else: + raise TypeError( # noqa: TRY301 + f"{service_} can not be loaded as service" + ) + except Exception as e: + self.error_or_exception(f'Load service "{service_}" failed:', e) + else: + self.services.append(service_object) + logger.info( + f'Succeeded to load service "{service_object.__class__.__name__}" ' + f'from "{service_}"' + ) + + def load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None: + self._extend_services.extend(services) + self._load_services(*services) + + @overload + def get_service(self, service: str) -> Service[Any, Any]: ... + + @overload + def get_service(self, service: Type[ServiceT]) -> ServiceT: ... + + def get_service( + self, service: Union[str, Type[ServiceT]] + ) -> Union[Service[Any, Any], ServiceT]: + for _service in self.services: + if isinstance(service, str): + if _service.name == service: + return _service + elif isinstance(_service, service): + return _service + raise LookupError(f'Can not find service named "{service}"') + def get_rule(self, name: str) -> Type[Rule[Any, Any, Any]]: for _rule in self.rules: @@ -605,9 +704,9 @@ class Core: self, message: str, exception: Exception ) -> None: # pragma: no cover if self.config.core.log.verbose_exception: - logger.exception(message) + error_or_exception(message) else: - logger.error(f"{message} {exception!r}") + error_or_exception(message, exception, verbose=False) def core_run_hook(self, func: CoreHook) -> CoreHook: self._core_run_hooks.append(func) |
