diff options
Diffstat (limited to 'hrc')
| -rw-r--r-- | hrc/__init__.py | 14 | ||||
| -rw-r--r-- | hrc/config.py | 15 | ||||
| -rw-r--r-- | hrc/core.py | 135 | ||||
| -rw-r--r-- | hrc/dependencies.py | 11 | ||||
| -rw-r--r-- | hrc/dev/__init__.py | 2 | ||||
| -rw-r--r-- | hrc/dev/api/__init__.py | 0 | ||||
| -rw-r--r-- | hrc/dev/echo.py | 98 | ||||
| -rw-r--r-- | hrc/dev/grps/v1.py | 22 | ||||
| -rw-r--r-- | hrc/doc/__init__.py | 1 | ||||
| -rw-r--r-- | hrc/event.py | 16 | ||||
| -rw-r--r-- | hrc/log.py | 9 | ||||
| -rw-r--r-- | hrc/rule/__init__.py | 18 | ||||
| -rw-r--r-- | hrc/service/__init__.py | 111 | ||||
| -rw-r--r-- | hrc/service/console/__init__.py | 41 | ||||
| -rw-r--r-- | hrc/service/http/__init__.py | 33 | ||||
| -rw-r--r-- | hrc/service/utils.py | 256 | ||||
| -rw-r--r-- | hrc/service/websocket/__init__.py | 30 | ||||
| -rw-r--r-- | hrc/typing.py | 11 | ||||
| -rw-r--r-- | hrc/utils.py | 4 |
19 files changed, 742 insertions, 85 deletions
diff --git a/hrc/__init__.py b/hrc/__init__.py index b607349..89bc832 100644 --- a/hrc/__init__.py +++ b/hrc/__init__.py @@ -1,13 +1 @@ -from .LibCore import * # noqa: F403 - -from . import rule # noqa: F401 -from . import core # noqa: F401 -from . import log # noqa: F401 -from . import exceptions # noqa: F401 -from . import config # noqa: F401 -from . import dependencies # noqa: F401 -from . import event # noqa: F401 -from . import perf # noqa: F401 -from . import feat # noqa: F401 -from . import doc # noqa: F401 -from . import dev # noqa: F401 +from .LibCore import * # noqa: F403
\ No newline at end of file diff --git a/hrc/config.py b/hrc/config.py index b6458b2..d179258 100644 --- a/hrc/config.py +++ b/hrc/config.py @@ -1,4 +1,4 @@ -from typing import Set, Union +from typing import Literal, Optional, Set, Union from pydantic import BaseModel, ConfigDict, DirectoryPath, Field @@ -14,16 +14,21 @@ class LogConfig(ConfigModel): verbose_exception: bool = False +class ServiceConfig(ConfigModel): + """Service configuration.""" + + class CoreConfig(ConfigModel): rules: Set[str] = Field(default_factory=set) rule_dirs: Set[DirectoryPath] = Field(default_factory=set) log: LogConfig = LogConfig() - + services: Set[str] = Field(default_factory=set) class RuleConfig(ConfigModel): - """rule configuration.""" - - + """Rule configuration.""" + + class MainConfig(ConfigModel): core: CoreConfig = CoreConfig() rule: RuleConfig = RuleConfig() + service: ServiceConfig = ServiceConfig() diff --git a/hrc/core.py b/hrc/core.py index 86f8927..66ad211 100644 --- a/hrc/core.py +++ b/hrc/core.py @@ -25,21 +25,21 @@ from typing import ( from pydantic import ValidationError, create_model -from .config import ConfigModel, MainConfig, RuleConfig -from .dependencies import solve_dependencies -from .log import logger -from .rule import Rule, RuleLoadType -from .event import Event -from .typing import CoreHook, EventHook, EventT, RuleHook # noqa: F401 -from .utils import ( +from hrc.config import ConfigModel, MainConfig, RuleConfig, ServiceConfig +from hrc.dependencies import solve_dependencies +from hrc.log import logger, error_or_exception +from hrc.rule import Rule, RuleLoadType +from hrc.event import Event +from hrc.typing import CoreHook, EventHook, EventT, RuleHook, ServiceT, ServiceHook # noqa: F401 +from hrc.utils import ( ModulePathFinder, get_classes_from_module_name, is_config_class, samefile, wrap_get_func, # noqa: F401 ) -from .exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401 - +from hrc.exceptions import StopException, SkipException, GetEventTimeout, LoadModuleError # noqa: F401 +from hrc.service import Service if sys.version_info >= (3, 11): # pragma: no cover import tomllib @@ -51,7 +51,6 @@ HANDLED_SIGNALS = ( signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`. ) - class Core: config: MainConfig _current_event: Optional[Event[Any]] @@ -60,21 +59,35 @@ class Core: # pyright: ignore[reportUninitializedInstanceVariable] should_exit: asyncio.Event _restart_flag: bool # Restart flag + _extend_services: List[ + Union[Type[Service[Any, Any]], str] + ] # A list of services loaded programmatically using the ``load_service()`` method _extend_rules: List[Union[Type[Rule[Any, Any, Any]], str, Path]] _extend_rule_dirs: List[Path] rules_priority_dict: Dict[int, List[Type[Rule[Any, Any, Any]]]] _config_file: Optional[str] # Configuration file _config_dict: Optional[Dict[str, Any]] # Configuration dictionary - + + _core_run_hooks: List[CoreHook] + _core_exit_hooks: List[CoreHook] + _service_startup_hooks: List[ServiceHook] + _service_run_hooks: List[ServiceHook] + _service_shutdown_hooks: List[ServiceHook] + _event_preprocessor_hooks: List[EventHook] + _event_postprocessor_hooks: List[EventHook] + + _service_tasks: Set[ + "asyncio.Task[None]" + ] # Server task collection, used to hold references to server tasks _condition: ( asyncio.Condition ) # Condition used to handle get # pyright: ignore[reportUninitializedInstanceVariable] _rule_tasks: Set[ "asyncio.Task[None]" - ] # Adapter task collection, used to hold references to adapter tasks + ] # Server task collection, used to hold references to server tasks _handle_event_tasks: Set[ "asyncio.Task[None]" - ] # Event handling task, used to keep a reference to the adapter task + ] # Event handling task, used to keep a reference to the service task def __init__( self, @@ -84,6 +97,8 @@ class Core: hot_reload: bool = False, ) -> None: self.config = MainConfig() + + self.services = [] self._current_event = None self._config_file = config_file self._config_dict = config_dict @@ -92,13 +107,18 @@ class Core: self._module_path_finder = ModulePathFinder() self.rules_priority_dict = defaultdict(list) self._raw_config_dict = {} + self._service_tasks = set() self._rule_tasks = set() self._handle_event_tasks = set() + self._extend_services = [] self._extend_rules = [] self._extend_rule_dirs = [] self._core_run_hooks = [] self._core_exit_hooks = [] + self._service_startup_hooks = [] + self._service_run_hooks = [] + self._service_shutdown_hooks = [] self._rule_enable_hooks = [] self._rule_run_hooks = [] self._rule_disable_hooks = [] @@ -119,6 +139,7 @@ class Core: if self._restart_flag: self._load_rules_from_dirs(*self._extend_rule_dirs) self._load_rules(*self._extend_rules) + self._load_services(*self._extend_services) def restart(self) -> None: logger.info("Restarting...") @@ -146,10 +167,11 @@ class Core: self._load_rules_from_dirs(*self.config.core.rule_dirs) self._load_rules(*self.config.core.rules) + self._load_services(*self.config.core.services) self._update_config() logger.info("Running...") - + hot_reload_task = None if self._hot_reload: # pragma: no cover hot_reload_task = asyncio.create_task(self._run_hot_reload()) @@ -158,6 +180,21 @@ class Core: await core_run_hook_func(self) try: + for _service in self.services: + for _service_startup_hook_func in self._service_startup_hooks: + await _service_startup_hook_func(_service) + try: + await _service.startup() + except Exception as e: + self.error_or_exception(f"Start service {_service!r} failed:", e) + + for _service in self.services: + for _service_run_hook_func in self._service_run_hooks: + await _service_run_hook_func(_service) + _service_task = asyncio.create_task(_service.safe_run()) + self._service_tasks.add(_service_task) + _service_task.add_done_callback(self._service_tasks.discard) + # TODO(简律纯): builtin rule enable hook function in every rules packages. # for _rule in self.rules: # for rule_enable_hook_func in self._rule_enable_hooks: @@ -180,11 +217,19 @@ class Core: if hot_reload_task is not None: # pragma: no cover await hot_reload_task finally: + for _service in self.services: + for service_shutdown_hook_func in self._service_shutdown_hooks: + await service_shutdown_hook_func(_service) + await _service.shutdown() + # TODO(简律纯): builtin rule disable hook function in every rules packages. # for _rule in self.rules: # for rule_disable_hook_func in self._rule_disable_hooks: # await rule_disable_hook_func(_rule) # await _rule.disable() + + while self._service_tasks: + await asyncio.sleep(0) while self._rule_tasks: await asyncio.sleep(0) @@ -192,6 +237,7 @@ class Core: for core_exit_hook_func in self._core_exit_hooks: await core_exit_hook_func(self) + self.services.clear() self.rules.clear() self.rules_priority_dict.clear() self._module_path_finder.path.clear() @@ -295,7 +341,7 @@ class Core: def _update_config(self) -> None: def update_config( - source: List[Type[Rule[Any, Any, Any]]], + source: Union[List[Type[Rule[Any, Any, Any]]], List[Service[Any, Any]]], name: str, base: Type[ConfigModel], ) -> Tuple[Type[ConfigModel], ConfigModel]: @@ -319,6 +365,7 @@ class Core: self.config = create_model( "Config", rule=update_config(self.rules, "RuleConfig", RuleConfig), + service=update_config(self.services, "ServiceConfig", ServiceConfig), __base__=MainConfig, )(**self._raw_config_dict) # Update the level of logging @@ -382,7 +429,7 @@ class Core: ) -> None: if show_log: logger.info( - f"Rule {current_event.rule.name} received: {current_event!r}") + f"Service {current_event.service.name} received: {current_event!r}") if handle_get: _handle_event_task = asyncio.create_task(self._handle_event()) @@ -594,6 +641,58 @@ class Core: def load_rules_from_dirs(self, *dirs: Path) -> None: self._extend_rule_dirs.extend(dirs) self._load_rules_from_dirs(*dirs) + + def _load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None: + for service_ in services: + service_object: Service[Any, Any] + try: + if isinstance(service_, type) and issubclass(service_, Service): + service_object = service_(self) + elif isinstance(service_, str): + service_classes = get_classes_from_module_name(service_, Service) + if not service_classes: + raise LoadModuleError( # noqa: TRY301 + f"Can not find Service class in the {service_} module" + ) + if len(service_classes) > 1: + raise LoadModuleError( # noqa: TRY301 + f"More then one Service class in the {service_} module" + ) + service_object = service_classes[0][0](self) # type: ignore + else: + raise TypeError( # noqa: TRY301 + f"{service_} can not be loaded as service" + ) + except Exception as e: + self.error_or_exception(f'Load service "{service_}" failed:', e) + else: + self.services.append(service_object) + logger.info( + f'Succeeded to load service "{service_object.__class__.__name__}" ' + f'from "{service_}"' + ) + + def load_services(self, *services: Union[Type[Service[Any, Any]], str]) -> None: + self._extend_services.extend(services) + self._load_services(*services) + + @overload + def get_service(self, service: str) -> Service[Any, Any]: ... + + @overload + def get_service(self, service: Type[ServiceT]) -> ServiceT: ... + + def get_service( + self, service: Union[str, Type[ServiceT]] + ) -> Union[Service[Any, Any], ServiceT]: + for _service in self.services: + if isinstance(service, str): + if _service.name == service: + return _service + elif isinstance(_service, service): + return _service + raise LookupError(f'Can not find service named "{service}"') + def get_rule(self, name: str) -> Type[Rule[Any, Any, Any]]: for _rule in self.rules: @@ -605,9 +704,9 @@ class Core: self, message: str, exception: Exception ) -> None: # pragma: no cover if self.config.core.log.verbose_exception: - logger.exception(message) + error_or_exception(message) else: - logger.error(f"{message} {exception!r}") + error_or_exception(message, exception, verbose=False) def core_run_hook(self, func: CoreHook) -> CoreHook: self._core_run_hooks.append(func) diff --git a/hrc/dependencies.py b/hrc/dependencies.py index 3a662fd..e176e14 100644 --- a/hrc/dependencies.py +++ b/hrc/dependencies.py @@ -15,7 +15,7 @@ from typing import ( cast, ) -from .utils import get_annotations, sync_ctx_manager_wrapper +from hrc.utils import get_annotations, sync_ctx_manager_wrapper _T = TypeVar("_T") Dependency = Union[ @@ -45,12 +45,9 @@ class InnerDepends: attr = getattr(self.dependency, "__name__", type(self.dependency).__name__) cache = "" if self.use_cache else ", use_cache=False" return f"InnerDepends({attr}{cache})" - - -def Depends( # noqa: N802 # pylint: disable=invalid-name - dependency: Optional[Dependency[_T]] = None, *, use_cache: bool = True -) -> _T: - + + +def Depends(dependency: Optional[Dependency[_T]] = None, *, use_cache: bool = True) -> _T: return InnerDepends(dependency=dependency, use_cache=use_cache) # type: ignore diff --git a/hrc/dev/__init__.py b/hrc/dev/__init__.py index cf99d7f..ea8f56b 100644 --- a/hrc/dev/__init__.py +++ b/hrc/dev/__init__.py @@ -1 +1 @@ -from .grps import v1
\ No newline at end of file +from hrc.dev.grps import v1
\ No newline at end of file diff --git a/hrc/dev/api/__init__.py b/hrc/dev/api/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/hrc/dev/api/__init__.py diff --git a/hrc/dev/echo.py b/hrc/dev/echo.py index 7107159..5bdeab7 100644 --- a/hrc/dev/echo.py +++ b/hrc/dev/echo.py @@ -4,12 +4,102 @@ :ref: https://echo.hydroroll.team """ -class WorkFlow(object): +class Event(object): + """事件基类 + :ref: https://echo.hydroroll.team/Event/#0_event + """ + def __init__(self, event_type, data, metadata): + self.event_type = event_type + self.data = data + self.metadata = metadata + +class WorkFlow(Event): """workflow :ref: https://echo.hydroroll.team/Event/#1_workflow """ - -class CallBack(object): + def __init__(self, data, metadata): + super().__init__('workflow', data, metadata) + +class CallBack(Event): """callback :ref: https://echo.hydroroll.team/Event/#4_callback - """
\ No newline at end of file + """ + def __init__(self, data, metadata): + super().__init__('callback', data, metadata) + +class Message(Event): + """message + :ref: https://echo.hydroroll.team/Event/#2_message + """ + def __init__(self, data, metadata): + super().__init__('message', data, metadata) + +class Reaction(Event): + """reaction + :ref: https://echo.hydroroll.team/Event/#3_reaction + """ + def __init__(self, data, metadata): + super().__init__('reaction', data, metadata) + +class Typing(Event): + """typing + :ref: https://echo.hydroroll.team/Event/#5_typing + """ + def __init__(self, data, metadata): + super().__init__('typing', data, metadata) + +class UserJoin(Event): + """user join + :ref: https://echo.hydroroll.team/Event/#6_user_join + """ + def __init__(self, data, metadata): + super().__init__('user_join', data, metadata) + +class UserLeave(Event): + """user leave + :ref: https://echo.hydroroll.team/Event/#7_user_leave + """ + def __init__(self, data, metadata): + super().__init__('user_leave', data, metadata) + +class FileShare(Event): + """file share + :ref: https://echo.hydroroll.team/Event/#8_file_share + """ + def __init__(self, data, metadata): + super().__init__('file_share', data, metadata) + +class Mention(Event): + """mention + :ref: https://echo.hydroroll.team/Event/#9_mention + """ + def __init__(self, data, metadata): + super().__init__('mention', data, metadata) + +class ChannelCreate(Event): + """channel create + :ref: https://echo.hydroroll.team/Event/#10_channel_create + """ + def __init__(self, data, metadata): + super().__init__('channel_create', data, metadata) + +class ChannelDelete(Event): + """channel delete + :ref: https://echo.hydroroll.team/Event/#11_channel_delete + """ + def __init__(self, data, metadata): + super().__init__('channel_delete', data, metadata) + +class ChannelUpdate(Event): + """channel update + :ref: https://echo.hydroroll.team/Event/#12_channel_update + """ + def __init__(self, data, metadata): + super().__init__('channel_update', data, metadata) + +class UserUpdate(Event): + """user update + :ref: https://echo.hydroroll.team/Event/#13_user_update + """ + def __init__(self, data, metadata): + super().__init__('user_update', data, metadata)
\ No newline at end of file diff --git a/hrc/dev/grps/v1.py b/hrc/dev/grps/v1.py index 5af118c..9402c4b 100644 --- a/hrc/dev/grps/v1.py +++ b/hrc/dev/grps/v1.py @@ -1 +1,21 @@ -__version__ = "1.0.0-alpha.1"
\ No newline at end of file +from pydantic import BaseModel + + +__version__ = "1.0.0-alpha.1" + +class GRPS(BaseModel): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def run(self): + pass + + def start(self): + pass + + def stop(self): + pass + + def restart(self): + pass diff --git a/hrc/doc/__init__.py b/hrc/doc/__init__.py index e69de29..998b999 100644 --- a/hrc/doc/__init__.py +++ b/hrc/doc/__init__.py @@ -0,0 +1 @@ +import sphinx
\ No newline at end of file diff --git a/hrc/event.py b/hrc/event.py index afdb00c..961a356 100644 --- a/hrc/event.py +++ b/hrc/event.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, Any, Generic, Optional, Union from typing_extensions import Self from pydantic import BaseModel, ConfigDict -from .typing import RuleT +from hrc.typing import RuleT class Event(ABC, BaseModel, Generic[RuleT]): @@ -62,20 +62,6 @@ class MessageEvent(Event[RuleT], Generic[RuleT]): max_try_times: Optional[int] = None, timeout: Optional[Union[int, float]] = None, ) -> Self: - """Get the user's reply message. - - Equivalent to `get()` of ``Bot``, the condition is that the adapter, event type and sender are the same. - - Args: - max_try_times: Maximum number of events. - timeout: timeout period. - - Returns: - Message event that the user replies to. - - Raises: - GetEventTimeout: Maximum number of events exceeded or timeout. - """ return await self.rule.get( self.is_same_sender, @@ -1,6 +1,7 @@ import os import sys from datetime import datetime +from typing import Optional from loguru import logger as _logger @@ -11,13 +12,9 @@ log_path = os.path.join( current_path, "logs", datetime.now().strftime("%Y-%m-%d") + ".log" ) - -def error_or_exception(message: str, exception: Exception, verbose: bool): +def error_or_exception(message: str, exception: Optional[Exception], verbose: bool = True): logger.remove() - logger.add( - sys.stderr, - format="<magenta>{time:YYYY-MM-DD HH:mm:ss.SSS}</magenta> <level>[{level}]</level> > <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>", - ) + logger.add(sys.stderr) logger.add(sink=log_path, level="INFO", rotation="10 MB") if verbose: logger.exception(message) diff --git a/hrc/rule/__init__.py b/hrc/rule/__init__.py index e144091..282e3e2 100644 --- a/hrc/rule/__init__.py +++ b/hrc/rule/__init__.py @@ -3,8 +3,8 @@ from typing import Generic, Any, Type from abc import ABC -from . import BaseRule # noqa: F401 -from ..typing import RuleT # noqa: F401 +from hrc.rule import BaseRule # noqa: F401 +from hrc.typing import RuleT # noqa: F401 import inspect from abc import abstractmethod # noqa: F401 @@ -20,16 +20,16 @@ from typing import ( ) from typing_extensions import Annotated, get_args, get_origin -from ..config import ConfigModel +from hrc.config import ConfigModel -from ..dependencies import Depends -from ..event import Event -from ..exceptions import SkipException, StopException -from ..typing import ConfigT, EventT, StateT -from ..utils import is_config_class +from hrc.dependencies import Depends +from hrc.event import Event +from hrc.exceptions import SkipException, StopException +from hrc.typing import ConfigT, EventT, StateT +from hrc.utils import is_config_class if TYPE_CHECKING: - from ..core import Core + from hrc.core import Core class RuleLoadType(Enum): diff --git a/hrc/service/__init__.py b/hrc/service/__init__.py new file mode 100644 index 0000000..f153d5b --- /dev/null +++ b/hrc/service/__init__.py @@ -0,0 +1,111 @@ +import os +from abc import ABC, abstractmethod +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Generic, + Optional, + Type, + TypeVar, + Union, + final, + overload, +) + +from hrc.event import Event +from hrc.typing import ConfigT, EventT +from hrc.utils import is_config_class + +if TYPE_CHECKING: + from ..core import Core + +__all__ = ["Server"] + +if os.getenv("IAMAI_DEV") == "1": # pragma: no cover + # 当处于开发环境时,使用 pkg_resources 风格的命名空间包 + __import__("pkg_resources").declare_namespace(__name__) + + +_EventT = TypeVar("_EventT", bound="Event[Any]") + + +class Service(Generic[EventT, ConfigT], ABC): + name: str + core: "Core" + Config: Type[ConfigT] + + def __init__(self, core: "Core") -> None: + if not hasattr(self, "name"): + self.name = self.__class__.__name__ + self.core: Core = core + self.handle_event = self.core.handle_event + + @property + def config(self) -> ConfigT: + default: Any = None + config_class = getattr(self, "Config", None) + if is_config_class(config_class): + return getattr( + self.core.config.service, + config_class.__config_name__, + default, + ) + return default + + @final + async def safe_run(self) -> None: + try: + await self.run() + except Exception as e: + self.core.error_or_exception( + f"Run service {self.__class__.__name__} failed:", e + ) + + @abstractmethod + async def run(self) -> None: + raise NotImplementedError + + async def startup(self) -> None: + ... + + async def shutdown(self) -> None: + ... + + @overload + async def get( + self, + func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: None = None, + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> EventT: ... + + @overload + async def get( + self, + func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: Type[_EventT], + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> _EventT: ... + + @final + async def get( + self, + func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: Any = None, + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> Event[Any]: + return await self.core.get( + func, + event_type=event_type, + server_type=type(self), + max_try_times=max_try_times, + timeout=timeout, + )
\ No newline at end of file diff --git a/hrc/service/console/__init__.py b/hrc/service/console/__init__.py new file mode 100644 index 0000000..e11768e --- /dev/null +++ b/hrc/service/console/__init__.py @@ -0,0 +1,41 @@ +import asyncio +import sys +from typing_extensions import override + +from hrc.event import MessageEvent +from hrc.service import Service + +class ConsoleServiceEvent(MessageEvent["ConsoleService"]): + message: str + + @override + def get_sender_id(self) -> None: + return None + + @override + def get_plain_text(self) -> str: + return self.message + + @override + async def reply(self, message: str) -> None: + return await self.service.send(message) + + async def is_same_sender(self) -> bool: + return True + +class ConsoleService(Service[ConsoleServiceEvent, None]): + name: str = "console" + + @override + async def run(self) -> None: + while not self.core.should_exit.is_set(): + print("Please input message: ") # noqa: T201 + message = await asyncio.get_event_loop().run_in_executor( + None, sys.stdin.readline + ) + await self.handle_event( + ConsoleServiceEvent(service=self, type="message", message=message, rule="") + ) + + async def send(self, message: str) -> None: + print(f"Send a message: {message}") # noqa: T201
\ No newline at end of file diff --git a/hrc/service/http/__init__.py b/hrc/service/http/__init__.py new file mode 100644 index 0000000..a8d938b --- /dev/null +++ b/hrc/service/http/__init__.py @@ -0,0 +1,33 @@ +from typing_extensions import override + +from aiohttp import web + +from hrc.service.utils import HttpServerService +from hrc.event import Event +from hrc.log import logger +from aiohttp import web + +class HttpServerTestEvent(Event["HttpServerTestService"]): + """HTTP 服务端示例适配器事件类。""" + + message: str + + +class HttpServerTestService(HttpServerService[HttpServerTestEvent, None]): + name: str = "http_server_service" + get_url: str = "/" + post_url: str = "/" + host: str = "127.0.0.1" + port: int = 8080 + + + @override + async def handle_response(self, request: web.Request) -> web.StreamResponse: + event = HttpServerTestEvent( + service=self, + type="message", + rule="", + message=await request.text(), + ) + await self.handle_event(event) + return web.Response()
\ No newline at end of file diff --git a/hrc/service/utils.py b/hrc/service/utils.py new file mode 100644 index 0000000..4f29e0c --- /dev/null +++ b/hrc/service/utils.py @@ -0,0 +1,256 @@ +import asyncio +from abc import ABCMeta, abstractmethod +from typing import Literal, Optional, Union + +import aiohttp +from aiohttp import web + +from hrc.service import Service +from hrc.log import logger +from hrc.typing import ConfigT, EventT + +__all__ = [ + "PollingService", + "HttpClientService", + "WebSocketClientService", + "HttpServerService", + "WebSocketServerService", + "WebSocketService", +] + + +class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta): + """轮询式适配器示例。""" + + delay: float = 0.1 + create_task: bool = False + _on_tick_task: Optional["asyncio.Task[None]"] = None + + async def run(self) -> None: + while not self.core.should_exit.is_set(): + await asyncio.sleep(self.delay) + if self.create_task: + self._on_tick_task = asyncio.create_task(self.on_tick()) + else: + await self.on_tick() + + @abstractmethod + async def on_tick(self) -> None: + """当轮询发生。""" + + +class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta): + session: aiohttp.ClientSession + + async def startup(self) -> None: + self.session = aiohttp.ClientSession() + + @abstractmethod + async def on_tick(self) -> None: + ... + async def shutdown(self) -> None: + """关闭并清理连接。""" + await self.session.close() + + +class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta): + url: str + + async def run(self) -> None: + async with aiohttp.ClientSession() as session, session.ws_connect( + self.url + ) as ws: + msg: aiohttp.WSMessage + async for msg in ws: + if self.core.should_exit.is_set(): + break + if msg.type == aiohttp.WSMsgType.ERROR: + break + await self.handle_response(msg) + + @abstractmethod + async def handle_response(self, msg: aiohttp.WSMessage) -> None: + """处理响应。""" + + +class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta): + app: web.Application + runner: web.AppRunner + site: web.TCPSite + host: str + port: int + get_url: str + post_url: str + + async def startup(self) -> None: + """初始化适配器。""" + self.app = web.Application() + self.app.add_routes( + [ + web.get(self.get_url, self.handle_response), + web.post(self.post_url, self.handle_response), + ] + ) + + async def run(self) -> None: + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.host, self.port) + await self.site.start() + + async def shutdown(self) -> None: + """关闭并清理连接。""" + await self.runner.cleanup() + + @abstractmethod + async def handle_response(self, request: web.Request) -> web.StreamResponse: + """处理响应。""" + + +class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta): + app: web.Application + runner: web.AppRunner + site: web.TCPSite + websocket: web.WebSocketResponse + host: str + port: int + url: str + + async def startup(self) -> None: + self.app = web.Application() + self.app.add_routes([web.get(self.url, self.handle_response)]) + + async def run(self) -> None: + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.host, self.port) + await self.site.start() + + async def shutdown(self) -> None: + """关闭并清理连接。""" + await self.websocket.close() + await self.site.stop() + await self.runner.cleanup() + + async def handle_response(self, request: web.Request) -> web.WebSocketResponse: + """处理 WebSocket。""" + ws = web.WebSocketResponse() + await ws.prepare(request) + self.websocket = ws + + msg: aiohttp.WSMessage + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self.handle_ws_response(msg) + elif msg.type == aiohttp.WSMsgType.ERROR: + break + + return ws + + @abstractmethod + async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None: + """处理 WebSocket 响应。""" + + +class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta): + """ + 同时支持 WebSocket 客户端和服务端。 + """ + + websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = ( + None + ) + + # ws + session: Optional[aiohttp.ClientSession] + + # reverse-ws + app: Optional[web.Application] + runner: Optional[web.AppRunner] + site: Optional[web.TCPSite] + + # config + service_type: Literal["ws", "reverse-ws"] + host: str + port: int + url: str + reconnect_interval: int = 3 + + async def startup(self) -> None: + if self.service_type == "ws": + self.session = aiohttp.ClientSession() + elif self.service_type == "reverse-ws": + self.app = web.Application() + self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)]) + else: + logger.error( + 'Config "service_type" must be "ws" or "reverse-ws", not ' + + self.service_type + ) + + async def run(self) -> None: + if self.service_type == "ws": + while True: + try: + await self.websocket_connect() + except aiohttp.ClientError as e: + self.core.error_or_exception("WebSocket connection error:", e) + if self.core.should_exit.is_set(): + break + await asyncio.sleep(self.reconnect_interval) + elif self.service_type == "reverse-ws": + assert self.app is not None + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.host, self.port) + await self.site.start() + + async def shutdown(self) -> None: + """关闭并清理连接。""" + if self.websocket is not None: + await self.websocket.close() + if self.service_type == "ws": + if self.session is not None: + await self.session.close() + elif self.service_type == "reverse-ws": + if self.site is not None: + await self.site.stop() + if self.runner is not None: + await self.runner.cleanup() + + async def handle_reverse_ws_response( + self, request: web.Request + ) -> web.WebSocketResponse: + """处理 aiohttp WebSocket 服务器的接收。""" + self.websocket = web.WebSocketResponse() + await self.websocket.prepare(request) + await self.reverse_ws_connection_hook() + await self.handle_websocket() + return self.websocket + + async def reverse_ws_connection_hook(self) -> None: + """反向 WebSocket 连接建立时的钩子函数。""" + logger.info("WebSocket connected!") + + async def websocket_connect(self) -> None: + """创建正向 WebSocket 连接。""" + assert self.session is not None + logger.info("Tying to connect to WebSocket server...") + async with self.session.ws_connect( + f"ws://{self.host}:{self.port}{self.url}" + ) as self.websocket: + await self.handle_websocket() + + async def handle_websocket(self) -> None: + """处理 WebSocket。""" + if self.websocket is None or self.websocket.closed: + return + async for msg in self.websocket: + await self.handle_websocket_msg(msg) + if not self.core.should_exit.is_set(): + logger.warning("WebSocket connection closed!") + + @abstractmethod + async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None: + """处理 WebSocket 消息。""" + raise NotImplementedError
\ No newline at end of file diff --git a/hrc/service/websocket/__init__.py b/hrc/service/websocket/__init__.py new file mode 100644 index 0000000..3a7f089 --- /dev/null +++ b/hrc/service/websocket/__init__.py @@ -0,0 +1,30 @@ +from typing import Any, Coroutine +from typing_extensions import override +from aiohttp import web, ClientWebSocketResponse + +from hrc.service.utils import WebSocketService +from hrc.event import Event +from hrc.log import logger + +from aiohttp import web + +class WebSocketTestEvent(Event["WebSocketTestEvent"]): + message: str + +class WebSocketTestService(WebSocketService[WebSocketTestEvent, None]): + name: str = "websocket_test_service" + service_type: str = "reverse-ws" + host: str = "127.0.0.1" + port: int = 8765 + url: str = "/" + + @override + async def handle_reverse_ws_response(self, request: web.Request) -> Coroutine[Any, Any, ClientWebSocketResponse]: + event = WebSocketTestEvent( + service=self, + type="message", + message=await request.text() + ) + logger.info(f"Receive {event}") + await self.handle_event(event) + return web.Response()
\ No newline at end of file diff --git a/hrc/typing.py b/hrc/typing.py index d74fd26..a207c80 100644 --- a/hrc/typing.py +++ b/hrc/typing.py @@ -4,17 +4,20 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Optional, TypeVar if TYPE_CHECKING: from typing import Any - from .core import Core - from .config import ConfigModel - from .event import Event - from .rule import Rule + from hrc.service import Service + from hrc.core import Core + from hrc.config import ConfigModel + from hrc.event import Event + from hrc.rule import Rule StateT = TypeVar("StateT") EventT = TypeVar("EventT", bound="Event[Any]") RuleT = TypeVar("RuleT", bound="Rule[Any, Any, Any]") ConfigT = TypeVar("ConfigT", bound=Optional["ConfigModel"]) +ServiceT = TypeVar("ServiceT", bound="Service[Any, Any]") CoreHook = Callable[["Core"], Awaitable[None]] RuleHook = Callable[["Rule"], Awaitable[None]] +ServiceHook = Callable[["Service[Any, Any]"], Awaitable[None]] EventHook = Callable[["Event[Any]"], Awaitable[None]] diff --git a/hrc/utils.py b/hrc/utils.py index e85cea4..e1399f5 100644 --- a/hrc/utils.py +++ b/hrc/utils.py @@ -37,8 +37,8 @@ from typing_extensions import ParamSpec, TypeAlias, TypeGuard from pydantic import BaseModel -from .config import ConfigModel -from .typing import EventT +from hrc.config import ConfigModel +from hrc.typing import EventT if TYPE_CHECKING: from os import PathLike |
