aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/hrc/service/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hrc/service/utils.py')
-rw-r--r--src/hrc/service/utils.py256
1 files changed, 256 insertions, 0 deletions
diff --git a/src/hrc/service/utils.py b/src/hrc/service/utils.py
new file mode 100644
index 0000000..4f29e0c
--- /dev/null
+++ b/src/hrc/service/utils.py
@@ -0,0 +1,256 @@
+import asyncio
+from abc import ABCMeta, abstractmethod
+from typing import Literal, Optional, Union
+
+import aiohttp
+from aiohttp import web
+
+from hrc.service import Service
+from hrc.log import logger
+from hrc.typing import ConfigT, EventT
+
+__all__ = [
+ "PollingService",
+ "HttpClientService",
+ "WebSocketClientService",
+ "HttpServerService",
+ "WebSocketServerService",
+ "WebSocketService",
+]
+
+
+class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ """轮询式适配器示例。"""
+
+ delay: float = 0.1
+ create_task: bool = False
+ _on_tick_task: Optional["asyncio.Task[None]"] = None
+
+ async def run(self) -> None:
+ while not self.core.should_exit.is_set():
+ await asyncio.sleep(self.delay)
+ if self.create_task:
+ self._on_tick_task = asyncio.create_task(self.on_tick())
+ else:
+ await self.on_tick()
+
+ @abstractmethod
+ async def on_tick(self) -> None:
+ """当轮询发生。"""
+
+
+class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta):
+ session: aiohttp.ClientSession
+
+ async def startup(self) -> None:
+ self.session = aiohttp.ClientSession()
+
+ @abstractmethod
+ async def on_tick(self) -> None:
+ ...
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ await self.session.close()
+
+
+class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ url: str
+
+ async def run(self) -> None:
+ async with aiohttp.ClientSession() as session, session.ws_connect(
+ self.url
+ ) as ws:
+ msg: aiohttp.WSMessage
+ async for msg in ws:
+ if self.core.should_exit.is_set():
+ break
+ if msg.type == aiohttp.WSMsgType.ERROR:
+ break
+ await self.handle_response(msg)
+
+ @abstractmethod
+ async def handle_response(self, msg: aiohttp.WSMessage) -> None:
+ """处理响应。"""
+
+
+class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ app: web.Application
+ runner: web.AppRunner
+ site: web.TCPSite
+ host: str
+ port: int
+ get_url: str
+ post_url: str
+
+ async def startup(self) -> None:
+ """初始化适配器。"""
+ self.app = web.Application()
+ self.app.add_routes(
+ [
+ web.get(self.get_url, self.handle_response),
+ web.post(self.post_url, self.handle_response),
+ ]
+ )
+
+ async def run(self) -> None:
+ self.runner = web.AppRunner(self.app)
+ await self.runner.setup()
+ self.site = web.TCPSite(self.runner, self.host, self.port)
+ await self.site.start()
+
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ await self.runner.cleanup()
+
+ @abstractmethod
+ async def handle_response(self, request: web.Request) -> web.StreamResponse:
+ """处理响应。"""
+
+
+class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ app: web.Application
+ runner: web.AppRunner
+ site: web.TCPSite
+ websocket: web.WebSocketResponse
+ host: str
+ port: int
+ url: str
+
+ async def startup(self) -> None:
+ self.app = web.Application()
+ self.app.add_routes([web.get(self.url, self.handle_response)])
+
+ async def run(self) -> None:
+ self.runner = web.AppRunner(self.app)
+ await self.runner.setup()
+ self.site = web.TCPSite(self.runner, self.host, self.port)
+ await self.site.start()
+
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ await self.websocket.close()
+ await self.site.stop()
+ await self.runner.cleanup()
+
+ async def handle_response(self, request: web.Request) -> web.WebSocketResponse:
+ """处理 WebSocket。"""
+ ws = web.WebSocketResponse()
+ await ws.prepare(request)
+ self.websocket = ws
+
+ msg: aiohttp.WSMessage
+ async for msg in ws:
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ await self.handle_ws_response(msg)
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ break
+
+ return ws
+
+ @abstractmethod
+ async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None:
+ """处理 WebSocket 响应。"""
+
+
+class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta):
+ """
+ 同时支持 WebSocket 客户端和服务端。
+ """
+
+ websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = (
+ None
+ )
+
+ # ws
+ session: Optional[aiohttp.ClientSession]
+
+ # reverse-ws
+ app: Optional[web.Application]
+ runner: Optional[web.AppRunner]
+ site: Optional[web.TCPSite]
+
+ # config
+ service_type: Literal["ws", "reverse-ws"]
+ host: str
+ port: int
+ url: str
+ reconnect_interval: int = 3
+
+ async def startup(self) -> None:
+ if self.service_type == "ws":
+ self.session = aiohttp.ClientSession()
+ elif self.service_type == "reverse-ws":
+ self.app = web.Application()
+ self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)])
+ else:
+ logger.error(
+ 'Config "service_type" must be "ws" or "reverse-ws", not '
+ + self.service_type
+ )
+
+ async def run(self) -> None:
+ if self.service_type == "ws":
+ while True:
+ try:
+ await self.websocket_connect()
+ except aiohttp.ClientError as e:
+ self.core.error_or_exception("WebSocket connection error:", e)
+ if self.core.should_exit.is_set():
+ break
+ await asyncio.sleep(self.reconnect_interval)
+ elif self.service_type == "reverse-ws":
+ assert self.app is not None
+ self.runner = web.AppRunner(self.app)
+ await self.runner.setup()
+ self.site = web.TCPSite(self.runner, self.host, self.port)
+ await self.site.start()
+
+ async def shutdown(self) -> None:
+ """关闭并清理连接。"""
+ if self.websocket is not None:
+ await self.websocket.close()
+ if self.service_type == "ws":
+ if self.session is not None:
+ await self.session.close()
+ elif self.service_type == "reverse-ws":
+ if self.site is not None:
+ await self.site.stop()
+ if self.runner is not None:
+ await self.runner.cleanup()
+
+ async def handle_reverse_ws_response(
+ self, request: web.Request
+ ) -> web.WebSocketResponse:
+ """处理 aiohttp WebSocket 服务器的接收。"""
+ self.websocket = web.WebSocketResponse()
+ await self.websocket.prepare(request)
+ await self.reverse_ws_connection_hook()
+ await self.handle_websocket()
+ return self.websocket
+
+ async def reverse_ws_connection_hook(self) -> None:
+ """反向 WebSocket 连接建立时的钩子函数。"""
+ logger.info("WebSocket connected!")
+
+ async def websocket_connect(self) -> None:
+ """创建正向 WebSocket 连接。"""
+ assert self.session is not None
+ logger.info("Tying to connect to WebSocket server...")
+ async with self.session.ws_connect(
+ f"ws://{self.host}:{self.port}{self.url}"
+ ) as self.websocket:
+ await self.handle_websocket()
+
+ async def handle_websocket(self) -> None:
+ """处理 WebSocket。"""
+ if self.websocket is None or self.websocket.closed:
+ return
+ async for msg in self.websocket:
+ await self.handle_websocket_msg(msg)
+ if not self.core.should_exit.is_set():
+ logger.warning("WebSocket connection closed!")
+
+ @abstractmethod
+ async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None:
+ """处理 WebSocket 消息。"""
+ raise NotImplementedError \ No newline at end of file