# Source blob: 18dab9d53af8edc09b4afa6eaac0f531b69494c0
from infini.core import Core
from infini.generator import Generator
from infini.handler import Handler
from infini.injector import Injector
from infini.input import Input
from infini.interceptor import Interceptor
from infini.output import Output
from infini.router import Startswith
def test_workflow():
    """Run one input through a hand-wired ``Core`` and check the workflow output.

    A single handler is registered that yields a blocking output of type
    "workflow" for the event "test.workflow". Every object produced by
    ``core.input`` must be an ``Output`` whose ``type`` is "workflow".
    """

    def func_workflow(input: Input):
        # The only handler in this test: emit one blocking workflow output.
        yield input.output("workflow", "test.workflow", block=True)

    message = Input("testmsg")

    handler = Handler()
    catch_all_route = {
        "priority": 0,
        # Empty prefix -- presumably matches any input text; confirm against
        # the router implementation.
        "router": Startswith(""),
        "handler": func_workflow,
    }
    handler.handlers = [catch_all_route]

    # No interceptors are registered; the same empty instance is used below
    # for both the pre- and post-handler slots.
    interceptor = Interceptor()
    interceptor.interceptors = []

    generator = Generator()
    # Note that "test.workflow" is deliberately or incidentally absent here.
    event_templates = {
        "test.cmd": "cmd",
        "test.add": "{{ result }}",
        "block.jianlvchun": "检测到违禁词",
    }
    generator.events = event_templates
    generator.global_variables = {}

    core = Core()
    core.handler = handler
    core.interceptor = interceptor
    core.pre_interceptor = interceptor
    core.generator = generator
    core.injector = Injector()

    for produced in core.input(message):
        assert isinstance(produced, Output)
        assert produced.type == "workflow"
        # NOTE(review): status is reset on each yielded output; presumably
        # this marks the workflow as finished for the core -- confirm.
        produced.status = 0
|