aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/hrc/service/utils.py
diff options
context:
space:
mode:
authorHsiangNianian <i@jyunko.cn>2025-01-04 22:38:23 +0800
committerHsiangNianian <i@jyunko.cn>2025-01-04 22:38:23 +0800
commitc990518cb533a793399e44edbb4bc036342c7175 (patch)
tree8e2bd0f833b803a73dea7d88e7c294cf3d078d4d /hrc/service/utils.py
parentbc57c1410c08323ba37114082d0fe609fafc2c5d (diff)
downloadHydroRollCore-c990518cb533a793399e44edbb4bc036342c7175.tar.gz
HydroRollCore-c990518cb533a793399e44edbb4bc036342c7175.zip
feat(core): Initialize core components and configuration modelsHEADmain
Diffstat (limited to 'hrc/service/utils.py')
-rw-r--r--hrc/service/utils.py256
1 files changed, 0 insertions, 256 deletions
diff --git a/hrc/service/utils.py b/hrc/service/utils.py
deleted file mode 100644
index 4f29e0c..0000000
--- a/hrc/service/utils.py
+++ /dev/null
@@ -1,256 +0,0 @@
-import asyncio
-from abc import ABCMeta, abstractmethod
-from typing import Literal, Optional, Union
-
-import aiohttp
-from aiohttp import web
-
-from hrc.service import Service
-from hrc.log import logger
-from hrc.typing import ConfigT, EventT
-
-__all__ = [
- "PollingService",
- "HttpClientService",
- "WebSocketClientService",
- "HttpServerService",
- "WebSocketServerService",
- "WebSocketService",
-]
-
-
-class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta):
- """轮询式适配器示例。"""
-
- delay: float = 0.1
- create_task: bool = False
- _on_tick_task: Optional["asyncio.Task[None]"] = None
-
- async def run(self) -> None:
- while not self.core.should_exit.is_set():
- await asyncio.sleep(self.delay)
- if self.create_task:
- self._on_tick_task = asyncio.create_task(self.on_tick())
- else:
- await self.on_tick()
-
- @abstractmethod
- async def on_tick(self) -> None:
- """当轮询发生。"""
-
-
-class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta):
- session: aiohttp.ClientSession
-
- async def startup(self) -> None:
- self.session = aiohttp.ClientSession()
-
- @abstractmethod
- async def on_tick(self) -> None:
- ...
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- await self.session.close()
-
-
-class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta):
- url: str
-
- async def run(self) -> None:
- async with aiohttp.ClientSession() as session, session.ws_connect(
- self.url
- ) as ws:
- msg: aiohttp.WSMessage
- async for msg in ws:
- if self.core.should_exit.is_set():
- break
- if msg.type == aiohttp.WSMsgType.ERROR:
- break
- await self.handle_response(msg)
-
- @abstractmethod
- async def handle_response(self, msg: aiohttp.WSMessage) -> None:
- """处理响应。"""
-
-
-class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
- app: web.Application
- runner: web.AppRunner
- site: web.TCPSite
- host: str
- port: int
- get_url: str
- post_url: str
-
- async def startup(self) -> None:
- """初始化适配器。"""
- self.app = web.Application()
- self.app.add_routes(
- [
- web.get(self.get_url, self.handle_response),
- web.post(self.post_url, self.handle_response),
- ]
- )
-
- async def run(self) -> None:
- self.runner = web.AppRunner(self.app)
- await self.runner.setup()
- self.site = web.TCPSite(self.runner, self.host, self.port)
- await self.site.start()
-
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- await self.runner.cleanup()
-
- @abstractmethod
- async def handle_response(self, request: web.Request) -> web.StreamResponse:
- """处理响应。"""
-
-
-class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
- app: web.Application
- runner: web.AppRunner
- site: web.TCPSite
- websocket: web.WebSocketResponse
- host: str
- port: int
- url: str
-
- async def startup(self) -> None:
- self.app = web.Application()
- self.app.add_routes([web.get(self.url, self.handle_response)])
-
- async def run(self) -> None:
- self.runner = web.AppRunner(self.app)
- await self.runner.setup()
- self.site = web.TCPSite(self.runner, self.host, self.port)
- await self.site.start()
-
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- await self.websocket.close()
- await self.site.stop()
- await self.runner.cleanup()
-
- async def handle_response(self, request: web.Request) -> web.WebSocketResponse:
- """处理 WebSocket。"""
- ws = web.WebSocketResponse()
- await ws.prepare(request)
- self.websocket = ws
-
- msg: aiohttp.WSMessage
- async for msg in ws:
- if msg.type == aiohttp.WSMsgType.TEXT:
- await self.handle_ws_response(msg)
- elif msg.type == aiohttp.WSMsgType.ERROR:
- break
-
- return ws
-
- @abstractmethod
- async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None:
- """处理 WebSocket 响应。"""
-
-
-class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta):
- """
- 同时支持 WebSocket 客户端和服务端。
- """
-
- websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = (
- None
- )
-
- # ws
- session: Optional[aiohttp.ClientSession]
-
- # reverse-ws
- app: Optional[web.Application]
- runner: Optional[web.AppRunner]
- site: Optional[web.TCPSite]
-
- # config
- service_type: Literal["ws", "reverse-ws"]
- host: str
- port: int
- url: str
- reconnect_interval: int = 3
-
- async def startup(self) -> None:
- if self.service_type == "ws":
- self.session = aiohttp.ClientSession()
- elif self.service_type == "reverse-ws":
- self.app = web.Application()
- self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)])
- else:
- logger.error(
- 'Config "service_type" must be "ws" or "reverse-ws", not '
- + self.service_type
- )
-
- async def run(self) -> None:
- if self.service_type == "ws":
- while True:
- try:
- await self.websocket_connect()
- except aiohttp.ClientError as e:
- self.core.error_or_exception("WebSocket connection error:", e)
- if self.core.should_exit.is_set():
- break
- await asyncio.sleep(self.reconnect_interval)
- elif self.service_type == "reverse-ws":
- assert self.app is not None
- self.runner = web.AppRunner(self.app)
- await self.runner.setup()
- self.site = web.TCPSite(self.runner, self.host, self.port)
- await self.site.start()
-
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- if self.websocket is not None:
- await self.websocket.close()
- if self.service_type == "ws":
- if self.session is not None:
- await self.session.close()
- elif self.service_type == "reverse-ws":
- if self.site is not None:
- await self.site.stop()
- if self.runner is not None:
- await self.runner.cleanup()
-
- async def handle_reverse_ws_response(
- self, request: web.Request
- ) -> web.WebSocketResponse:
- """处理 aiohttp WebSocket 服务器的接收。"""
- self.websocket = web.WebSocketResponse()
- await self.websocket.prepare(request)
- await self.reverse_ws_connection_hook()
- await self.handle_websocket()
- return self.websocket
-
- async def reverse_ws_connection_hook(self) -> None:
- """反向 WebSocket 连接建立时的钩子函数。"""
- logger.info("WebSocket connected!")
-
- async def websocket_connect(self) -> None:
- """创建正向 WebSocket 连接。"""
- assert self.session is not None
- logger.info("Tying to connect to WebSocket server...")
- async with self.session.ws_connect(
- f"ws://{self.host}:{self.port}{self.url}"
- ) as self.websocket:
- await self.handle_websocket()
-
- async def handle_websocket(self) -> None:
- """处理 WebSocket。"""
- if self.websocket is None or self.websocket.closed:
- return
- async for msg in self.websocket:
- await self.handle_websocket_msg(msg)
- if not self.core.should_exit.is_set():
- logger.warning("WebSocket connection closed!")
-
- @abstractmethod
- async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None:
- """处理 WebSocket 消息。"""
- raise NotImplementedError \ No newline at end of file