diff options
Diffstat (limited to 'src/hrc/service')
| -rw-r--r-- | src/hrc/service/__init__.py | 111 | ||||
| -rw-r--r-- | src/hrc/service/console/__init__.py | 41 | ||||
| -rw-r--r-- | src/hrc/service/http/__init__.py | 33 | ||||
| -rw-r--r-- | src/hrc/service/utils.py | 256 | ||||
| -rw-r--r-- | src/hrc/service/websocket/__init__.py | 30 |
5 files changed, 471 insertions, 0 deletions
diff --git a/src/hrc/service/__init__.py b/src/hrc/service/__init__.py new file mode 100644 index 0000000..f153d5b --- /dev/null +++ b/src/hrc/service/__init__.py @@ -0,0 +1,111 @@ +import os +from abc import ABC, abstractmethod +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Generic, + Optional, + Type, + TypeVar, + Union, + final, + overload, +) + +from hrc.event import Event +from hrc.typing import ConfigT, EventT +from hrc.utils import is_config_class + +if TYPE_CHECKING: + from ..core import Core + +__all__ = ["Server"] + +if os.getenv("IAMAI_DEV") == "1": # pragma: no cover + # 当处于开发环境时,使用 pkg_resources 风格的命名空间包 + __import__("pkg_resources").declare_namespace(__name__) + + +_EventT = TypeVar("_EventT", bound="Event[Any]") + + +class Service(Generic[EventT, ConfigT], ABC): + name: str + core: "Core" + Config: Type[ConfigT] + + def __init__(self, core: "Core") -> None: + if not hasattr(self, "name"): + self.name = self.__class__.__name__ + self.core: Core = core + self.handle_event = self.core.handle_event + + @property + def config(self) -> ConfigT: + default: Any = None + config_class = getattr(self, "Config", None) + if is_config_class(config_class): + return getattr( + self.core.config.service, + config_class.__config_name__, + default, + ) + return default + + @final + async def safe_run(self) -> None: + try: + await self.run() + except Exception as e: + self.core.error_or_exception( + f"Run service {self.__class__.__name__} failed:", e + ) + + @abstractmethod + async def run(self) -> None: + raise NotImplementedError + + async def startup(self) -> None: + ... + + async def shutdown(self) -> None: + ... + + @overload + async def get( + self, + func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: None = None, + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> EventT: ... + + @overload + async def get( + self, + func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: Type[_EventT], + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> _EventT: ... + + @final + async def get( + self, + func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None, + *, + event_type: Any = None, + max_try_times: Optional[int] = None, + timeout: Optional[Union[int, float]] = None, + ) -> Event[Any]: + return await self.core.get( + func, + event_type=event_type, + server_type=type(self), + max_try_times=max_try_times, + timeout=timeout, + )
\ No newline at end of file diff --git a/src/hrc/service/console/__init__.py b/src/hrc/service/console/__init__.py new file mode 100644 index 0000000..e11768e --- /dev/null +++ b/src/hrc/service/console/__init__.py @@ -0,0 +1,41 @@ +import asyncio +import sys +from typing_extensions import override + +from hrc.event import MessageEvent +from hrc.service import Service + +class ConsoleServiceEvent(MessageEvent["ConsoleService"]): + message: str + + @override + def get_sender_id(self) -> None: + return None + + @override + def get_plain_text(self) -> str: + return self.message + + @override + async def reply(self, message: str) -> None: + return await self.service.send(message) + + async def is_same_sender(self) -> bool: + return True + +class ConsoleService(Service[ConsoleServiceEvent, None]): + name: str = "console" + + @override + async def run(self) -> None: + while not self.core.should_exit.is_set(): + print("Please input message: ") # noqa: T201 + message = await asyncio.get_event_loop().run_in_executor( + None, sys.stdin.readline + ) + await self.handle_event( + ConsoleServiceEvent(service=self, type="message", message=message, rule="") + ) + + async def send(self, message: str) -> None: + print(f"Send a message: {message}") # noqa: T201
\ No newline at end of file diff --git a/src/hrc/service/http/__init__.py b/src/hrc/service/http/__init__.py new file mode 100644 index 0000000..a8d938b --- /dev/null +++ b/src/hrc/service/http/__init__.py @@ -0,0 +1,33 @@ +from typing_extensions import override + +from aiohttp import web + +from hrc.service.utils import HttpServerService +from hrc.event import Event +from hrc.log import logger +from aiohttp import web + +class HttpServerTestEvent(Event["HttpServerTestService"]): + """HTTP 服务端示例适配器事件类。""" + + message: str + + +class HttpServerTestService(HttpServerService[HttpServerTestEvent, None]): + name: str = "http_server_service" + get_url: str = "/" + post_url: str = "/" + host: str = "127.0.0.1" + port: int = 8080 + + + @override + async def handle_response(self, request: web.Request) -> web.StreamResponse: + event = HttpServerTestEvent( + service=self, + type="message", + rule="", + message=await request.text(), + ) + await self.handle_event(event) + return web.Response()
\ No newline at end of file diff --git a/src/hrc/service/utils.py b/src/hrc/service/utils.py new file mode 100644 index 0000000..4f29e0c --- /dev/null +++ b/src/hrc/service/utils.py @@ -0,0 +1,256 @@ +import asyncio +from abc import ABCMeta, abstractmethod +from typing import Literal, Optional, Union + +import aiohttp +from aiohttp import web + +from hrc.service import Service +from hrc.log import logger +from hrc.typing import ConfigT, EventT + +__all__ = [ + "PollingService", + "HttpClientService", + "WebSocketClientService", + "HttpServerService", + "WebSocketServerService", + "WebSocketService", +] + + +class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta): + """轮询式适配器示例。""" + + delay: float = 0.1 + create_task: bool = False + _on_tick_task: Optional["asyncio.Task[None]"] = None + + async def run(self) -> None: + while not self.core.should_exit.is_set(): + await asyncio.sleep(self.delay) + if self.create_task: + self._on_tick_task = asyncio.create_task(self.on_tick()) + else: + await self.on_tick() + + @abstractmethod + async def on_tick(self) -> None: + """当轮询发生。""" + + +class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta): + session: aiohttp.ClientSession + + async def startup(self) -> None: + self.session = aiohttp.ClientSession() + + @abstractmethod + async def on_tick(self) -> None: + ... + async def shutdown(self) -> None: + """关闭并清理连接。""" + await self.session.close() + + +class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta): + url: str + + async def run(self) -> None: + async with aiohttp.ClientSession() as session, session.ws_connect( + self.url + ) as ws: + msg: aiohttp.WSMessage + async for msg in ws: + if self.core.should_exit.is_set(): + break + if msg.type == aiohttp.WSMsgType.ERROR: + break + await self.handle_response(msg) + + @abstractmethod + async def handle_response(self, msg: aiohttp.WSMessage) -> None: + """处理响应。""" + + +class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta): + app: web.Application + runner: web.AppRunner + site: web.TCPSite + host: str + port: int + get_url: str + post_url: str + + async def startup(self) -> None: + """初始化适配器。""" + self.app = web.Application() + self.app.add_routes( + [ + web.get(self.get_url, self.handle_response), + web.post(self.post_url, self.handle_response), + ] + ) + + async def run(self) -> None: + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.host, self.port) + await self.site.start() + + async def shutdown(self) -> None: + """关闭并清理连接。""" + await self.runner.cleanup() + + @abstractmethod + async def handle_response(self, request: web.Request) -> web.StreamResponse: + """处理响应。""" + + +class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta): + app: web.Application + runner: web.AppRunner + site: web.TCPSite + websocket: web.WebSocketResponse + host: str + port: int + url: str + + async def startup(self) -> None: + self.app = web.Application() + self.app.add_routes([web.get(self.url, self.handle_response)]) + + async def run(self) -> None: + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.host, self.port) + await self.site.start() + + async def shutdown(self) -> None: + """关闭并清理连接。""" + await self.websocket.close() + await self.site.stop() + await self.runner.cleanup() + + async def handle_response(self, request: web.Request) -> web.WebSocketResponse: + """处理 WebSocket。""" + ws = web.WebSocketResponse() + await ws.prepare(request) + self.websocket = ws + + msg: aiohttp.WSMessage + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self.handle_ws_response(msg) + elif msg.type == aiohttp.WSMsgType.ERROR: + break + + return ws + + @abstractmethod + async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None: + """处理 WebSocket 响应。""" + + +class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta): + """ + 同时支持 WebSocket 客户端和服务端。 + """ + + websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = ( + None + ) + + # ws + session: Optional[aiohttp.ClientSession] + + # reverse-ws + app: Optional[web.Application] + runner: Optional[web.AppRunner] + site: Optional[web.TCPSite] + + # config + service_type: Literal["ws", "reverse-ws"] + host: str + port: int + url: str + reconnect_interval: int = 3 + + async def startup(self) -> None: + if self.service_type == "ws": + self.session = aiohttp.ClientSession() + elif self.service_type == "reverse-ws": + self.app = web.Application() + self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)]) + else: + logger.error( + 'Config "service_type" must be "ws" or "reverse-ws", not ' + + self.service_type + ) + + async def run(self) -> None: + if self.service_type == "ws": + while True: + try: + await self.websocket_connect() + except aiohttp.ClientError as e: + self.core.error_or_exception("WebSocket connection error:", e) + if self.core.should_exit.is_set(): + break + await asyncio.sleep(self.reconnect_interval) + elif self.service_type == "reverse-ws": + assert self.app is not None + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.host, self.port) + await self.site.start() + + async def shutdown(self) -> None: + """关闭并清理连接。""" + if self.websocket is not None: + await self.websocket.close() + if self.service_type == "ws": + if self.session is not None: + await self.session.close() + elif self.service_type == "reverse-ws": + if self.site is not None: + await self.site.stop() + if self.runner is not None: + await self.runner.cleanup() + + async def handle_reverse_ws_response( + self, request: web.Request + ) -> web.WebSocketResponse: + """处理 aiohttp WebSocket 服务器的接收。""" + self.websocket = web.WebSocketResponse() + await self.websocket.prepare(request) + await self.reverse_ws_connection_hook() + await self.handle_websocket() + return self.websocket + + async def reverse_ws_connection_hook(self) -> None: + """反向 WebSocket 连接建立时的钩子函数。""" + logger.info("WebSocket connected!") + + async def websocket_connect(self) -> None: + """创建正向 WebSocket 连接。""" + assert self.session is not None + logger.info("Tying to connect to WebSocket server...") + async with self.session.ws_connect( + f"ws://{self.host}:{self.port}{self.url}" + ) as self.websocket: + await self.handle_websocket() + + async def handle_websocket(self) -> None: + """处理 WebSocket。""" + if self.websocket is None or self.websocket.closed: + return + async for msg in self.websocket: + await self.handle_websocket_msg(msg) + if not self.core.should_exit.is_set(): + logger.warning("WebSocket connection closed!") + + @abstractmethod + async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None: + """处理 WebSocket 消息。""" + raise NotImplementedError
\ No newline at end of file diff --git a/src/hrc/service/websocket/__init__.py b/src/hrc/service/websocket/__init__.py new file mode 100644 index 0000000..3a7f089 --- /dev/null +++ b/src/hrc/service/websocket/__init__.py @@ -0,0 +1,30 @@ +from typing import Any, Coroutine +from typing_extensions import override +from aiohttp import web, ClientWebSocketResponse + +from hrc.service.utils import WebSocketService +from hrc.event import Event +from hrc.log import logger + +from aiohttp import web + +class WebSocketTestEvent(Event["WebSocketTestEvent"]): + message: str + +class WebSocketTestService(WebSocketService[WebSocketTestEvent, None]): + name: str = "websocket_test_service" + service_type: str = "reverse-ws" + host: str = "127.0.0.1" + port: int = 8765 + url: str = "/" + + @override + async def handle_reverse_ws_response(self, request: web.Request) -> Coroutine[Any, Any, ClientWebSocketResponse]: + event = WebSocketTestEvent( + service=self, + type="message", + message=await request.text() + ) + logger.info(f"Receive {event}") + await self.handle_event(event) + return web.Response()
\ No newline at end of file |
