aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/infini/core.py
blob: 08b81f15cceb9313e3f64787643b8a28c05c3e7e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from infini.input import Input
from infini.interceptor import Interceptor
from infini.generator import TextGenerator
from infini.handler import Handler
from infini.injector import Injector
from infini.output import Output
from infini.typing import Any, Generator, Union
from infini.exceptions import ValueError


class Core:
    pre_interceptor: Interceptor
    handler: Handler
    generator: TextGenerator
    interceptor: Interceptor
    injector: Injector

    def input(
        self, input: Input
    ) -> Generator[Union[str, Output], Any, None]:  # TODO 支持Workflow
        for pre_intercepted_stream in self.pre_intercept(input):
            if isinstance(pre_intercepted_stream, Output):
                if not isinstance(pre_intercepted_stream, Output):
                    raise ValueError(
                        "Interceptor functions should return or yield a `Output` object."
                    )
                if pre_intercepted_stream.is_empty():
                    return
                yield self.generate(pre_intercepted_stream)
                if pre_intercepted_stream.block:
                    return
            else:
                input = pre_intercepted_stream

        for handled_stream in self.handle(input):
            if not isinstance(handled_stream, Output):
                raise ValueError(
                    "Handler functions should return or yield a `Output` object."
                )
            if handled_stream.is_empty():
                return
            outcome = self.generate(handled_stream)
            for stream in self.intercept(handled_stream, outcome):
                if isinstance(stream, Output):
                    if stream.is_empty():
                        return
                    yield self.generate(stream)
                    if stream.block:
                        return
                    continue
                outcome = stream
            yield outcome
            if handled_stream.block:
                return

    def pre_intercept(self, input: Input) -> Generator[Union[Input, Output], Any, None]:
        return self.pre_interceptor.input(input)

    def handle(self, input: Input) -> Generator[Output, Any, None]:
        iterator = self.handler.input(input)
        for output in iterator:
            yield output

    def generate(self, output: Output) -> str:
        return self.generator.output(output, self.injector)

    def intercept(
        self, output: Output, output_text: str
    ) -> Generator[Union[str, Output], Any, None]:
        return self.interceptor.output(output, output_text)