1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
from infini.input import Input
from infini.interceptor import Interceptor
from infini.generator import TextGenerator
from infini.handler import Handler
from infini.injector import Injector
from infini.output import Output
from infini.typing import Any, Generator, Union
from infini.exceptions import ValueError
class Core:
    """Run an input through pre-interception, handling, generation and interception.

    The collaborators below are declared as annotations only; this class does
    not assign them, so they must be provided before the methods are called.
    """

    # Consulted first, via ``pre_interceptor.input(input)``.
    pre_interceptor: Interceptor
    # Produces the handled streams, via ``handler.input(input)``.
    handler: Handler
    # Renders an ``Output``, via ``generator.output(output, injector)``.
    generator: TextGenerator
    # Consulted after rendering, via ``interceptor.output(output, text)``.
    interceptor: Interceptor
    # Handed to the generator on every render.
    injector: Injector

    def input(
        self, input: Input
    ) -> Generator[Union[str, Output], Any, None]:  # TODO: support Workflow
        """Process ``input`` and lazily yield the resulting streams.

        Stages, in order:

        1. Pre-interception: every ``Output`` yielded by ``pre_intercept`` is
           rendered with ``generate`` and yielded; any other value replaces the
           input that is later passed to ``handle``.
        2. Handling: every ``Output`` yielded by ``handle`` is rendered and
           offered to ``intercept``. An ``Output`` coming back from the
           interceptor is rendered and yielded on its own; any other value
           replaces the rendered text, which is yielded once the interceptor
           is exhausted.

        An ``Output`` whose ``is_empty()`` is true ends processing immediately.
        An ``Output`` whose ``block`` attribute is truthy ends processing right
        after its stream has been yielded.

        Args:
            input: The incoming input to process.

        Yields:
            The result of ``generate`` for an ``Output``, or the replacement
            value supplied by the interceptor.

        Raises:
            ValueError: (the name imported from ``infini.exceptions``) if
                ``handle`` yields something that is not an ``Output``.
        """
        # Keep the value handed to the handler in its own local so the
        # parameter is not rebound while the pre-interceptor is running.
        handler_input = input

        for pre_stream in self.pre_intercept(input):
            if not isinstance(pre_stream, Output):
                # Not an Output: adopt it, without further validation, as the
                # input for the handling stage.
                handler_input = pre_stream
                continue
            if pre_stream.is_empty():
                return
            yield self.generate(pre_stream)
            if pre_stream.block:
                return

        for handled in self.handle(handler_input):
            if not isinstance(handled, Output):
                raise ValueError(
                    "Handler functions should return or yield a `Output` object."
                )
            if handled.is_empty():
                return

            outcome = self.generate(handled)
            for stream in self.intercept(handled, outcome):
                if isinstance(stream, Output):
                    if stream.is_empty():
                        return
                    yield self.generate(stream)
                    if stream.block:
                        return
                    continue
                # A non-Output value replaces the text emitted for `handled`.
                outcome = stream

            yield outcome
            if handled.block:
                return

    def pre_intercept(self, input: Input) -> Generator[Union[Input, Output], Any, None]:
        """Return what ``self.pre_interceptor.input`` produces for ``input``."""
        return self.pre_interceptor.input(input)

    def handle(self, input: Input) -> Generator[Output, Any, None]:
        """Yield every item that ``self.handler.input`` produces for ``input``."""
        yield from self.handler.input(input)

    def generate(self, output: Output) -> str:
        """Render ``output`` through ``self.generator``, passing ``self.injector``."""
        return self.generator.output(output, self.injector)

    def intercept(
        self, output: Output, output_text: str
    ) -> Generator[Union[str, Output], Any, None]:
        """Return what ``self.interceptor.output`` produces for the rendered output.

        Args:
            output: The handled output that was rendered.
            output_text: The text ``generate`` produced for ``output``.
        """
        return self.interceptor.output(output, output_text)
|