aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src
diff options
context:
space:
mode:
authorHsiangNianian <i@jyunko.cn>2025-08-02 11:30:27 +0800
committerHsiangNianian <i@jyunko.cn>2025-08-02 11:30:27 +0800
commitec4566c3d17370c778a1e6cd6b22ed85263731a0 (patch)
tree469258084d3ae0e72db37dd98593731ff20131e4 /src
parentc68a18ca6440f6460b3b5fce311901229f8f50a9 (diff)
downloadinfini-ec4566c3d17370c778a1e6cd6b22ed85263731a0.tar.gz
infini-ec4566c3d17370c778a1e6cd6b22ed85263731a0.zip
refactor(v3): publish v3 branch
Diffstat (limited to 'src')
-rw-r--r--src/infini/__init__.py3
-rw-r--r--src/infini/core.py93
-rw-r--r--src/infini/doc.py45
-rw-r--r--src/infini/exceptions.py18
-rw-r--r--src/infini/generator.py68
-rw-r--r--src/infini/handler.py43
-rw-r--r--src/infini/injector.py50
-rw-r--r--src/infini/input.py43
-rw-r--r--src/infini/interceptor.py89
-rw-r--r--src/infini/internal.py43
-rw-r--r--src/infini/loader.py259
-rw-r--r--src/infini/logging.py23
-rw-r--r--src/infini/output.py31
-rw-r--r--src/infini/queue.py47
-rw-r--r--src/infini/register.py206
-rw-r--r--src/infini/router.py60
16 files changed, 3 insertions, 1118 deletions
diff --git a/src/infini/__init__.py b/src/infini/__init__.py
index e69de29b..2cc6f09b 100644
--- a/src/infini/__init__.py
+++ b/src/infini/__init__.py
@@ -0,0 +1,3 @@
+from importlib_metadata import version
+
+__version__ = version("infini")
diff --git a/src/infini/core.py b/src/infini/core.py
deleted file mode 100644
index 5694caff..00000000
--- a/src/infini/core.py
+++ /dev/null
@@ -1,93 +0,0 @@
-from infini.doc import Doc
-from infini.input import Input
-from infini.interceptor import Interceptor
-from infini.generator import Generator
-from infini.handler import Handler
-from infini.injector import Injector
-from infini.output import Output
-from infini.typing import Any, Generator as GeneratorT, Union
-from infini.exceptions import ValueError
-
-
-class Core:
- pre_interceptor: Interceptor
- handler: Handler
- generator: Generator
- interceptor: Interceptor
- injector: Injector
- doc: Doc
-
- def input(self, input: Input) -> GeneratorT[Union[str, Output], Any, None]:
- for pre_intercepted_stream in self.pre_intercept(input):
- if isinstance(pre_intercepted_stream, Output):
- if not isinstance(pre_intercepted_stream, Output):
- raise ValueError(
- "Interceptor functions should return or yield a `Output` object."
- )
- if pre_intercepted_stream.is_empty():
- return
- if pre_intercepted_stream.type == "workflow":
- yield pre_intercepted_stream
- if pre_intercepted_stream.block:
- while pre_intercepted_stream.status != 0:
- pass
- continue
- else:
- yield self.generate(pre_intercepted_stream) # TODO 拦截拦截器文本
-
- if pre_intercepted_stream.block:
- return
- else:
- input = pre_intercepted_stream
-
- for handled_stream in self.handle(input):
- if not isinstance(handled_stream, Output):
- raise ValueError(
- "Handler functions should return or yield a `Output` object."
- )
- if handled_stream.is_empty():
- return
- if handled_stream.type == "workflow":
- yield handled_stream
- if handled_stream.block:
- while handled_stream.status != 0:
- pass
- continue
- else:
- outcome = self.generate(handled_stream)
- for stream in self.intercept(handled_stream, outcome):
- if isinstance(stream, Output):
- if stream.is_empty():
- return
- if stream.type == "workflow":
- yield stream
- if stream.block:
- while stream.status != 0:
- pass
- else:
- yield self.generate(stream)
- if stream.block:
- return
- continue
- outcome = stream
- yield outcome
- if handled_stream.block:
- return
-
- def pre_intercept(
- self, input: Input
- ) -> GeneratorT[Union[Input, Output], Any, None]:
- return self.pre_interceptor.input(input)
-
- def handle(self, input: Input) -> GeneratorT[Output, Any, None]:
- iterator = self.handler.input(input)
- for output in iterator:
- yield output
-
- def generate(self, output: Output) -> str:
- return self.generator.output(output, self.injector)
-
- def intercept(
- self, output: Output, output_text: str
- ) -> GeneratorT[Union[str, Output], Any, None]:
- return self.interceptor.output(output, output_text)
diff --git a/src/infini/doc.py b/src/infini/doc.py
deleted file mode 100644
index 52a85fe2..00000000
--- a/src/infini/doc.py
+++ /dev/null
@@ -1,45 +0,0 @@
-from typing import Dict, Optional, TypedDict
-
-import json
-
-
-class Annotation(TypedDict, total=False):
- usage: Optional[str]
- description: Optional[str]
- content: Optional[str]
- epilog: Optional[str]
- var_doc: Dict[str, str]
- sub_cmd: Dict[str, "Annotation"]
-
-
-class Doc:
- pre_interceptors: Dict[str, Annotation]
- handlers: Dict[str, Annotation]
- events: Dict[str, Annotation]
- global_variables: Dict[str, Annotation]
- interceptors: Dict[str, Annotation]
-
- def __init__(self) -> None:
- self.pre_interceptors = {}
- self.handlers = {}
- self.events = {}
- self.global_variables = {}
- self.interceptors = {}
-
- def update(self, __object: "Doc") -> None:
- self.pre_interceptors.update(__object.pre_interceptors)
- self.handlers.update(__object.handlers)
- self.events.update(__object.events)
- self.global_variables.update(__object.global_variables)
- self.interceptors.update(__object.interceptors)
-
- def dumps(self) -> str:
- return json.dumps(
- {
- "pre_interceptors": self.pre_interceptors,
- "handlers": self.handlers,
- "events": self.events,
- "global_variables": self.global_variables,
- "interceptors": self.interceptors,
- }
- )
diff --git a/src/infini/exceptions.py b/src/infini/exceptions.py
deleted file mode 100644
index ac9c3d22..00000000
--- a/src/infini/exceptions.py
+++ /dev/null
@@ -1,18 +0,0 @@
-class InfiniException(Exception):
- """Infini 异常基类"""
-
-
-class KeyError(InfiniException):
- """键值错误"""
-
-
-class UnknownEvent(InfiniException):
- """文本事件不存在"""
-
-
-class UnknownEventType(InfiniException, TypeError):
- """事件类型不存在"""
-
-
-class ValueError(InfiniException, ValueError):
- """错误的数据"""
diff --git a/src/infini/generator.py b/src/infini/generator.py
deleted file mode 100644
index e2730ca3..00000000
--- a/src/infini/generator.py
+++ /dev/null
@@ -1,68 +0,0 @@
-from infini.output import Output
-from infini.typing import Dict, Callable, Union, Optional
-from infini.exceptions import UnknownEvent, UnknownEventType
-from infini.injector import Injector
-from jinja2 import Template
-
-import abc
-
-
-class BaseGenerator(metaclass=abc.ABCMeta):
- type: str
- events: Dict[str, str]
- global_variables: Dict[str, Union[str, Callable]]
-
- @abc.abstractmethod
- def output(self, output: Output, injector: Injector) -> str:
- raise NotImplementedError
-
- @abc.abstractmethod
- def match(self, output: Output) -> Template:
- raise NotImplementedError
-
-
-class TextGenerator(BaseGenerator):
- type = "text"
-
- def __init__(self) -> None:
- self.events = {}
- self.global_variables = {}
-
- def output(self, output: Output, injector: Injector) -> str:
- assert (
- output.type == self.type
- ), f"文本生成器应当传入类型为 '{self.type}' 的 Output 实例"
- variables = self.global_variables.copy()
- variables.update(output.variables)
- for name, variable in variables.items():
- if callable(variable):
- variables[name] = injector.output(variable, variables)
- return self.match(output).render(variables)
-
- def match(self, output: Output) -> Template:
- if context := self.events.get(output.name):
- return Template(context)
- raise UnknownEvent(f"事件不存在: {output.name}")
-
-
-class Generator:
- generators: Dict[str, BaseGenerator]
- events: Dict[str, str]
- global_variables: Dict[str, Union[str, Callable]]
-
- def __init__(self) -> None:
- self.generators = {"text": TextGenerator()}
-
- def output(self, output: Output, injector: Injector) -> str:
- assert (
- output.type != "workflow"
- ), "生成器应当传入类型为非 'workflow' 的 Output 实例"
- if not (generator := self.match(output)):
- raise UnknownEventType(f"没有为事件类型 '{output.type}' 注册生成器")
-
- generator.events = self.events
- generator.global_variables = self.global_variables
- return generator.output(output, injector)
-
- def match(self, output: Output) -> Optional[BaseGenerator]:
- return self.generators.get(output.type)
diff --git a/src/infini/handler.py b/src/infini/handler.py
deleted file mode 100644
index c0e09532..00000000
--- a/src/infini/handler.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from infini.injector import Injector
-from infini.input import Input
-from infini.output import Output
-from infini.typing import List, Any, RouterType, Callable, Generator, Union
-from infini.queue import EventQueue
-
-
-class Handler:
- handlers: List[RouterType]
-
- def input(self, input: Input):
- injector = Injector()
- parameters = {
- "input": input,
- "plain_text": input.get_plain_text(),
- "user_id": input.get_user_id(),
- }
- if (queue := self.match(injector, **parameters)).is_empty():
- yield Output.empty()
- return
- while not queue.is_empty():
- if isinstance(
- stream := injector.output(queue.pop(), parameters=parameters), Generator
- ):
- for output in stream:
- yield output
- if output.block:
- return
- else:
- yield stream
- if stream.block:
- return
-
- def match(
- self, injector: Injector, **parameters
- ) -> EventQueue[Callable[..., Union[Output, Generator[Output, Any, None]]]]:
- queue = EventQueue()
-
- for handler in self.handlers:
- if injector.output(handler["router"].match, parameters=parameters):
- queue.push(handler["priority"], handler["handler"])
-
- return queue
diff --git a/src/infini/injector.py b/src/infini/injector.py
deleted file mode 100644
index 62032bb5..00000000
--- a/src/infini/injector.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import typing
-from infini.typing import Callable, T, Optional, Dict, Any
-from typing import get_args, get_origin
-
-import inspect
-
-
-class Injector:
- def __init__(self) -> None:
- self.parameters: Dict[str, Any] = {}
-
- def inject(
- self, func: Callable[..., T], parameters: Optional[Dict[str, Any]] = None
- ) -> Callable[[], T]:
- signature = inspect.signature(func)
- _parameters = {} if parameters is None else parameters
- parameters = self.parameters.copy()
- parameters.update(_parameters)
- inject_params = {}
- for param_name, param in signature.parameters.items():
- default = None if param.default == inspect._empty else param.default
- if param_name in parameters:
- origin = get_origin(param.annotation)
- if isinstance(origin, typing._SpecialForm):
- param_types = get_args(param.annotation)
- elif not origin:
- param_types = (param.annotation,)
- else:
- param_types = (origin,)
-
- if not any(
- isinstance(parameters[param_name], param_type)
- for param_type in param_types
- if not isinstance(param_type, typing._SpecialForm)
- ):
- raise ValueError(
- f"Parameter with name '{param_name}' has a mismatch type, "
- f"expected '{param.annotation!r}' but got '{type(parameters[param_name])!r}'."
- )
- inject_params[param_name] = parameters[param_name]
- else:
- inject_params[param_name] = default
- bound_args = signature.bind(**inject_params)
- bound_args.apply_defaults()
- return lambda: func(*bound_args.args, **bound_args.kwargs)
-
- def output(
- self, func: Callable[..., T], parameters: Optional[Dict[str, Any]] = None
- ) -> T:
- return self.inject(func, parameters)()
diff --git a/src/infini/input.py b/src/infini/input.py
deleted file mode 100644
index 2a926e2a..00000000
--- a/src/infini/input.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from infini.typing import Literal, Optional
-from infini.typing import Dict, Any, Generic, T, Optional
-from infini.output import Output
-
-
-class Input(Generic[T]):
- plain_data: T
- variables: Dict[str, Any]
-
- def __init__(
- self, plain_data: T, variables: Optional[Dict[str, Any]] = None
- ) -> None:
- self.plain_data = plain_data
- self.variables = variables or {}
-
- def get_user_id(self) -> str:
- return self.variables.get("user_id", "0")
-
- def get_session_id(self) -> str:
- return (
- self.variables.get("group_id")
- or self.variables.get("session_id")
- or self.variables.get("user_id")
- or "0"
- )
-
- def is_tome(self) -> bool:
- return bool(self.variables.get("is_tome"))
-
- def get_plain_text(self) -> str:
- return str(self.plain_data)
-
- def output(
- self,
- type: Literal["text", "workflow"],
- name: str,
- *,
- block: bool = False,
- variables: Dict[str, Any] = {},
- ):
- vars = self.variables.copy()
- vars.update(variables)
- return Output(type, name, block=block, variables=vars)
diff --git a/src/infini/interceptor.py b/src/infini/interceptor.py
deleted file mode 100644
index c73386ad..00000000
--- a/src/infini/interceptor.py
+++ /dev/null
@@ -1,89 +0,0 @@
-from infini.injector import Injector
-from infini.input import Input
-from infini.output import Output
-from infini.typing import List, Any, RouterType, Callable, Generator, Union
-from infini.queue import EventQueue
-
-
-class Interceptor:
- interceptors: List[RouterType]
-
- def input(self, input: Input) -> Generator[Union[Output, Input], Any, None]:
- injector = Injector()
- parameters = {
- "input": input,
- "plain_text": input.get_plain_text(),
- "user_id": input.get_user_id(),
- }
- queue = self.match(injector, **parameters)
- while not queue.is_empty():
- if isinstance(
- stream := injector.output(queue.pop(), parameters=parameters),
- Generator,
- ):
- for outcome in stream:
- if isinstance(outcome, Input):
- input = outcome
- break
- yield outcome
- if outcome.block:
- return
- else:
- if stream is None:
- yield Output.empty()
- return
- if isinstance(stream, Output):
- yield stream
- if stream.block:
- return
- continue
- input = stream
- yield input
-
- def output(
- self, output: Output, output_text: str
- ) -> Generator[Union[Output, str], Any, None]:
- input = Input(output_text, variables=output.variables)
- injector = Injector()
- parameters = {
- "input": input,
- "output": output,
- "plain_text": output_text,
- "user_id": input.get_user_id(),
- }
- queue = self.match(injector, **parameters)
- while not queue.is_empty():
- if isinstance(
- stream := injector.output(queue.pop(), parameters=parameters), Generator
- ):
- for outcome in stream:
- if isinstance(outcome, Input):
- input = outcome
- break
- yield outcome
- if outcome.block:
- return
- else:
- if stream is None:
- yield Output.empty()
- return
- if isinstance(stream, Output):
- yield stream
- if stream.block:
- return
- continue
- input = stream
- yield input.get_plain_text()
-
- def match(
- self, injector: Injector, **parameters
- ) -> EventQueue[
- Callable[..., Union[Input, Output, Generator[Union[Input, Output], Any, None]]]
- ]:
- queue = EventQueue()
-
- for interceptor in self.interceptors:
- if injector.output(interceptor["router"].match, parameters=parameters):
- queue.push(interceptor["priority"], interceptor["handler"])
-
- return queue
diff --git a/src/infini/internal.py b/src/infini/internal.py
deleted file mode 100644
index d97503f2..00000000
--- a/src/infini/internal.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from infini.core import Core
-from infini.loader import Loader
-from infini.register import Register
-from infini.typing import List, Optional
-from pathlib import Path
-
-import sys
-import inspect
-
-
-def require(name: str, paths: Optional[List] = None) -> Register:
- caller_frame = inspect.stack()[1][0]
- caller_file = caller_frame.f_globals["__file__"]
-
- default_paths = [Path(caller_file).resolve().parent]
- paths = [
- str(path)
- for path in ((list(paths) + default_paths) if paths else default_paths)
- ]
- (sys.path.insert(0, path) for path in paths)
-
- with Loader() as loader:
- loader.load(name)
- sys.path = sys.path[len(paths) - 1 :]
- register = loader.into_register()
-
- caller_frame.f_globals[f"{name}_register"] = register
- return register
-
-
-def acquire_core() -> Core:
- caller_frame = inspect.stack()[1][0]
- caller_name: str = caller_frame.f_globals["__name__"]
- top_name, *_ = caller_name.split(".")
- try:
- core = getattr(sys.modules[top_name], "__infini__")["core"]
- if not isinstance(core, Core):
- raise ValueError("Infini stack returned a mismatch instance.")
- return core
- except Exception as err:
- raise RuntimeError(
- "Infini Runtime is not accessible, perhaps infini core is down."
- ) from err
diff --git a/src/infini/loader.py b/src/infini/loader.py
deleted file mode 100644
index 13c26fdb..00000000
--- a/src/infini/loader.py
+++ /dev/null
@@ -1,259 +0,0 @@
-from importlib.util import spec_from_file_location
-from infini.core import Core
-from infini.doc import Doc
-from infini.generator import BaseGenerator, Generator, TextGenerator
-from infini.handler import Handler
-from infini.injector import Injector
-from infini.interceptor import Interceptor
-from infini.register import Register
-from infini.typing import (
- List,
- Dict,
- Sequence,
- ModuleType,
- RouterType,
- Callable,
- Union,
- Optional,
-)
-from infini.logging import logger
-from pathlib import Path
-
-import inspect
-import sys
-import importlib
-import importlib.abc
-
-
-class InfiniMetaFinder(importlib.abc.MetaPathFinder):
- def find_spec(self, fullname: str, path: Optional[Sequence[str]], target=None):
- default_entries = [
- Path.cwd().joinpath("packages"),
- Path.home().joinpath(".ipm", "src"),
- ] + [Path(path).resolve() for path in sys.path]
-
- entries: List[Path] = (
- [Path(catch_path).resolve() for catch_path in path] + default_entries
- if path
- else default_entries
- )
-
- if "." in fullname:
- *_, name = fullname.split(".")
- else:
- name = fullname
-
- for entry in entries:
- if (entry / name).is_dir():
- filename = entry / name / "src" / "__init__.py"
- submodule_locations = [entry / name / "src", entry / name / "packages"]
- if not filename.exists():
- filename = entry / name / "src" / (name + ".py")
- else:
- continue
- if not filename.exists():
- continue
-
- return spec_from_file_location(
- fullname,
- filename,
- loader=InfiniLoader(str(filename)),
- submodule_search_locations=(
- [
- str(submodule_location)
- for submodule_location in submodule_locations
- ]
- if submodule_locations
- else None
- ),
- )
-
- return None
-
-
-class InfiniLoader(importlib.abc.Loader):
- def __init__(self, filename):
- self.filename = filename
-
- def create_module(self, _):
- return None
-
- def exec_module(self, module):
- exec(Path(self.filename).read_text("utf-8"), vars(module))
-
-
-def _install():
- sys.meta_path.insert(0, InfiniMetaFinder())
-
-
-def _uninstall():
- for meta_path in sys.meta_path:
- if isinstance(meta_path, InfiniMetaFinder):
- sys.meta_path.remove(meta_path)
-
-
-class Loader:
- pre_interceptors: List[RouterType]
- handlers: List[RouterType]
- events: Dict[str, str]
- global_variables: Dict[str, Union[str, Callable]]
- interceptors: List[RouterType]
- generators: Dict[str, BaseGenerator]
- doc: Doc
-
- _core: Core
-
- def __init__(self) -> None:
- self.pre_interceptors = []
- self.handlers = []
- self.events = {}
- self.global_variables = {}
- self.interceptors = []
- self.generators = {}
- self.doc = Doc()
- self.prepare()
-
- def __enter__(self) -> "Loader":
- self.prepare()
- return self
-
- def __exit__(self, exc_type, exc_value, _) -> None:
- self.close()
- if exc_type is not None:
- raise exc_type(exc_value)
-
- def find_register_variables(self, module: ModuleType) -> List[Register]:
- module_variables = inspect.getmembers(module)
- register_variables = [
- var for _, var in module_variables if isinstance(var, Register)
- ]
- return register_variables
-
- def prepare(self) -> None:
- self._core = Core()
- _install()
-
- def load(self, name: str) -> ModuleType:
- self.prepare()
-
- module = importlib.import_module(name)
- self.load_from_module(module)
- return module
-
- def load_from_module(self, module: ModuleType) -> None:
- vars(module)["__infini__"] = {"core": self._core, "loader": self}
- registers = self.find_register_variables(module)
- self.load_from_registers(registers)
- if not registers:
- logger.warning(
- f"Infini 装载器未能在规则包 [bold green]{module.__name__}[/bold green] 中找到注册器."
- )
-
- def load_from_registers(self, registers: Sequence[Register]):
- for register in registers:
- self.load_from_register(register)
-
- def load_from_register(self, register: Register):
- self.pre_interceptors = self._update_list(
- self.pre_interceptors, register.pre_interceptors
- )
- self.handlers = self._update_list(self.handlers, register.handlers)
- self.events.update(register.events)
- self.global_variables.update(register.global_variables)
- self.interceptors = self._update_list(self.interceptors, register.interceptors)
- self.doc.update(register.doc)
-
- def _update_list(self, old_list: List[RouterType], new_list: List[RouterType]):
- list = old_list.copy()
- for value in new_list:
- exists = False
- for old_value in old_list:
- if old_value["router"] == value["router"]:
- if value["priority"] > old_value["priority"]:
- list.remove(old_value)
- else:
- exists = True
- break
- if not exists:
- list.append(value)
- return list
-
- def close(self):
- _uninstall()
-
- def into_core(self) -> Core:
- pre_interceptor = Interceptor()
- handler = Handler()
- generator = TextGenerator()
- interceptor = Interceptor()
- injector = Injector()
- generator = Generator()
-
- self.inject_pre_interceptor(pre_interceptor)
- self.inject_handler(handler)
- self.inject_generator(generator)
- self.inject_interceptor(interceptor)
- self.inject_injector(injector)
- self._core.pre_interceptor = pre_interceptor
- self._core.handler = handler
- self._core.generator = generator
- self._core.interceptor = interceptor
- self._core.injector = injector
-
- self._core.doc = self.doc
-
- return self._core
-
- def inject_register(self, register: Register):
- register.pre_interceptors = self.pre_interceptors
- register.handlers = self.handlers
- register.events = self.events
- register.global_variables = self.global_variables
- register.interceptors = self.interceptors
-
- def into_register(self) -> Register:
- register = Register()
- self.inject_register(register)
- return register
-
- def inject_interceptor(self, interceptor: Interceptor):
- interceptor.interceptors = self.interceptors
-
- def into_interceptor(self) -> Interceptor:
- interceptor = Interceptor()
- self.inject_interceptor(interceptor)
- return interceptor
-
- def inject_handler(self, handler: Handler):
- handler.handlers = self.handlers
-
- def into_handlers(self) -> Handler:
- handler = Handler()
- self.inject_handler(handler)
- return handler
-
- def inject_generator(self, generator: Generator):
- generator.events = self.events
- generator.global_variables = self.global_variables
- generator.generators.update(self.generators)
-
- def into_generator(self) -> Generator:
- generator = Generator()
- self.inject_generator(generator)
- return generator
-
- def inject_pre_interceptor(self, pre_interceptor: Interceptor):
- pre_interceptor.interceptors = self.pre_interceptors
-
- def into_pre_interceptor(self) -> Interceptor:
- pre_interceptor = Interceptor()
- self.inject_pre_interceptor(pre_interceptor)
- return pre_interceptor
-
- def inject_injector(self, injector: Injector):
- injector.parameters = self.global_variables
-
- def into_injector(self) -> Injector:
- injector = Injector()
- self.inject_injector(injector)
- return injector
diff --git a/src/infini/logging.py b/src/infini/logging.py
deleted file mode 100644
index df4f818d..00000000
--- a/src/infini/logging.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from rich.logging import RichHandler
-from rich.console import Console
-
-import logging
-
-console = Console()
-
-
-logging.basicConfig(
- level="INFO",
- format="%(message)s",
- handlers=[
- RichHandler(
- console=console,
- show_time=False,
- show_path=False,
- show_level=True,
- markup=True,
- )
- ],
-)
-
-logger = logging.getLogger("rich")
diff --git a/src/infini/output.py b/src/infini/output.py
deleted file mode 100644
index 6520d9b4..00000000
--- a/src/infini/output.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from infini.typing import Union, Literal, Dict, Any
-
-
-class Output:
- type: Union[Literal["null", "text", "workflow"], str]
- name: str
- status: int
- block: bool
-
- variables: Dict[str, Any]
-
- def __init__(
- self,
- type: Union[Literal["null", "text", "workflow"], str],
- name: str,
- *,
- block: bool = False,
- variables: Dict[str, Any] = {},
- ) -> None:
- self.type = type
- self.name = name
- self.status = 1
- self.block = block
- self.variables = variables
-
- @classmethod
- def empty(cls) -> "Output":
- return cls("null", "null", block=True)
-
- def is_empty(self) -> bool:
- return self.type == "null"
diff --git a/src/infini/queue.py b/src/infini/queue.py
deleted file mode 100644
index cb7c2d4e..00000000
--- a/src/infini/queue.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from infini.exceptions import KeyError
-from infini.typing import Generic, T
-from collections import deque
-
-import heapq
-
-
-class EventQueue(Generic[T]):
- heap: list
- entry_finder: dict
- counter: int
- order_queue: deque
-
- def __init__(self) -> None:
- self.heap = []
- self.entry_finder = {}
- self.counter = 0
- self.order_queue = deque()
-
- def push(self, priority: int, item: T) -> None:
- if item in self.entry_finder:
- self.remove(item)
- entry = [priority, self.counter, item]
- self.counter += 1
- self.entry_finder[item] = entry
- heapq.heappush(self.heap, entry)
- self.order_queue.appendleft(item)
-
- def pop(self) -> T:
- while self.heap:
- _, _, item = heapq.heappop(self.heap)
- if self.entry_finder[item] is not None:
- del self.entry_finder[item]
- self.order_queue.remove(item)
- return item
- raise KeyError("Event queue is empty.")
-
- def remove(self, item) -> None:
- entry = self.entry_finder.pop(item)
- entry[-1] = None
-
- def is_empty(self) -> bool:
- return not bool(self.heap)
-
- def generate(self):
- while not self.is_empty():
- yield self.pop()
diff --git a/src/infini/register.py b/src/infini/register.py
deleted file mode 100644
index a64d8b01..00000000
--- a/src/infini/register.py
+++ /dev/null
@@ -1,206 +0,0 @@
-from infini.doc import Annotation, Doc
-from infini.typing import List, Dict, Any, Callable, RouterType, Optional, Union, Type
-from infini.input import Input
-from infini.output import Output
-from infini.router import Contains, Router
-from infini.generator import BaseGenerator
-from functools import wraps
-
-
-class Register:
- pre_interceptors: List[RouterType]
- handlers: List[RouterType]
- events: Dict[str, str]
- global_variables: Dict[str, Union[str, Callable]]
- interceptors: List[RouterType]
- generators: Dict[str, BaseGenerator]
-
- doc: Doc
-
- def __init__(self) -> None:
- self.pre_interceptors = []
- self.handlers = []
- self.events = {}
- self.global_variables = {}
- self.interceptors = []
- self.generators = {}
- self.doc = Doc()
-
- def pre_interceptor(
- self,
- router: Union[Router, str],
- *,
- priority: int = 0,
- namespace: Optional[str] = None,
- usage: Optional[str] = None,
- description: Optional[str] = None,
- content: Optional[str] = None,
- epilog: Optional[str] = None,
- ):
- """注册一个文本输入拦截器"""
-
- def decorator(func: Callable):
- @wraps(func)
- def wrapper(*args, **kwargs) -> Union[Input, Output]:
- return func(*args, **kwargs)
-
- _router = Contains(router) if isinstance(router, str) else router
- self.pre_interceptors.append(
- {
- "priority": priority,
- "router": _router,
- "handler": wrapper,
- }
- )
- self.doc.pre_interceptors[
- namespace or _router.namespace or func.__name__
- ] = {
- "usage": usage,
- "description": description,
- "content": content or func.__doc__,
- "epilog": epilog,
- }
- return wrapper
-
- return decorator
-
- def handler(
- self,
- router: Union[Router, str],
- *,
- priority: int = 0,
- namespace: Optional[str] = None,
- usage: Optional[str] = None,
- description: Optional[str] = None,
- content: Optional[str] = None,
- epilog: Optional[str] = None,
- sub_cmd: Optional[Dict[str, Annotation]] = None,
- ):
- """注册一个业务函数"""
-
- def decorator(func):
- @wraps(func)
- def wrapper(*args, **kwargs) -> Output:
- return func(*args, **kwargs)
-
- _router = Contains(router) if isinstance(router, str) else router
- self.handlers.append(
- {
- "priority": priority,
- "router": _router,
- "handler": wrapper,
- }
- )
- self.doc.handlers[namespace or _router.namespace or func.__name__] = {
- "usage": usage,
- "description": description,
- "content": content or func.__doc__,
- "epilog": epilog,
- "sub_cmd": sub_cmd or {},
- }
- return wrapper
-
- return decorator
-
- def register_textevent(
- self,
- name: str,
- text: str,
- *,
- description: Optional[str] = None,
- content: Optional[str] = None,
- var_doc: Optional[Dict[str, str]] = None,
- ):
- """注册一个文本事件"""
- self.events[name] = text
- self.doc.events[name] = {
- "description": description,
- "content": content,
- "var_doc": var_doc or {},
- }
-
- def register_variable(
- self,
- name: str,
- data: Any,
- *,
- usage: Optional[str] = None,
- description: Optional[str] = None,
- content: Optional[str] = None,
- ):
- """注册一个静态全局变量"""
- self.global_variables[name] = data
- self.doc.global_variables[name] = {
- "usage": usage,
- "description": description,
- "content": content,
- }
-
- def dynamic_variable(
- self,
- name: Optional[str] = None,
- *,
- usage: Optional[str] = None,
- description: Optional[str] = None,
- content: Optional[str] = None,
- ):
- """注册一个动态全局变量"""
-
- def decorator(func):
- @wraps(func)
- def wrapper(*args, **kwargs) -> str:
- return func(*args, **kwargs)
-
- self.global_variables[name or func.__name__] = wrapper
- self.doc.global_variables[name or func.__name__] = {
- "usage": usage,
- "description": description,
- "content": content or func.__doc__,
- }
- return wrapper
-
- return decorator
-
- def interceptor(
- self,
- router: Union[Router, str],
- *,
- priority: int = 0,
- namespace: Optional[str] = None,
- usage: Optional[str] = None,
- description: Optional[str] = None,
- content: Optional[str] = None,
- epilog: Optional[str] = None,
- ):
- """注册一个产出文本拦截器"""
-
- def decorator(func):
- @wraps(func)
- def wrapper(*args, **kwargs) -> Union[Input, Output]:
- return func(*args, **kwargs)
-
- _router = Contains(router) if isinstance(router, str) else router
- self.interceptors.append(
- {
- "priority": priority,
- "router": _router,
- "handler": wrapper,
- }
- )
- self.doc.interceptors[namespace or _router.namespace or func.__name__] = {
- "usage": usage,
- "description": description,
- "content": content or func.__doc__,
- "epilog": epilog,
- }
- return wrapper
-
- return decorator
-
- def register_generator(
- self, generator: Union[BaseGenerator, Type[BaseGenerator]]
- ) -> None:
- """注册一个生成器"""
- if not isinstance(generator, BaseGenerator):
- generator = generator()
- self.generators[generator.type] = generator
diff --git a/src/infini/router.py b/src/infini/router.py
deleted file mode 100644
index 4e025a15..00000000
--- a/src/infini/router.py
+++ /dev/null
@@ -1,60 +0,0 @@
-from typing import Optional
-from infini.typing import Sequence, Literal, Tuple
-from infini.input import Input
-
-
-class Router:
- type: Literal["text"] = "text"
- signs: Tuple[str, ...]
-
- def __init__(self, sign: str, alias: Sequence[str] = []) -> None:
- signs = {sign}
- signs.update(alias)
- self.signs = tuple(signs)
-
- def __eq__(self, __router: "Router") -> bool:
- return __router.type == self.type and __router.signs == self.signs
-
- def match(self, plain_text: str) -> bool:
- text = plain_text.strip()
- return any([text == sign for sign in self.signs])
-
- @property
- def namespace(self) -> Optional[str]:
- return self.signs[0] if self.signs else None
-
-
-class Startswith(Router):
- name: Literal["startswith"] = "startswith"
-
- def match(self, plain_text: str) -> bool:
- text = plain_text.strip()
- return any([text.startswith(sign) for sign in self.signs])
-
-
-class Contains(Router):
- name: Literal["contains"] = "contains"
-
- def match(self, plain_text: str) -> bool:
- return any([sign in plain_text for sign in self.signs])
-
-
-class Endswith(Router):
- name: Literal["endswith"] = "endswith"
-
- def match(self, input: Input) -> bool:
- text = input.get_plain_text().strip()
- return any([text.endswith(sign) for sign in self.signs])
-
-
-class Command(Router):
- name: Literal["command"] = "command"
- prefix: tuple = (".", "/", "。", "!", "!")
-
- def match(self, input: Input) -> bool:
- text = input.get_plain_text().strip()
- if text.startswith(self.prefix):
- text = text[1:]
- return any([text.startswith(sign) for sign in self.signs])
-
- return False