aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/hrc/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'hrc/core.py')
-rw-r--r--hrc/core.py135
1 files changed, 117 insertions, 18 deletions
diff --git a/hrc/core.py b/hrc/core.py
index 86f8927..66ad211 100644
--- a/hrc/core.py
+++ b/hrc/core.py
@@ -25,21 +25,21 @@ from typing import (
from pydantic import ValidationError, create_model
-from .config import ConfigModel, MainConfig, RuleConfig
-from .dependencies import solve_dependencies
-from .log import logger
-from .rule import Rule, RuleLoadType
-from .event import Event
-from .typing import CoreHook, EventHook, EventT, RuleHook # noqa: F401
-from .utils import (
+from hrc.config import ConfigModel, MainConfig, RuleConfig, ServiceConfig
+from hrc.dependencies import solve_dependencies
+from hrc.log import logger, error_or_exception
+from hrc.rule import Rule, RuleLoadType
+from hrc.event import Event
+from hrc.typing import CoreHook, EventHook, EventT, RuleHook, ServiceT, ServiceHook # noqa: F401
+from hrc.utils import (
ModulePathFinder,
get_classes_from_module_name,
is_config_class,
samefile,
wrap_get_func, # noqa: F401
)
-from .exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401
-
+from hrc.exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401
+from hrc.service import Service
if sys.version_info >= (3, 11): # pragma: no cover
import tomllib
@@ -51,7 +51,6 @@ HANDLED_SIGNALS = (
signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
)
-
class Core:
config: MainConfig
_current_event: Optional[Event[Any]]
@@ -60,21 +59,35 @@ class Core:
# pyright: ignore[reportUninitializedInstanceVariable]
should_exit: asyncio.Event
_restart_flag: bool # Restart flag
+ _extend_services: List[
+ Union[Type[Service[Any, Any]], str]
+ ] # A list of services loaded programmatically using the ``load_service()`` method
_extend_rules: List[Union[Type[Rule[Any, Any, Any]], str, Path]]
_extend_rule_dirs: List[Path]
rules_priority_dict: Dict[int, List[Type[Rule[Any, Any, Any]]]]
_config_file: Optional[str] # Configuration file
_config_dict: Optional[Dict[str, Any]] # Configuration dictionary
-
+
+ _core_run_hooks: List[CoreHook]
+ _core_exit_hooks: List[CoreHook]
+ _service_startup_hooks: List[ServiceHook]
+ _service_run_hooks: List[ServiceHook]
+ _service_shutdown_hooks: List[ServiceHook]
+ _event_preprocessor_hooks: List[EventHook]
+ _event_postprocessor_hooks: List[EventHook]
+
+ _service_tasks: Set[
+ "asyncio.Task[None]"
+ ] # Server task collection, used to hold references to server tasks
_condition: (
asyncio.Condition
) # Condition used to handle get # pyright: ignore[reportUninitializedInstanceVariable]
_rule_tasks: Set[
"asyncio.Task[None]"
- ] # Adapter task collection, used to hold references to adapter tasks
+ ] # Server task collection, used to hold references to server tasks
_handle_event_tasks: Set[
"asyncio.Task[None]"
- ] # Event handling task, used to keep a reference to the adapter task
+ ] # Event handling task, used to keep a reference to the service task
def __init__(
self,
@@ -84,6 +97,8 @@ class Core:
hot_reload: bool = False,
) -> None:
self.config = MainConfig()
+
+ self.services = []
self._current_event = None
self._config_file = config_file
self._config_dict = config_dict
@@ -92,13 +107,18 @@ class Core:
self._module_path_finder = ModulePathFinder()
self.rules_priority_dict = defaultdict(list)
self._raw_config_dict = {}
+ self._service_tasks = set()
self._rule_tasks = set()
self._handle_event_tasks = set()
+ self._extend_services = []
self._extend_rules = []
self._extend_rule_dirs = []
self._core_run_hooks = []
self._core_exit_hooks = []
+ self._service_startup_hooks = []
+ self._service_run_hooks = []
+ self._service_shutdown_hooks = []
self._rule_enable_hooks = []
self._rule_run_hooks = []
self._rule_disable_hooks = []
@@ -119,6 +139,7 @@ class Core:
if self._restart_flag:
self._load_rules_from_dirs(*self._extend_rule_dirs)
self._load_rules(*self._extend_rules)
+ self._load_services(*self._extend_services)
def restart(self) -> None:
logger.info("Restarting...")
@@ -146,10 +167,11 @@ class Core:
self._load_rules_from_dirs(*self.config.core.rule_dirs)
self._load_rules(*self.config.core.rules)
+ self._load_services(*self.config.core.services)
self._update_config()
logger.info("Running...")
-
+
hot_reload_task = None
if self._hot_reload: # pragma: no cover
hot_reload_task = asyncio.create_task(self._run_hot_reload())
@@ -158,6 +180,21 @@ class Core:
await core_run_hook_func(self)
try:
+ for _service in self.services:
+ for _service_startup_hook_func in self._service_startup_hooks:
+ await _service_startup_hook_func(_service)
+ try:
+ await _service.startup()
+ except Exception as e:
+ self.error_or_exception(f"Start service {_service!r} failed:", e)
+
+ for _service in self.services:
+ for _service_run_hook_func in self._service_run_hooks:
+ await _service_run_hook_func(_service)
+ _service_task = asyncio.create_task(_service.safe_run())
+ self._service_tasks.add(_service_task)
+ _service_task.add_done_callback(self._service_tasks.discard)
+
# TODO(简律纯): builtin rule enable hook function in every rules packages.
# for _rule in self.rules:
# for rule_enable_hook_func in self._rule_enable_hooks:
@@ -180,11 +217,19 @@ class Core:
if hot_reload_task is not None: # pragma: no cover
await hot_reload_task
finally:
+ for _service in self.services:
+ for service_shutdown_hook_func in self._service_shutdown_hooks:
+ await service_shutdown_hook_func(_service)
+ await _service.shutdown()
+
# TODO(简律纯): builtin rule disable hook function in every rules packages.
# for _rule in self.rules:
# for rule_disable_hook_func in self._rule_disable_hooks:
# await rule_disable_hook_func(_rule)
# await _rule.disable()
+
+ while self._service_tasks:
+ await asyncio.sleep(0)
while self._rule_tasks:
await asyncio.sleep(0)
@@ -192,6 +237,7 @@ class Core:
for core_exit_hook_func in self._core_exit_hooks:
await core_exit_hook_func(self)
+ self.services.clear()
self.rules.clear()
self.rules_priority_dict.clear()
self._module_path_finder.path.clear()
@@ -295,7 +341,7 @@ class Core:
def _update_config(self) -> None:
def update_config(
- source: List[Type[Rule[Any, Any, Any]]],
+ source: Union[List[Type[Rule[Any, Any, Any]]], List[Service[Any, Any]]],
name: str,
base: Type[ConfigModel],
) -> Tuple[Type[ConfigModel], ConfigModel]:
@@ -319,6 +365,7 @@ class Core:
self.config = create_model(
"Config",
rule=update_config(self.rules, "RuleConfig", RuleConfig),
+ service=update_config(self.services, "ServiceConfig", ServiceConfig),
__base__=MainConfig,
)(**self._raw_config_dict)
# Update the level of logging
@@ -382,7 +429,7 @@ class Core:
) -> None:
if show_log:
logger.info(
- f"Rule {current_event.rule.name} received: {current_event!r}")
+ f"Service {current_event.service.name} received: {current_event!r}")
if handle_get:
_handle_event_task = asyncio.create_task(self._handle_event())
@@ -594,6 +641,58 @@ class Core:
def load_rules_from_dirs(self, *dirs: Path) -> None:
self._extend_rule_dirs.extend(dirs)
self._load_rules_from_dirs(*dirs)
+
+ def _load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None:
+ for service_ in services:
+ service_object: Service[Any, Any]
+ try:
+ if isinstance(service_, type) and issubclass(service_, Service):
+ service_object = service_(self)
+ elif isinstance(service_, str):
+ service_classes = get_classes_from_module_name(service_, Service)
+ if not service_classes:
+ raise LoadModuleError( # noqa: TRY301
+ f"Can not find Service class in the {service_} module"
+ )
+ if len(service_classes) > 1:
+ raise LoadModuleError( # noqa: TRY301
+ f"More then one Service class in the {service_} module"
+ )
+ service_object = service_classes[0][0](self) # type: ignore
+ else:
+ raise TypeError( # noqa: TRY301
+ f"{service_} can not be loaded as service"
+ )
+ except Exception as e:
+ self.error_or_exception(f'Load service "{service_}" failed:', e)
+ else:
+ self.services.append(service_object)
+ logger.info(
+ f'Succeeded to load service "{service_object.__class__.__name__}" '
+ f'from "{service_}"'
+ )
+
+ def load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None:
+ self._extend_services.extend(services)
+ self._load_services(*services)
+
+ @overload
+ def get_service(self, service: str) -> Service[Any, Any]: ...
+
+ @overload
+ def get_service(self, service: Type[ServiceT]) -> ServiceT: ...
+
+ def get_service(
+ self, service: Union[str, Type[ServiceT]]
+ ) -> Union[Service[Any, Any], ServiceT]:
+ for _service in self.services:
+ if isinstance(service, str):
+ if _service.name == service:
+ return _service
+ elif isinstance(_service, service):
+ return _service
+ raise LookupError(f'Can not find service named "{service}"')
+
def get_rule(self, name: str) -> Type[Rule[Any, Any, Any]]:
for _rule in self.rules:
@@ -605,9 +704,9 @@ class Core:
self, message: str, exception: Exception
) -> None: # pragma: no cover
if self.config.core.log.verbose_exception:
- logger.exception(message)
+ error_or_exception(message)
else:
- logger.error(f"{message} {exception!r}")
+ error_or_exception(message, exception, verbose=False)
def core_run_hook(self, func: CoreHook) -> CoreHook:
self._core_run_hooks.append(func)