diff options
Diffstat (limited to 'src/hrc/service/__init__.py')
| -rw-r--r-- | src/hrc/service/__init__.py | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/src/hrc/service/__init__.py b/src/hrc/service/__init__.py new file mode 100644 index 0000000..f153d5b --- /dev/null +++ b/src/hrc/service/__init__.py @@ -0,0 +1,111 @@ +import os +from abc import ABC, abstractmethod +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Generic, + Optional, + Type, + TypeVar, + Union, + final, + overload, +) + +from hrc.event import Event +from hrc.typing import ConfigT, EventT +from hrc.utils import is_config_class + +if TYPE_CHECKING: + from ..core import Core + +__all__ = ["Server"] + +if os.getenv("IAMAI_DEV") == "1": # pragma: no cover + # 当处于开发环境时,使用 pkg_resources 风格的命名空间包 + __import__("pkg_resources").declare_namespace(__name__) + + +_EventT = TypeVar("_EventT", bound="Event[Any]") + + +class Service(Generic[EventT, ConfigT], ABC): + name: str + core: "Core" + Config: Type[ConfigT] + + def __init__(self, core: "Core") -> None: + if not hasattr(self, "name"): + self.name = self.__class__.__name__ + self.core: Core = core + self.handle_event = self.core.handle_event + + @property + def config(self) -> ConfigT: + default: Any = None + config_class = getattr(self, "Config", None) + if is_config_class(config_class): + return getattr( + self.core.config.service, + config_class.__config_name__, + default, + ) + return default + + @final + async def safe_run(self) -> None: + try: + await self.run() + except Exception as e: + self.core.error_or_exception( + f"Run service {self.__class__.__name__} failed:", e + ) + + @abstractmethod + async def run(self) -> None: + raise NotImplementedError + + async def startup(self) -> None: + ... + + async def shutdown(self) -> None: + ... + + @overload + async def get( + self, + func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: None = None, + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> EventT: ... + + @overload + async def get( + self, + func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: Type[_EventT], + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> _EventT: ... + + @final + async def get( + self, + func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: Any = None, + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> Event[Any]: + return await self.core.get( + func, + event_type=event_type, + server_type=type(self), + max_try_times=max_try_times, + timeout=timeout, + )
\ No newline at end of file |
