diff options
Diffstat (limited to 'src/conventionalrp/core')
| -rw-r--r-- | src/conventionalrp/core/__init__.py | 9 | ||||
| -rw-r--r-- | src/conventionalrp/core/processor.py | 150 | ||||
| -rw-r--r-- | src/conventionalrp/core/rules.py | 238 |
3 files changed, 339 insertions, 58 deletions
diff --git a/src/conventionalrp/core/__init__.py b/src/conventionalrp/core/__init__.py index 91d0f8f..08829b8 100644 --- a/src/conventionalrp/core/__init__.py +++ b/src/conventionalrp/core/__init__.py @@ -1,3 +1,6 @@ -""" -This file initializes the core module of the conventionalrp SDK. -""" +from .parser import Parser +from .processor import Processor +from .rules import Rule, RuleEngine + +__all__ = ["Parser", "Processor", "Rule", "RuleEngine"] + diff --git a/src/conventionalrp/core/processor.py b/src/conventionalrp/core/processor.py index bc74ffb..12ca32b 100644 --- a/src/conventionalrp/core/processor.py +++ b/src/conventionalrp/core/processor.py @@ -1,68 +1,104 @@ -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Callable +import logging +from .rules import RuleEngine, Rule + +logger = logging.getLogger(__name__) class Processor: - """处理器,用于处理解析后的token""" - def __init__(self, rules: Optional[Dict[str, Any]] = None): - """ - 初始化处理器 - - Args: - rules: 处理规则(可选) - """ self.rules = rules or {} - - def process_tokens(self, tokens: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - 处理token列表 + self.rule_engine = RuleEngine() + self.custom_processors: List[Callable] = [] + + self._load_rules_to_engine() - Args: - tokens: 解析后的token列表 + logger.info("Processor initialized with %d rules", + self.rule_engine.rule_count()) + + def _load_rules_to_engine(self): + if not isinstance(self.rules, dict): + return + + rules_list = self.rules.get("rules", []) + for rule_dict in rules_list: + if not isinstance(rule_dict, dict): + continue - Returns: - 处理后的数据列表 - """ + try: + self.rule_engine.add_rule_dict( + name=rule_dict.get("name", "unnamed"), + condition=rule_dict.get("condition", {}), + action=rule_dict.get("action", {}), + priority=rule_dict.get("priority", 50) + ) + except Exception as e: + logger.warning("Failed to load rule: %s", e) + + def add_rule(self, rule: Rule): + self.rule_engine.add_rule(rule) + logger.debug("Added rule: %s", rule.name) + + def add_processor(self, processor: Callable[[Dict[str, Any]], Dict[str, Any]]): + self.custom_processors.append(processor) + logger.debug("Added custom processor") + + def process_tokens( + self, + tokens: List[Dict[str, Any]], + apply_all_rules: bool = False + ) -> List[Dict[str, Any]]: + if not tokens: + logger.warning("Empty token list provided") + return [] + + logger.info("Processing %d tokens", len(tokens)) processed_data = [] - for token in tokens: - processed_token = self.apply_rules(token) - processed_data.append(processed_token) - return processed_data - - def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]: - """ - 对单个token应用规则 - Args: - token: 单个token - - Returns: - 处理后的token - """ - # 基础实现:直接返回token - # 可以在此添加更多处理逻辑 + for i, token in enumerate(tokens): + try: + processed_token = self.process_single_token(token, apply_all_rules) + processed_data.append(processed_token) + except Exception as e: + logger.error("Error processing token %d: %s", i, e) + # 发生错误时保留原始 token + processed_data.append(token) + + logger.info("Successfully processed %d tokens", len(processed_data)) + return processed_data + + def process_single_token( + self, + token: Dict[str, Any], + apply_all_rules: bool = False + ) -> Dict[str, Any]: processed = token.copy() - # 添加处理时间戳 + if self.rule_engine.rule_count() > 0: + processed = self.rule_engine.process(processed, apply_all_rules) + + for processor in self.custom_processors: + try: + processed = processor(processed) + except Exception as e: + logger.error("Custom processor failed: %s", e) + if "timestamp" in processed: processed["processed"] = True return processed - - def generate_output(self, processed_data: List[Dict[str, Any]], format_type: str) -> str: - """ - 生成指定格式的输出 + + def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]: + return self.process_single_token(token) + + def generate_output( + self, + processed_data: List[Dict[str, Any]], + format_type: str + ) -> str: + logger.info("Generating %s output for %d items", + format_type, len(processed_data)) - Args: - processed_data: 处理后的数据 - format_type: 输出格式 (json/html/markdown) - - Returns: - 格式化后的字符串 - - Raises: - ValueError: 不支持的格式类型 - """ if format_type == "json": return self.generate_json_output(processed_data) elif format_type == "html": @@ -71,20 +107,24 @@ class Processor: return self.generate_markdown_output(processed_data) else: raise ValueError(f"Unsupported format type: {format_type}") - + def generate_json_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成JSON格式输出""" import json return json.dumps(processed_data, ensure_ascii=False, indent=2) - + def generate_html_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成HTML格式输出""" return ( "<html><body>" + "".join(f"<p>{data}</p>" for data in processed_data) + "</body></html>" ) - + def generate_markdown_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成Markdown格式输出""" return "\n".join(f"- {data}" for data in processed_data) + + def get_statistics(self) -> Dict[str, Any]: + return { + "rule_count": self.rule_engine.rule_count(), + "custom_processor_count": len(self.custom_processors), + "has_rules_config": bool(self.rules), + } diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py new file mode 100644 index 0000000..f198d4e --- /dev/null +++ b/src/conventionalrp/core/rules.py @@ -0,0 +1,238 @@ +from typing import Dict, Any, Callable, List, Optional +from enum import Enum +import re + + +class RuleCondition(Enum): + """规则条件类型""" + EQUALS = "equals" + CONTAINS = "contains" + MATCHES = "matches" + STARTS_WITH = "starts_with" + ENDS_WITH = "ends_with" + IN_LIST = "in_list" + GREATER_THAN = "greater_than" + LESS_THAN = "less_than" + + +class Rule: + def __init__( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + self.name = name + self.condition = condition + self.action = action + self.priority = priority + self._compiled_patterns = {} + + self._precompile_patterns() + + def _precompile_patterns(self): + """预编译正则表达式以提高性能""" + if isinstance(self.condition, dict): + for key, value in self.condition.items(): + if isinstance(value, dict) and value.get("type") == "matches": + pattern = value.get("pattern") + if pattern: + self._compiled_patterns[key] = re.compile(pattern) + + def matches(self, data: Dict[str, Any]) -> bool: + """ + 检查数据是否匹配规则条件 + """ + if not isinstance(self.condition, dict): + return False + + for field, condition_spec in self.condition.items(): + if not self._check_field_condition(data, field, condition_spec): + return False + + return True + + def _check_field_condition( + self, + data: Dict[str, Any], + field: str, + condition: Any + ) -> bool: + """检查单个字段的条件""" + value = data.get(field) + + if not isinstance(condition, dict): + return value == condition + + condition_type = condition.get("type") + expected_value = condition.get("value") + + if condition_type == "equals": + return value == expected_value + elif condition_type == "contains": + return expected_value in str(value) if value else False + elif condition_type == "matches": + if field in self._compiled_patterns: + pattern = self._compiled_patterns[field] + return bool(pattern.search(str(value))) if value else False + return False + elif condition_type == "starts_with": + return str(value).startswith(expected_value) if value else False + elif condition_type == "ends_with": + return str(value).endswith(expected_value) if value else False + elif condition_type == "in_list": + return value in expected_value if isinstance(expected_value, list) else False + elif condition_type == "greater_than": + try: + return float(value) > float(expected_value) + except (ValueError, TypeError): + return False + elif condition_type == "less_than": + try: + return float(value) < float(expected_value) + except (ValueError, TypeError): + return False + + return False + + def apply(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + 对匹配的数据应用规则动作 + """ + result = data.copy() + + if not isinstance(self.action, dict): + return result + + action_type = self.action.get("type") + + if action_type == "set_field": + field = self.action.get("field") + value = self.action.get("value") + if field: + result[field] = value + + elif action_type == "add_field": + field = self.action.get("field") + value = self.action.get("value") + if field and field not in result: + result[field] = value + + elif action_type == "remove_field": + field = self.action.get("field") + if field and field in result: + del result[field] + + elif action_type == "transform": + field = self.action.get("field") + func_name = self.action.get("function") + if field and field in result and func_name: + result[field] = self._apply_transform(result[field], func_name) + + elif action_type == "add_tag": + tag = self.action.get("tag") + if tag: + if "tags" not in result: + result["tags"] = [] + if tag not in result["tags"]: + result["tags"].append(tag) + + elif action_type == "copy_field": + source = self.action.get("source") + target = self.action.get("target") + if source and target and source in result: + result[target] = result[source] + + return result + + def _apply_transform(self, value: Any, func_name: str) -> Any: + transforms = { + "upper": lambda x: str(x).upper(), + "lower": lambda x: str(x).lower(), + "strip": lambda x: str(x).strip(), + "int": lambda x: int(x), + "float": lambda x: float(x), + "len": lambda x: len(x) if hasattr(x, '__len__') else 0, + } + + func = transforms.get(func_name) + if func: + try: + return func(value) + except Exception: + return value + return value + + def __repr__(self) -> str: + return f"Rule(name={self.name}, priority={self.priority})" + + +class RuleEngine: + """ + 规则引擎 + """ + + def __init__(self): + self.rules: List[Rule] = [] + self._sorted = False + + def add_rule(self, rule: Rule): + self.rules.append(rule) + self._sorted = False + + def add_rule_dict( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + """ + 从字典添加规则 + """ + rule = Rule(name, condition, action, priority) + self.add_rule(rule) + + def _ensure_sorted(self): + """确保规则按优先级排序""" + if not self._sorted: + self.rules.sort(key=lambda r: r.priority, reverse=True) + self._sorted = True + + def process( + self, + data: Dict[str, Any], + apply_all: bool = False + ) -> Dict[str, Any]: + self._ensure_sorted() + result = data.copy() + + for rule in self.rules: + if rule.matches(result): + result = rule.apply(result) + if not apply_all: + break + + return result + + def process_batch( + self, + data_list: List[Dict[str, Any]], + apply_all: bool = False + ) -> List[Dict[str, Any]]: + return [self.process(data, apply_all) for data in data_list] + + def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]: + self._ensure_sorted() + return [rule for rule in self.rules if rule.matches(data)] + + def clear_rules(self): + self.rules.clear() + self._sorted = False + + def rule_count(self) -> int: + return len(self.rules) + + def __repr__(self) -> str: + return f"RuleEngine(rules={len(self.rules)})" |
