diff options
Diffstat (limited to 'src/conventionalrp/core/rules.py')
| -rw-r--r-- | src/conventionalrp/core/rules.py | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py new file mode 100644 index 0000000..f198d4e --- /dev/null +++ b/src/conventionalrp/core/rules.py @@ -0,0 +1,238 @@ +from typing import Dict, Any, Callable, List, Optional +from enum import Enum +import re + + +class RuleCondition(Enum): + """规则条件类型""" + EQUALS = "equals" + CONTAINS = "contains" + MATCHES = "matches" + STARTS_WITH = "starts_with" + ENDS_WITH = "ends_with" + IN_LIST = "in_list" + GREATER_THAN = "greater_than" + LESS_THAN = "less_than" + + +class Rule: + def __init__( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + self.name = name + self.condition = condition + self.action = action + self.priority = priority + self._compiled_patterns = {} + + self._precompile_patterns() + + def _precompile_patterns(self): + """预编译正则表达式以提高性能""" + if isinstance(self.condition, dict): + for key, value in self.condition.items(): + if isinstance(value, dict) and value.get("type") == "matches": + pattern = value.get("pattern") + if pattern: + self._compiled_patterns[key] = re.compile(pattern) + + def matches(self, data: Dict[str, Any]) -> bool: + """ + 检查数据是否匹配规则条件 + """ + if not isinstance(self.condition, dict): + return False + + for field, condition_spec in self.condition.items(): + if not self._check_field_condition(data, field, condition_spec): + return False + + return True + + def _check_field_condition( + self, + data: Dict[str, Any], + field: str, + condition: Any + ) -> bool: + """检查单个字段的条件""" + value = data.get(field) + + if not isinstance(condition, dict): + return value == condition + + condition_type = condition.get("type") + expected_value = condition.get("value") + + if condition_type == "equals": + return value == expected_value + elif condition_type == "contains": + return expected_value in str(value) if value else False + elif condition_type == "matches": + if field in self._compiled_patterns: + pattern = self._compiled_patterns[field] + return bool(pattern.search(str(value))) if value else False + return False + elif condition_type == "starts_with": + return str(value).startswith(expected_value) if value else False + elif condition_type == "ends_with": + return str(value).endswith(expected_value) if value else False + elif condition_type == "in_list": + return value in expected_value if isinstance(expected_value, list) else False + elif condition_type == "greater_than": + try: + return float(value) > float(expected_value) + except (ValueError, TypeError): + return False + elif condition_type == "less_than": + try: + return float(value) < float(expected_value) + except (ValueError, TypeError): + return False + + return False + + def apply(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + 对匹配的数据应用规则动作 + """ + result = data.copy() + + if not isinstance(self.action, dict): + return result + + action_type = self.action.get("type") + + if action_type == "set_field": + field = self.action.get("field") + value = self.action.get("value") + if field: + result[field] = value + + elif action_type == "add_field": + field = self.action.get("field") + value = self.action.get("value") + if field and field not in result: + result[field] = value + + elif action_type == "remove_field": + field = self.action.get("field") + if field and field in result: + del result[field] + + elif action_type == "transform": + field = self.action.get("field") + func_name = self.action.get("function") + if field and field in result and func_name: + result[field] = self._apply_transform(result[field], func_name) + + elif action_type == "add_tag": + tag = self.action.get("tag") + if tag: + if "tags" not in result: + result["tags"] = [] + if tag not in result["tags"]: + result["tags"].append(tag) + + elif action_type == "copy_field": + source = self.action.get("source") + target = self.action.get("target") + if source and target and source in result: + result[target] = result[source] + + return result + + def _apply_transform(self, value: Any, func_name: str) -> Any: + transforms = { + "upper": lambda x: str(x).upper(), + "lower": lambda x: str(x).lower(), + "strip": lambda x: str(x).strip(), + "int": lambda x: int(x), + "float": lambda x: float(x), + "len": lambda x: len(x) if hasattr(x, '__len__') else 0, + } + + func = transforms.get(func_name) + if func: + try: + return func(value) + except Exception: + return value + return value + + def __repr__(self) -> str: + return f"Rule(name={self.name}, priority={self.priority})" + + +class RuleEngine: + """ + 规则引擎 + """ + + def __init__(self): + self.rules: List[Rule] = [] + self._sorted = False + + def add_rule(self, rule: Rule): + self.rules.append(rule) + self._sorted = False + + def add_rule_dict( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + """ + 从字典添加规则 + """ + rule = Rule(name, condition, action, priority) + self.add_rule(rule) + + def _ensure_sorted(self): + """确保规则按优先级排序""" + if not self._sorted: + self.rules.sort(key=lambda r: r.priority, reverse=True) + self._sorted = True + + def process( + self, + data: Dict[str, Any], + apply_all: bool = False + ) -> Dict[str, Any]: + self._ensure_sorted() + result = data.copy() + + for rule in self.rules: + if rule.matches(result): + result = rule.apply(result) + if not apply_all: + break + + return result + + def process_batch( + self, + data_list: List[Dict[str, Any]], + apply_all: bool = False + ) -> List[Dict[str, Any]]: + return [self.process(data, apply_all) for data in data_list] + + def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]: + self._ensure_sorted() + return [rule for rule in self.rules if rule.matches(data)] + + def clear_rules(self): + self.rules.clear() + self._sorted = False + + def rule_count(self) -> int: + return len(self.rules) + + def __repr__(self) -> str: + return f"RuleEngine(rules={len(self.rules)})" |
