aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/hrc/service
diff options
context:
space:
mode:
Diffstat (limited to 'src/hrc/service')
-rw-r--r--src/hrc/service/__init__.py111
-rw-r--r--src/hrc/service/console/__init__.py41
-rw-r--r--src/hrc/service/http/__init__.py33
-rw-r--r--src/hrc/service/utils.py256
-rw-r--r--src/hrc/service/websocket/__init__.py30
5 files changed, 471 insertions, 0 deletions
diff --git a/src/hrc/service/__init__.py b/src/hrc/service/__init__.py
new file mode 100644
index 0000000..f153d5b
--- /dev/null
+++ b/src/hrc/service/__init__.py
@@ -0,0 +1,111 @@
+import os
+from abc import ABC, abstractmethod
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Callable,
+ Generic,
+ Optional,
+ Type,
+ TypeVar,
+ Union,
+ final,
+ overload,
+)
+
+from hrc.event import Event
+from hrc.typing import ConfigT, EventT
+from hrc.utils import is_config_class
+
+if TYPE_CHECKING:
+ from ..core import Core
+
+__all__ = ["Server"]
+
+if os.getenv("IAMAI_DEV") == "1": # pragma: no cover
+ # 当处于开发环境时,使用 pkg_resources 风格的命名空间包
+ __import__("pkg_resources").declare_namespace(__name__)
+
+
+_EventT = TypeVar("_EventT", bound="Event[Any]")
+
+
+class Service(Generic[EventT, ConfigT], ABC):
+ name: str
+ core: "Core"
+ Config: Type[ConfigT]
+
+ def __init__(self, core: "Core") -> None:
+ if not hasattr(self, "name"):
+ self.name = self.__class__.__name__
+ self.core: Core = core
+ self.handle_event = self.core.handle_event
+
+ @property
+ def config(self) -> ConfigT:
+ default: Any = None
+ config_class = getattr(self, "Config", None)
+ if is_config_class(config_class):
+ return getattr(
+ self.core.config.service,
+ config_class.__config_name__,
+ default,
+ )
+ return default
+
+ @final
+ async def safe_run(self) -> None:
+ try:
+ await self.run()
+ except Exception as e:
+ self.core.error_or_exception(
+ f"Run service {self.__class__.__name__} failed:", e
+ )
+
+ @abstractmethod
+ async def run(self) -> None:
+ raise NotImplementedError
+
+ async def startup(self) -> None:
+ ...
+
+ async def shutdown(self) -> None:
+ ...
+
+ @overload
+ async def get(
+ self,
+ func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: None = None,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> EventT: ...
+
+ @overload
+ async def get(
+ self,
+ func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: Type[_EventT],
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> _EventT: ...
+
+ @final
+ async def get(
+ self,
+ func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None,
+ *,
+ event_type: Any = None,
+ max_try_times: Optional[int] = None,
+ timeout: Optional[Union[int, float]] = None,
+ ) -> Event[Any]:
+ return await self.core.get(
+ func,
+ event_type=event_type,
+ server_type=type(self),
+ max_try_times=max_try_times,
+ timeout=timeout,
+ ) \ No newline at end of file
diff --git a/src/hrc/service/console/__init__.py b/src/hrc/service/console/__init__.py
new file mode 100644
index 0000000..e11768e
--- /dev/null
+++ b/src/hrc/service/console/__init__.py
@@ -0,0 +1,41 @@
+import asyncio
+import sys
+from typing_extensions import override
+
+from hrc.event import MessageEvent
+from hrc.service import Service
+
+class ConsoleServiceEvent(MessageEvent["ConsoleService"]):
+ message: str
+
+ @override
+ def get_sender_id(self) -> None:
+ return None
+
+ @override
+ def get_plain_text(self) -> str:
+ return self.message
+
+ @override
+ async def reply(self, message: str) -> None:
+ return await self.service.send(message)
+
+ async def is_same_sender(self) -> bool:
+ return True
+
+class ConsoleService(Service[ConsoleServiceEvent, None]):
+ name: str = "console"
+
+ @override
+ async def run(self) -> None:
+ while not self.core.should_exit.is_set():
+ print("Please input message: ") # noqa: T201
+ message = await asyncio.get_event_loop().run_in_executor(
+ None, sys.stdin.readline
+ )
+ await self.handle_event(
+ ConsoleServiceEvent(service=self, type="message", message=message, rule="")
+ )
+
+ async def send(self, message: str) -> None:
+ print(f"Send a message: {message}") # noqa: T201 \ No newline at end of file
diff --git a/src/hrc/service/http/__init__.py b/src/hrc/service/http/__init__.py
new file mode 100644
index 0000000..a8d938b
--- /dev/null
+++ b/src/hrc/service/http/__init__.py
@@ -0,0 +1,33 @@
+from typing_extensions import override
+
+from aiohttp import web
+
+from hrc.service.utils import HttpServerService
+from hrc.event import Event
+from hrc.log import logger
+from aiohttp import web
+
+class HttpServerTestEvent(Event["HttpServerTestService"]):
+ """HTTP 服务端示例适配器事件类。"""
+
+ message: str
+
+
+class HttpServerTestService(HttpServerService[HttpServerTestEvent, None]):
+ name: str = "http_server_service"
+ get_url: str = "/"
+ post_url: str = "/"
+ host: str = "127.0.0.1"
+ port: int = 8080
+
+
+ @override
+ async def handle_response(self, request: web.Request) -> web.StreamResponse:
+ event = HttpServerTestEvent(
+ service=self,
+ type="message",
+ rule="",
+ message=await request.text(),
+ )
+ await self.handle_event(event)
+ return web.Response() \ No newline at end of file
diff --git a/src/hrc/service/utils.py b/src/hrc/service/utils.py
new file mode 100644
index 0000000..4f29e0c
--- /dev/null
+++ b/src/hrc/service/utils.py
@@ -0,0 +1,256 @@
+import asyncio
+from abc import ABCMeta, abstractmethod
+from typing import Literal, Optional, Union
+
+import aiohttp
+from aiohttp import web
+
+from hrc.service import Service
+from hrc.log import logger
+from hrc.typing import ConfigT, EventT
+
+__all__ = [
+ "PollingService",
+ "HttpClientService",
+ "WebSocketClientService",
+ "HttpServerService",
+ "WebSocketServerService",
+ "WebSocketService",
+]
+
+
+class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ """轮询式适配器示例。"""
+
+ delay: float = 0.1
+ create_task: bool = False
+ _on_tick_task: Optional["asyncio.Task[None]"] = None
+
+ async def run(self) -> None:
+ while not self.core.should_exit.is_set():
+ await asyncio.sleep(self.delay)
+ if self.create_task:
+ self._on_tick_task = asyncio.create_task(self.on_tick())
+ else:
+ await self.on_tick()
+
+ @abstractmethod
+ async def on_tick(self) -> None:
+ """当轮询发生。"""
+
+
+class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta):
+ session: aiohttp.ClientSession
+
+ async def startup(self) -> None:
+ self.session = aiohttp.ClientSession()
+
+ @abstractmethod
+ async def on_tick(self) -> None:
+ ...
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ await self.session.close()
+
+
+class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ url: str
+
+ async def run(self) -> None:
+ async with aiohttp.ClientSession() as session, session.ws_connect(
+ self.url
+ ) as ws:
+ msg: aiohttp.WSMessage
+ async for msg in ws:
+ if self.core.should_exit.is_set():
+ break
+ if msg.type == aiohttp.WSMsgType.ERROR:
+ break
+ await self.handle_response(msg)
+
+ @abstractmethod
+ async def handle_response(self, msg: aiohttp.WSMessage) -> None:
+ """处理响应。"""
+
+
+class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ app: web.Application
+ runner: web.AppRunner
+ site: web.TCPSite
+ host: str
+ port: int
+ get_url: str
+ post_url: str
+
+ async def startup(self) -> None:
+ """初始化适配器。"""
+ self.app = web.Application()
+ self.app.add_routes(
+ [
+ web.get(self.get_url, self.handle_response),
+ web.post(self.post_url, self.handle_response),
+ ]
+ )
+
+ async def run(self) -> None:
+ self.runner = web.AppRunner(self.app)
+ await self.runner.setup()
+ self.site = web.TCPSite(self.runner, self.host, self.port)
+ await self.site.start()
+
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ await self.runner.cleanup()
+
+ @abstractmethod
+ async def handle_response(self, request: web.Request) -> web.StreamResponse:
+ """处理响应。"""
+
+
+class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ app: web.Application
+ runner: web.AppRunner
+ site: web.TCPSite
+ websocket: web.WebSocketResponse
+ host: str
+ port: int
+ url: str
+
+ async def startup(self) -> None:
+ self.app = web.Application()
+ self.app.add_routes([web.get(self.url, self.handle_response)])
+
+ async def run(self) -> None:
+ self.runner = web.AppRunner(self.app)
+ await self.runner.setup()
+ self.site = web.TCPSite(self.runner, self.host, self.port)
+ await self.site.start()
+
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ await self.websocket.close()
+ await self.site.stop()
+ await self.runner.cleanup()
+
+ async def handle_response(self, request: web.Request) -> web.WebSocketResponse:
+ """处理 WebSocket。"""
+ ws = web.WebSocketResponse()
+ await ws.prepare(request)
+ self.websocket = ws
+
+ msg: aiohttp.WSMessage
+ async for msg in ws:
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ await self.handle_ws_response(msg)
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ break
+
+ return ws
+
+ @abstractmethod
+ async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None:
+ """处理 WebSocket 响应。"""
+
+
+class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ """
+ 同时支持 WebSocket 客户端和服务端。
+ """
+
+ websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = (
+ None
+ )
+
+ # ws
+ session: Optional[aiohttp.ClientSession]
+
+ # reverse-ws
+ app: Optional[web.Application]
+ runner: Optional[web.AppRunner]
+ site: Optional[web.TCPSite]
+
+ # config
+ service_type: Literal["ws", "reverse-ws"]
+ host: str
+ port: int
+ url: str
+ reconnect_interval: int = 3
+
+ async def startup(self) -> None:
+ if self.service_type == "ws":
+ self.session = aiohttp.ClientSession()
+ elif self.service_type == "reverse-ws":
+ self.app = web.Application()
+ self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)])
+ else:
+ logger.error(
+ 'Config "service_type" must be "ws" or "reverse-ws", not '
+ + self.service_type
+ )
+
+ async def run(self) -> None:
+ if self.service_type == "ws":
+ while True:
+ try:
+ await self.websocket_connect()
+ except aiohttp.ClientError as e:
+ self.core.error_or_exception("WebSocket connection error:", e)
+ if self.core.should_exit.is_set():
+ break
+ await asyncio.sleep(self.reconnect_interval)
+ elif self.service_type == "reverse-ws":
+ assert self.app is not None
+ self.runner = web.AppRunner(self.app)
+ await self.runner.setup()
+ self.site = web.TCPSite(self.runner, self.host, self.port)
+ await self.site.start()
+
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ if self.websocket is not None:
+ await self.websocket.close()
+ if self.service_type == "ws":
+ if self.session is not None:
+ await self.session.close()
+ elif self.service_type == "reverse-ws":
+ if self.site is not None:
+ await self.site.stop()
+ if self.runner is not None:
+ await self.runner.cleanup()
+
+ async def handle_reverse_ws_response(
+ self, request: web.Request
+ ) -> web.WebSocketResponse:
+ """处理 aiohttp WebSocket 服务器的接收。"""
+ self.websocket = web.WebSocketResponse()
+ await self.websocket.prepare(request)
+ await self.reverse_ws_connection_hook()
+ await self.handle_websocket()
+ return self.websocket
+
+ async def reverse_ws_connection_hook(self) -> None:
+ """反向 WebSocket 连接建立时的钩子函数。"""
+ logger.info("WebSocket connected!")
+
+ async def websocket_connect(self) -> None:
+ """创建正向 WebSocket 连接。"""
+ assert self.session is not None
+ logger.info("Tying to connect to WebSocket server...")
+ async with self.session.ws_connect(
+ f"ws://{self.host}:{self.port}{self.url}"
+ ) as self.websocket:
+ await self.handle_websocket()
+
+ async def handle_websocket(self) -> None:
+ """处理 WebSocket。"""
+ if self.websocket is None or self.websocket.closed:
+ return
+ async for msg in self.websocket:
+ await self.handle_websocket_msg(msg)
+ if not self.core.should_exit.is_set():
+ logger.warning("WebSocket connection closed!")
+
+ @abstractmethod
+ async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None:
+ """处理 WebSocket 消息。"""
+ raise NotImplementedError \ No newline at end of file
diff --git a/src/hrc/service/websocket/__init__.py b/src/hrc/service/websocket/__init__.py
new file mode 100644
index 0000000..3a7f089
--- /dev/null
+++ b/src/hrc/service/websocket/__init__.py
@@ -0,0 +1,30 @@
+from typing import Any, Coroutine
+from typing_extensions import override
+from aiohttp import web, ClientWebSocketResponse
+
+from hrc.service.utils import WebSocketService
+from hrc.event import Event
+from hrc.log import logger
+
+from aiohttp import web
+
+class WebSocketTestEvent(Event["WebSocketTestEvent"]):
+ message: str
+
+class WebSocketTestService(WebSocketService[WebSocketTestEvent, None]):
+ name: str = "websocket_test_service"
+ service_type: str = "reverse-ws"
+ host: str = "127.0.0.1"
+ port: int = 8765
+ url: str = "/"
+
+ @override
+ async def handle_reverse_ws_response(self, request: web.Request) -> Coroutine[Any, Any, ClientWebSocketResponse]:
+ event = WebSocketTestEvent(
+ service=self,
+ type="message",
+ message=await request.text()
+ )
+ logger.info(f"Receive {event}")
+ await self.handle_event(event)
+ return web.Response() \ No newline at end of file