1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
import html
import json
import logging
from typing import Any, Callable, Dict, List, Optional

from .rules import Rule, RuleEngine
logger = logging.getLogger(__name__)
class Processor:
def __init__(self, rules: Optional[Dict[str, Any]] = None):
self.rules = rules or {}
self.rule_engine = RuleEngine()
self.custom_processors: List[Callable] = []
self._load_rules_to_engine()
logger.info("Processor initialized with %d rules",
self.rule_engine.rule_count())
def _load_rules_to_engine(self):
if not isinstance(self.rules, dict):
return
rules_list = self.rules.get("rules", [])
for rule_dict in rules_list:
if not isinstance(rule_dict, dict):
continue
try:
self.rule_engine.add_rule_dict(
name=rule_dict.get("name", "unnamed"),
condition=rule_dict.get("condition", {}),
action=rule_dict.get("action", {}),
priority=rule_dict.get("priority", 50)
)
except Exception as e:
logger.warning("Failed to load rule: %s", e)
def add_rule(self, rule: Rule):
self.rule_engine.add_rule(rule)
logger.debug("Added rule: %s", rule.name)
def add_processor(self, processor: Callable[[Dict[str, Any]], Dict[str, Any]]):
self.custom_processors.append(processor)
logger.debug("Added custom processor")
def process_tokens(
self,
tokens: List[Dict[str, Any]],
apply_all_rules: bool = False
) -> List[Dict[str, Any]]:
if not tokens:
logger.warning("Empty token list provided")
return []
logger.info("Processing %d tokens", len(tokens))
processed_data = []
for i, token in enumerate(tokens):
try:
processed_token = self.process_single_token(token, apply_all_rules)
processed_data.append(processed_token)
except Exception as e:
logger.error("Error processing token %d: %s", i, e)
# 发生错误时保留原始 token
processed_data.append(token)
logger.info("Successfully processed %d tokens", len(processed_data))
return processed_data
def process_single_token(
self,
token: Dict[str, Any],
apply_all_rules: bool = False
) -> Dict[str, Any]:
processed = token.copy()
if self.rule_engine.rule_count() > 0:
processed = self.rule_engine.process(processed, apply_all_rules)
for processor in self.custom_processors:
try:
processed = processor(processed)
except Exception as e:
logger.error("Custom processor failed: %s", e)
if "timestamp" in processed:
processed["processed"] = True
return processed
def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]:
return self.process_single_token(token)
def generate_output(
self,
processed_data: List[Dict[str, Any]],
format_type: str
) -> str:
logger.info("Generating %s output for %d items",
format_type, len(processed_data))
if format_type == "json":
return self.generate_json_output(processed_data)
elif format_type == "html":
return self.generate_html_output(processed_data)
elif format_type == "markdown":
return self.generate_markdown_output(processed_data)
else:
raise ValueError(f"Unsupported format type: {format_type}")
def generate_json_output(self, processed_data: List[Dict[str, Any]]) -> str:
import json
return json.dumps(processed_data, ensure_ascii=False, indent=2)
def generate_html_output(self, processed_data: List[Dict[str, Any]]) -> str:
return (
"<html><body>"
+ "".join(f"<p>{data}</p>" for data in processed_data)
+ "</body></html>"
)
def generate_markdown_output(self, processed_data: List[Dict[str, Any]]) -> str:
return "\n".join(f"- {data}" for data in processed_data)
def get_statistics(self) -> Dict[str, Any]:
return {
"rule_count": self.rule_engine.rule_count(),
"custom_processor_count": len(self.custom_processors),
"has_rules_config": bool(self.rules),
}
|