diff options
Diffstat (limited to 'hrc/service')
| -rw-r--r-- | hrc/service/__init__.py | 111 | ||||
| -rw-r--r-- | hrc/service/console/__init__.py | 41 | ||||
| -rw-r--r-- | hrc/service/http/__init__.py | 33 | ||||
| -rw-r--r-- | hrc/service/utils.py | 256 | ||||
| -rw-r--r-- | hrc/service/websocket/__init__.py | 30 |
5 files changed, 0 insertions, 471 deletions
diff --git a/hrc/service/__init__.py b/hrc/service/__init__.py deleted file mode 100644 index f153d5b..0000000 --- a/hrc/service/__init__.py +++ /dev/null @@ -1,111 +0,0 @@ -import os -from abc import ABC, abstractmethod -from typing import ( - TYPE_CHECKING, - Any, - Awaitable, - Callable, - Generic, - Optional, - Type, - TypeVar, - Union, - final, - overload, -) - -from hrc.event import Event -from hrc.typing import ConfigT, EventT -from hrc.utils import is_config_class - -if TYPE_CHECKING: - from ..core import Core - -__all__ = ["Server"] - -if os.getenv("IAMAI_DEV") == "1": # pragma: no cover - # 当处于开发环境时,使用 pkg_resources 风格的命名空间包 - __import__("pkg_resources").declare_namespace(__name__) - - -_EventT = TypeVar("_EventT", bound="Event[Any]") - - -class Service(Generic[EventT, ConfigT], ABC): - name: str - core: "Core" - Config: Type[ConfigT] - - def __init__(self, core: "Core") -> None: - if not hasattr(self, "name"): - self.name = self.__class__.__name__ - self.core: Core = core - self.handle_event = self.core.handle_event - - @property - def config(self) -> ConfigT: - default: Any = None - config_class = getattr(self, "Config", None) - if is_config_class(config_class): - return getattr( - self.core.config.service, - config_class.__config_name__, - default, - ) - return default - - @final - async def safe_run(self) -> None: - try: - await self.run() - except Exception as e: - self.core.error_or_exception( - f"Run service {self.__class__.__name__} failed:", e - ) - - @abstractmethod - async def run(self) -> None: - raise NotImplementedError - - async def startup(self) -> None: - ... - - async def shutdown(self) -> None: - ... - - @overload - async def get( - self, - func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None, - *, - event_type: None = None, - max_try_times: Optional[int] = None, - timeout: Optional[Union[int, float]] = None, - ) -> EventT: ... - - @overload - async def get( - self, - func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None, - *, - event_type: Type[_EventT], - max_try_times: Optional[int] = None, - timeout: Optional[Union[int, float]] = None, - ) -> _EventT: ... - - @final - async def get( - self, - func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None, - *, - event_type: Any = None, - max_try_times: Optional[int] = None, - timeout: Optional[Union[int, float]] = None, - ) -> Event[Any]: - return await self.core.get( - func, - event_type=event_type, - server_type=type(self), - max_try_times=max_try_times, - timeout=timeout, - )
\ No newline at end of file diff --git a/hrc/service/console/__init__.py b/hrc/service/console/__init__.py deleted file mode 100644 index e11768e..0000000 --- a/hrc/service/console/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -import asyncio -import sys -from typing_extensions import override - -from hrc.event import MessageEvent -from hrc.service import Service - -class ConsoleServiceEvent(MessageEvent["ConsoleService"]): - message: str - - @override - def get_sender_id(self) -> None: - return None - - @override - def get_plain_text(self) -> str: - return self.message - - @override - async def reply(self, message: str) -> None: - return await self.service.send(message) - - async def is_same_sender(self) -> bool: - return True - -class ConsoleService(Service[ConsoleServiceEvent, None]): - name: str = "console" - - @override - async def run(self) -> None: - while not self.core.should_exit.is_set(): - print("Please input message: ") # noqa: T201 - message = await asyncio.get_event_loop().run_in_executor( - None, sys.stdin.readline - ) - await self.handle_event( - ConsoleServiceEvent(service=self, type="message", message=message, rule="") - ) - - async def send(self, message: str) -> None: - print(f"Send a message: {message}") # noqa: T201
\ No newline at end of file diff --git a/hrc/service/http/__init__.py b/hrc/service/http/__init__.py deleted file mode 100644 index a8d938b..0000000 --- a/hrc/service/http/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing_extensions import override - -from aiohttp import web - -from hrc.service.utils import HttpServerService -from hrc.event import Event -from hrc.log import logger -from aiohttp import web - -class HttpServerTestEvent(Event["HttpServerTestService"]): - """HTTP 服务端示例适配器事件类。""" - - message: str - - -class HttpServerTestService(HttpServerService[HttpServerTestEvent, None]): - name: str = "http_server_service" - get_url: str = "/" - post_url: str = "/" - host: str = "127.0.0.1" - port: int = 8080 - - - @override - async def handle_response(self, request: web.Request) -> web.StreamResponse: - event = HttpServerTestEvent( - service=self, - type="message", - rule="", - message=await request.text(), - ) - await self.handle_event(event) - return web.Response()
\ No newline at end of file diff --git a/hrc/service/utils.py b/hrc/service/utils.py deleted file mode 100644 index 4f29e0c..0000000 --- a/hrc/service/utils.py +++ /dev/null @@ -1,256 +0,0 @@ -import asyncio -from abc import ABCMeta, abstractmethod -from typing import Literal, Optional, Union - -import aiohttp -from aiohttp import web - -from hrc.service import Service -from hrc.log import logger -from hrc.typing import ConfigT, EventT - -__all__ = [ - "PollingService", - "HttpClientService", - "WebSocketClientService", - "HttpServerService", - "WebSocketServerService", - "WebSocketService", -] - - -class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta): - """轮询式适配器示例。""" - - delay: float = 0.1 - create_task: bool = False - _on_tick_task: Optional["asyncio.Task[None]"] = None - - async def run(self) -> None: - while not self.core.should_exit.is_set(): - await asyncio.sleep(self.delay) - if self.create_task: - self._on_tick_task = asyncio.create_task(self.on_tick()) - else: - await self.on_tick() - - @abstractmethod - async def on_tick(self) -> None: - """当轮询发生。""" - - -class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta): - session: aiohttp.ClientSession - - async def startup(self) -> None: - self.session = aiohttp.ClientSession() - - @abstractmethod - async def on_tick(self) -> None: - ... - async def shutdown(self) -> None: - """关闭并清理连接。""" - await self.session.close() - - -class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta): - url: str - - async def run(self) -> None: - async with aiohttp.ClientSession() as session, session.ws_connect( - self.url - ) as ws: - msg: aiohttp.WSMessage - async for msg in ws: - if self.core.should_exit.is_set(): - break - if msg.type == aiohttp.WSMsgType.ERROR: - break - await self.handle_response(msg) - - @abstractmethod - async def handle_response(self, msg: aiohttp.WSMessage) -> None: - """处理响应。""" - - -class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta): - app: web.Application - runner: web.AppRunner - site: web.TCPSite - host: str - port: int - get_url: str - post_url: str - - async def startup(self) -> None: - """初始化适配器。""" - self.app = web.Application() - self.app.add_routes( - [ - web.get(self.get_url, self.handle_response), - web.post(self.post_url, self.handle_response), - ] - ) - - async def run(self) -> None: - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.site = web.TCPSite(self.runner, self.host, self.port) - await self.site.start() - - async def shutdown(self) -> None: - """关闭并清理连接。""" - await self.runner.cleanup() - - @abstractmethod - async def handle_response(self, request: web.Request) -> web.StreamResponse: - """处理响应。""" - - -class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta): - app: web.Application - runner: web.AppRunner - site: web.TCPSite - websocket: web.WebSocketResponse - host: str - port: int - url: str - - async def startup(self) -> None: - self.app = web.Application() - self.app.add_routes([web.get(self.url, self.handle_response)]) - - async def run(self) -> None: - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.site = web.TCPSite(self.runner, self.host, self.port) - await self.site.start() - - async def shutdown(self) -> None: - """关闭并清理连接。""" - await self.websocket.close() - await self.site.stop() - await self.runner.cleanup() - - async def handle_response(self, request: web.Request) -> web.WebSocketResponse: - """处理 WebSocket。""" - ws = web.WebSocketResponse() - await ws.prepare(request) - self.websocket = ws - - msg: aiohttp.WSMessage - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - await self.handle_ws_response(msg) - elif msg.type == aiohttp.WSMsgType.ERROR: - break - - return ws - - @abstractmethod - async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None: - """处理 WebSocket 响应。""" - - -class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta): - """ - 同时支持 WebSocket 客户端和服务端。 - """ - - websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = ( - None - ) - - # ws - session: Optional[aiohttp.ClientSession] - - # reverse-ws - app: Optional[web.Application] - runner: Optional[web.AppRunner] - site: Optional[web.TCPSite] - - # config - service_type: Literal["ws", "reverse-ws"] - host: str - port: int - url: str - reconnect_interval: int = 3 - - async def startup(self) -> None: - if self.service_type == "ws": - self.session = aiohttp.ClientSession() - elif self.service_type == "reverse-ws": - self.app = web.Application() - self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)]) - else: - logger.error( - 'Config "service_type" must be "ws" or "reverse-ws", not ' - + self.service_type - ) - - async def run(self) -> None: - if self.service_type == "ws": - while True: - try: - await self.websocket_connect() - except aiohttp.ClientError as e: - self.core.error_or_exception("WebSocket connection error:", e) - if self.core.should_exit.is_set(): - break - await asyncio.sleep(self.reconnect_interval) - elif self.service_type == "reverse-ws": - assert self.app is not None - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.site = web.TCPSite(self.runner, self.host, self.port) - await self.site.start() - - async def shutdown(self) -> None: - """关闭并清理连接。""" - if self.websocket is not None: - await self.websocket.close() - if self.service_type == "ws": - if self.session is not None: - await self.session.close() - elif self.service_type == "reverse-ws": - if self.site is not None: - await self.site.stop() - if self.runner is not None: - await self.runner.cleanup() - - async def handle_reverse_ws_response( - self, request: web.Request - ) -> web.WebSocketResponse: - """处理 aiohttp WebSocket 服务器的接收。""" - self.websocket = web.WebSocketResponse() - await self.websocket.prepare(request) - await self.reverse_ws_connection_hook() - await self.handle_websocket() - return self.websocket - - async def reverse_ws_connection_hook(self) -> None: - """反向 WebSocket 连接建立时的钩子函数。""" - logger.info("WebSocket connected!") - - async def websocket_connect(self) -> None: - """创建正向 WebSocket 连接。""" - assert self.session is not None - logger.info("Tying to connect to WebSocket server...") - async with self.session.ws_connect( - f"ws://{self.host}:{self.port}{self.url}" - ) as self.websocket: - await self.handle_websocket() - - async def handle_websocket(self) -> None: - """处理 WebSocket。""" - if self.websocket is None or self.websocket.closed: - return - async for msg in self.websocket: - await self.handle_websocket_msg(msg) - if not self.core.should_exit.is_set(): - logger.warning("WebSocket connection closed!") - - @abstractmethod - async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None: - """处理 WebSocket 消息。""" - raise NotImplementedError
\ No newline at end of file diff --git a/hrc/service/websocket/__init__.py b/hrc/service/websocket/__init__.py deleted file mode 100644 index 3a7f089..0000000 --- a/hrc/service/websocket/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -from typing import Any, Coroutine -from typing_extensions import override -from aiohttp import web, ClientWebSocketResponse - -from hrc.service.utils import WebSocketService -from hrc.event import Event -from hrc.log import logger - -from aiohttp import web - -class WebSocketTestEvent(Event["WebSocketTestEvent"]): - message: str - -class WebSocketTestService(WebSocketService[WebSocketTestEvent, None]): - name: str = "websocket_test_service" - service_type: str = "reverse-ws" - host: str = "127.0.0.1" - port: int = 8765 - url: str = "/" - - @override - async def handle_reverse_ws_response(self, request: web.Request) -> Coroutine[Any, Any, ClientWebSocketResponse]: - event = WebSocketTestEvent( - service=self, - type="message", - message=await request.text() - ) - logger.info(f"Receive {event}") - await self.handle_event(event) - return web.Response()
\ No newline at end of file |
