aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/conventionalrp/core/rules.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/conventionalrp/core/rules.py')
-rw-r--r--src/conventionalrp/core/rules.py238
1 files changed, 238 insertions, 0 deletions
diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py
new file mode 100644
index 0000000..f198d4e
--- /dev/null
+++ b/src/conventionalrp/core/rules.py
@@ -0,0 +1,238 @@
+from typing import Dict, Any, Callable, List, Optional
+from enum import Enum
+import re
+
+
+class RuleCondition(Enum):
+ """规则条件类型"""
+ EQUALS = "equals"
+ CONTAINS = "contains"
+ MATCHES = "matches"
+ STARTS_WITH = "starts_with"
+ ENDS_WITH = "ends_with"
+ IN_LIST = "in_list"
+ GREATER_THAN = "greater_than"
+ LESS_THAN = "less_than"
+
+
+class Rule:
+ def __init__(
+ self,
+ name: str,
+ condition: Dict[str, Any],
+ action: Dict[str, Any],
+ priority: int = 50
+ ):
+ self.name = name
+ self.condition = condition
+ self.action = action
+ self.priority = priority
+ self._compiled_patterns = {}
+
+ self._precompile_patterns()
+
+ def _precompile_patterns(self):
+ """预编译正则表达式以提高性能"""
+ if isinstance(self.condition, dict):
+ for key, value in self.condition.items():
+ if isinstance(value, dict) and value.get("type") == "matches":
+ pattern = value.get("pattern")
+ if pattern:
+ self._compiled_patterns[key] = re.compile(pattern)
+
+ def matches(self, data: Dict[str, Any]) -> bool:
+ """
+ 检查数据是否匹配规则条件
+ """
+ if not isinstance(self.condition, dict):
+ return False
+
+ for field, condition_spec in self.condition.items():
+ if not self._check_field_condition(data, field, condition_spec):
+ return False
+
+ return True
+
+ def _check_field_condition(
+ self,
+ data: Dict[str, Any],
+ field: str,
+ condition: Any
+ ) -> bool:
+ """检查单个字段的条件"""
+ value = data.get(field)
+
+ if not isinstance(condition, dict):
+ return value == condition
+
+ condition_type = condition.get("type")
+ expected_value = condition.get("value")
+
+ if condition_type == "equals":
+ return value == expected_value
+ elif condition_type == "contains":
+ return expected_value in str(value) if value else False
+ elif condition_type == "matches":
+ if field in self._compiled_patterns:
+ pattern = self._compiled_patterns[field]
+ return bool(pattern.search(str(value))) if value else False
+ return False
+ elif condition_type == "starts_with":
+ return str(value).startswith(expected_value) if value else False
+ elif condition_type == "ends_with":
+ return str(value).endswith(expected_value) if value else False
+ elif condition_type == "in_list":
+ return value in expected_value if isinstance(expected_value, list) else False
+ elif condition_type == "greater_than":
+ try:
+ return float(value) > float(expected_value)
+ except (ValueError, TypeError):
+ return False
+ elif condition_type == "less_than":
+ try:
+ return float(value) < float(expected_value)
+ except (ValueError, TypeError):
+ return False
+
+ return False
+
+ def apply(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ 对匹配的数据应用规则动作
+ """
+ result = data.copy()
+
+ if not isinstance(self.action, dict):
+ return result
+
+ action_type = self.action.get("type")
+
+ if action_type == "set_field":
+ field = self.action.get("field")
+ value = self.action.get("value")
+ if field:
+ result[field] = value
+
+ elif action_type == "add_field":
+ field = self.action.get("field")
+ value = self.action.get("value")
+ if field and field not in result:
+ result[field] = value
+
+ elif action_type == "remove_field":
+ field = self.action.get("field")
+ if field and field in result:
+ del result[field]
+
+ elif action_type == "transform":
+ field = self.action.get("field")
+ func_name = self.action.get("function")
+ if field and field in result and func_name:
+ result[field] = self._apply_transform(result[field], func_name)
+
+ elif action_type == "add_tag":
+ tag = self.action.get("tag")
+ if tag:
+ if "tags" not in result:
+ result["tags"] = []
+ if tag not in result["tags"]:
+ result["tags"].append(tag)
+
+ elif action_type == "copy_field":
+ source = self.action.get("source")
+ target = self.action.get("target")
+ if source and target and source in result:
+ result[target] = result[source]
+
+ return result
+
+ def _apply_transform(self, value: Any, func_name: str) -> Any:
+ transforms = {
+ "upper": lambda x: str(x).upper(),
+ "lower": lambda x: str(x).lower(),
+ "strip": lambda x: str(x).strip(),
+ "int": lambda x: int(x),
+ "float": lambda x: float(x),
+ "len": lambda x: len(x) if hasattr(x, '__len__') else 0,
+ }
+
+ func = transforms.get(func_name)
+ if func:
+ try:
+ return func(value)
+ except Exception:
+ return value
+ return value
+
+ def __repr__(self) -> str:
+ return f"Rule(name={self.name}, priority={self.priority})"
+
+
+class RuleEngine:
+ """
+ 规则引擎
+ """
+
+ def __init__(self):
+ self.rules: List[Rule] = []
+ self._sorted = False
+
+ def add_rule(self, rule: Rule):
+ self.rules.append(rule)
+ self._sorted = False
+
+ def add_rule_dict(
+ self,
+ name: str,
+ condition: Dict[str, Any],
+ action: Dict[str, Any],
+ priority: int = 50
+ ):
+ """
+ 从字典添加规则
+ """
+ rule = Rule(name, condition, action, priority)
+ self.add_rule(rule)
+
+ def _ensure_sorted(self):
+ """确保规则按优先级排序"""
+ if not self._sorted:
+ self.rules.sort(key=lambda r: r.priority, reverse=True)
+ self._sorted = True
+
+ def process(
+ self,
+ data: Dict[str, Any],
+ apply_all: bool = False
+ ) -> Dict[str, Any]:
+ self._ensure_sorted()
+ result = data.copy()
+
+ for rule in self.rules:
+ if rule.matches(result):
+ result = rule.apply(result)
+ if not apply_all:
+ break
+
+ return result
+
+ def process_batch(
+ self,
+ data_list: List[Dict[str, Any]],
+ apply_all: bool = False
+ ) -> List[Dict[str, Any]]:
+ return [self.process(data, apply_all) for data in data_list]
+
+ def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]:
+ self._ensure_sorted()
+ return [rule for rule in self.rules if rule.matches(data)]
+
+ def clear_rules(self):
+ self.rules.clear()
+ self._sorted = False
+
+ def rule_count(self) -> int:
+ return len(self.rules)
+
+ def __repr__(self) -> str:
+ return f"RuleEngine(rules={len(self.rules)})"