aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/hrc/service
diff options
context:
space:
mode:
authorHsiangNianian <i@jyunko.cn>2025-01-04 22:38:23 +0800
committerHsiangNianian <i@jyunko.cn>2025-01-04 22:38:23 +0800
commitc990518cb533a793399e44edbb4bc036342c7175 (patch)
tree8e2bd0f833b803a73dea7d88e7c294cf3d078d4d /hrc/service
parentbc57c1410c08323ba37114082d0fe609fafc2c5d (diff)
downloadHydroRollCore-c990518cb533a793399e44edbb4bc036342c7175.tar.gz
HydroRollCore-c990518cb533a793399e44edbb4bc036342c7175.zip
feat(core): Initialize core components and configuration modelsHEADmain
Diffstat (limited to 'hrc/service')
-rw-r--r--hrc/service/__init__.py111
-rw-r--r--hrc/service/console/__init__.py41
-rw-r--r--hrc/service/http/__init__.py33
-rw-r--r--hrc/service/utils.py256
-rw-r--r--hrc/service/websocket/__init__.py30
5 files changed, 0 insertions, 471 deletions
diff --git a/hrc/service/__init__.py b/hrc/service/__init__.py
deleted file mode 100644
index f153d5b..0000000
--- a/hrc/service/__init__.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import os
-from abc import ABC, abstractmethod
-from typing import (
- TYPE_CHECKING,
- Any,
- Awaitable,
- Callable,
- Generic,
- Optional,
- Type,
- TypeVar,
- Union,
- final,
- overload,
-)
-
-from hrc.event import Event
-from hrc.typing import ConfigT, EventT
-from hrc.utils import is_config_class
-
-if TYPE_CHECKING:
- from ..core import Core
-
-__all__ = ["Server"]
-
-if os.getenv("IAMAI_DEV") == "1": # pragma: no cover
- # 当处于开发环境时,使用 pkg_resources 风格的命名空间包
- __import__("pkg_resources").declare_namespace(__name__)
-
-
-_EventT = TypeVar("_EventT", bound="Event[Any]")
-
-
-class Service(Generic[EventT, ConfigT], ABC):
- name: str
- core: "Core"
- Config: Type[ConfigT]
-
- def __init__(self, core: "Core") -> None:
- if not hasattr(self, "name"):
- self.name = self.__class__.__name__
- self.core: Core = core
- self.handle_event = self.core.handle_event
-
- @property
- def config(self) -> ConfigT:
- default: Any = None
- config_class = getattr(self, "Config", None)
- if is_config_class(config_class):
- return getattr(
- self.core.config.service,
- config_class.__config_name__,
- default,
- )
- return default
-
- @final
- async def safe_run(self) -> None:
- try:
- await self.run()
- except Exception as e:
- self.core.error_or_exception(
- f"Run service {self.__class__.__name__} failed:", e
- )
-
- @abstractmethod
- async def run(self) -> None:
- raise NotImplementedError
-
- async def startup(self) -> None:
- ...
-
- async def shutdown(self) -> None:
- ...
-
- @overload
- async def get(
- self,
- func: Optional[Callable[[EventT], Union[bool, Awaitable[bool]]]] = None,
- *,
- event_type: None = None,
- max_try_times: Optional[int] = None,
- timeout: Optional[Union[int, float]] = None,
- ) -> EventT: ...
-
- @overload
- async def get(
- self,
- func: Optional[Callable[[_EventT], Union[bool, Awaitable[bool]]]] = None,
- *,
- event_type: Type[_EventT],
- max_try_times: Optional[int] = None,
- timeout: Optional[Union[int, float]] = None,
- ) -> _EventT: ...
-
- @final
- async def get(
- self,
- func: Optional[Callable[[Any], Union[bool, Awaitable[bool]]]] = None,
- *,
- event_type: Any = None,
- max_try_times: Optional[int] = None,
- timeout: Optional[Union[int, float]] = None,
- ) -> Event[Any]:
- return await self.core.get(
- func,
- event_type=event_type,
- server_type=type(self),
- max_try_times=max_try_times,
- timeout=timeout,
- ) \ No newline at end of file
diff --git a/hrc/service/console/__init__.py b/hrc/service/console/__init__.py
deleted file mode 100644
index e11768e..0000000
--- a/hrc/service/console/__init__.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import asyncio
-import sys
-from typing_extensions import override
-
-from hrc.event import MessageEvent
-from hrc.service import Service
-
-class ConsoleServiceEvent(MessageEvent["ConsoleService"]):
- message: str
-
- @override
- def get_sender_id(self) -> None:
- return None
-
- @override
- def get_plain_text(self) -> str:
- return self.message
-
- @override
- async def reply(self, message: str) -> None:
- return await self.service.send(message)
-
- async def is_same_sender(self) -> bool:
- return True
-
-class ConsoleService(Service[ConsoleServiceEvent, None]):
- name: str = "console"
-
- @override
- async def run(self) -> None:
- while not self.core.should_exit.is_set():
- print("Please input message: ") # noqa: T201
- message = await asyncio.get_event_loop().run_in_executor(
- None, sys.stdin.readline
- )
- await self.handle_event(
- ConsoleServiceEvent(service=self, type="message", message=message, rule="")
- )
-
- async def send(self, message: str) -> None:
- print(f"Send a message: {message}") # noqa: T201 \ No newline at end of file
diff --git a/hrc/service/http/__init__.py b/hrc/service/http/__init__.py
deleted file mode 100644
index a8d938b..0000000
--- a/hrc/service/http/__init__.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from typing_extensions import override
-
-from aiohttp import web
-
-from hrc.service.utils import HttpServerService
-from hrc.event import Event
-from hrc.log import logger
-from aiohttp import web
-
-class HttpServerTestEvent(Event["HttpServerTestService"]):
- """HTTP 服务端示例适配器事件类。"""
-
- message: str
-
-
-class HttpServerTestService(HttpServerService[HttpServerTestEvent, None]):
- name: str = "http_server_service"
- get_url: str = "/"
- post_url: str = "/"
- host: str = "127.0.0.1"
- port: int = 8080
-
-
- @override
- async def handle_response(self, request: web.Request) -> web.StreamResponse:
- event = HttpServerTestEvent(
- service=self,
- type="message",
- rule="",
- message=await request.text(),
- )
- await self.handle_event(event)
- return web.Response() \ No newline at end of file
diff --git a/hrc/service/utils.py b/hrc/service/utils.py
deleted file mode 100644
index 4f29e0c..0000000
--- a/hrc/service/utils.py
+++ /dev/null
@@ -1,256 +0,0 @@
-import asyncio
-from abc import ABCMeta, abstractmethod
-from typing import Literal, Optional, Union
-
-import aiohttp
-from aiohttp import web
-
-from hrc.service import Service
-from hrc.log import logger
-from hrc.typing import ConfigT, EventT
-
-__all__ = [
- "PollingService",
- "HttpClientService",
- "WebSocketClientService",
- "HttpServerService",
- "WebSocketServerService",
- "WebSocketService",
-]
-
-
-class PollingService(Service[EventT, ConfigT], metaclass=ABCMeta):
- """轮询式适配器示例。"""
-
- delay: float = 0.1
- create_task: bool = False
- _on_tick_task: Optional["asyncio.Task[None]"] = None
-
- async def run(self) -> None:
- while not self.core.should_exit.is_set():
- await asyncio.sleep(self.delay)
- if self.create_task:
- self._on_tick_task = asyncio.create_task(self.on_tick())
- else:
- await self.on_tick()
-
- @abstractmethod
- async def on_tick(self) -> None:
- """当轮询发生。"""
-
-
-class HttpClientService(PollingService[EventT, ConfigT], metaclass=ABCMeta):
- session: aiohttp.ClientSession
-
- async def startup(self) -> None:
- self.session = aiohttp.ClientSession()
-
- @abstractmethod
- async def on_tick(self) -> None:
- ...
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- await self.session.close()
-
-
-class WebSocketClientService(Service[EventT, ConfigT], metaclass=ABCMeta):
- url: str
-
- async def run(self) -> None:
- async with aiohttp.ClientSession() as session, session.ws_connect(
- self.url
- ) as ws:
- msg: aiohttp.WSMessage
- async for msg in ws:
- if self.core.should_exit.is_set():
- break
- if msg.type == aiohttp.WSMsgType.ERROR:
- break
- await self.handle_response(msg)
-
- @abstractmethod
- async def handle_response(self, msg: aiohttp.WSMessage) -> None:
- """处理响应。"""
-
-
-class HttpServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
- app: web.Application
- runner: web.AppRunner
- site: web.TCPSite
- host: str
- port: int
- get_url: str
- post_url: str
-
- async def startup(self) -> None:
- """初始化适配器。"""
- self.app = web.Application()
- self.app.add_routes(
- [
- web.get(self.get_url, self.handle_response),
- web.post(self.post_url, self.handle_response),
- ]
- )
-
- async def run(self) -> None:
- self.runner = web.AppRunner(self.app)
- await self.runner.setup()
- self.site = web.TCPSite(self.runner, self.host, self.port)
- await self.site.start()
-
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- await self.runner.cleanup()
-
- @abstractmethod
- async def handle_response(self, request: web.Request) -> web.StreamResponse:
- """处理响应。"""
-
-
-class WebSocketServerService(Service[EventT, ConfigT], metaclass=ABCMeta):
- app: web.Application
- runner: web.AppRunner
- site: web.TCPSite
- websocket: web.WebSocketResponse
- host: str
- port: int
- url: str
-
- async def startup(self) -> None:
- self.app = web.Application()
- self.app.add_routes([web.get(self.url, self.handle_response)])
-
- async def run(self) -> None:
- self.runner = web.AppRunner(self.app)
- await self.runner.setup()
- self.site = web.TCPSite(self.runner, self.host, self.port)
- await self.site.start()
-
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- await self.websocket.close()
- await self.site.stop()
- await self.runner.cleanup()
-
- async def handle_response(self, request: web.Request) -> web.WebSocketResponse:
- """处理 WebSocket。"""
- ws = web.WebSocketResponse()
- await ws.prepare(request)
- self.websocket = ws
-
- msg: aiohttp.WSMessage
- async for msg in ws:
- if msg.type == aiohttp.WSMsgType.TEXT:
- await self.handle_ws_response(msg)
- elif msg.type == aiohttp.WSMsgType.ERROR:
- break
-
- return ws
-
- @abstractmethod
- async def handle_ws_response(self, msg: aiohttp.WSMessage) -> None:
- """处理 WebSocket 响应。"""
-
-
-class WebSocketService(Service[EventT, ConfigT], metaclass=ABCMeta):
- """
- 同时支持 WebSocket 客户端和服务端。
- """
-
- websocket: Union[web.WebSocketResponse, aiohttp.ClientWebSocketResponse, None] = (
- None
- )
-
- # ws
- session: Optional[aiohttp.ClientSession]
-
- # reverse-ws
- app: Optional[web.Application]
- runner: Optional[web.AppRunner]
- site: Optional[web.TCPSite]
-
- # config
- service_type: Literal["ws", "reverse-ws"]
- host: str
- port: int
- url: str
- reconnect_interval: int = 3
-
- async def startup(self) -> None:
- if self.service_type == "ws":
- self.session = aiohttp.ClientSession()
- elif self.service_type == "reverse-ws":
- self.app = web.Application()
- self.app.add_routes([web.get(self.url, self.handle_reverse_ws_response)])
- else:
- logger.error(
- 'Config "service_type" must be "ws" or "reverse-ws", not '
- + self.service_type
- )
-
- async def run(self) -> None:
- if self.service_type == "ws":
- while True:
- try:
- await self.websocket_connect()
- except aiohttp.ClientError as e:
- self.core.error_or_exception("WebSocket connection error:", e)
- if self.core.should_exit.is_set():
- break
- await asyncio.sleep(self.reconnect_interval)
- elif self.service_type == "reverse-ws":
- assert self.app is not None
- self.runner = web.AppRunner(self.app)
- await self.runner.setup()
- self.site = web.TCPSite(self.runner, self.host, self.port)
- await self.site.start()
-
- async def shutdown(self) -> None:
- """关闭并清理连接。"""
- if self.websocket is not None:
- await self.websocket.close()
- if self.service_type == "ws":
- if self.session is not None:
- await self.session.close()
- elif self.service_type == "reverse-ws":
- if self.site is not None:
- await self.site.stop()
- if self.runner is not None:
- await self.runner.cleanup()
-
- async def handle_reverse_ws_response(
- self, request: web.Request
- ) -> web.WebSocketResponse:
- """处理 aiohttp WebSocket 服务器的接收。"""
- self.websocket = web.WebSocketResponse()
- await self.websocket.prepare(request)
- await self.reverse_ws_connection_hook()
- await self.handle_websocket()
- return self.websocket
-
- async def reverse_ws_connection_hook(self) -> None:
- """反向 WebSocket 连接建立时的钩子函数。"""
- logger.info("WebSocket connected!")
-
- async def websocket_connect(self) -> None:
- """创建正向 WebSocket 连接。"""
- assert self.session is not None
- logger.info("Tying to connect to WebSocket server...")
- async with self.session.ws_connect(
- f"ws://{self.host}:{self.port}{self.url}"
- ) as self.websocket:
- await self.handle_websocket()
-
- async def handle_websocket(self) -> None:
- """处理 WebSocket。"""
- if self.websocket is None or self.websocket.closed:
- return
- async for msg in self.websocket:
- await self.handle_websocket_msg(msg)
- if not self.core.should_exit.is_set():
- logger.warning("WebSocket connection closed!")
-
- @abstractmethod
- async def handle_websocket_msg(self, msg: aiohttp.WSMessage) -> None:
- """处理 WebSocket 消息。"""
- raise NotImplementedError \ No newline at end of file
diff --git a/hrc/service/websocket/__init__.py b/hrc/service/websocket/__init__.py
deleted file mode 100644
index 3a7f089..0000000
--- a/hrc/service/websocket/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from typing import Any, Coroutine
-from typing_extensions import override
-from aiohttp import web, ClientWebSocketResponse
-
-from hrc.service.utils import WebSocketService
-from hrc.event import Event
-from hrc.log import logger
-
-from aiohttp import web
-
-class WebSocketTestEvent(Event["WebSocketTestEvent"]):
- message: str
-
-class WebSocketTestService(WebSocketService[WebSocketTestEvent, None]):
- name: str = "websocket_test_service"
- service_type: str = "reverse-ws"
- host: str = "127.0.0.1"
- port: int = 8765
- url: str = "/"
-
- @override
- async def handle_reverse_ws_response(self, request: web.Request) -> Coroutine[Any, Any, ClientWebSocketResponse]:
- event = WebSocketTestEvent(
- service=self,
- type="message",
- message=await request.text()
- )
- logger.info(f"Receive {event}")
- await self.handle_event(event)
- return web.Response() \ No newline at end of file