about | summary | refs | log | tree | commit | diff | stats | homepage
path: root/src/conventionalrp/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/conventionalrp/core')
-rw-r--r-- src/conventionalrp/core/__init__.py | 9
-rw-r--r-- src/conventionalrp/core/processor.py | 150
-rw-r--r-- src/conventionalrp/core/rules.py | 238
3 files changed, 339 insertions, 58 deletions
diff --git a/src/conventionalrp/core/__init__.py b/src/conventionalrp/core/__init__.py
index 91d0f8f..08829b8 100644
--- a/src/conventionalrp/core/__init__.py
+++ b/src/conventionalrp/core/__init__.py
@@ -1,3 +1,6 @@
-"""
-This file initializes the core module of the conventionalrp SDK.
-"""
+from .parser import Parser
+from .processor import Processor
+from .rules import Rule, RuleEngine
+
+__all__ = ["Parser", "Processor", "Rule", "RuleEngine"]
+
diff --git a/src/conventionalrp/core/processor.py b/src/conventionalrp/core/processor.py
index bc74ffb..12ca32b 100644
--- a/src/conventionalrp/core/processor.py
+++ b/src/conventionalrp/core/processor.py
@@ -1,68 +1,104 @@
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Callable
+import logging
+from .rules import RuleEngine, Rule
+
+logger = logging.getLogger(__name__)
class Processor:
- """处理器,用于处理解析后的token"""
-
def __init__(self, rules: Optional[Dict[str, Any]] = None):
- """
- 初始化处理器
-
- Args:
- rules: 处理规则(可选)
- """
self.rules = rules or {}
-
- def process_tokens(self, tokens: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 处理token列表
+ self.rule_engine = RuleEngine()
+ self.custom_processors: List[Callable] = []
+
+ self._load_rules_to_engine()
- Args:
- tokens: 解析后的token列表
+ logger.info("Processor initialized with %d rules",
+ self.rule_engine.rule_count())
+
+ def _load_rules_to_engine(self):
+ if not isinstance(self.rules, dict):
+ return
+
+ rules_list = self.rules.get("rules", [])
+ for rule_dict in rules_list:
+ if not isinstance(rule_dict, dict):
+ continue
- Returns:
- 处理后的数据列表
- """
+ try:
+ self.rule_engine.add_rule_dict(
+ name=rule_dict.get("name", "unnamed"),
+ condition=rule_dict.get("condition", {}),
+ action=rule_dict.get("action", {}),
+ priority=rule_dict.get("priority", 50)
+ )
+ except Exception as e:
+ logger.warning("Failed to load rule: %s", e)
+
+ def add_rule(self, rule: Rule):
+ """Register ``rule`` with the underlying rule engine and log its name."""
+ self.rule_engine.add_rule(rule)
+ logger.debug("Added rule: %s", rule.name)
+
+ def add_processor(self, processor: Callable[[Dict[str, Any]], Dict[str, Any]]):
+ """Append a custom processor callable.
+
+ process_single_token calls each registered processor in registration
+ order with the current token dict and uses its return value as the new
+ token.
+ """
+ self.custom_processors.append(processor)
+ logger.debug("Added custom processor")
+
+ def process_tokens(
+ self,
+ tokens: List[Dict[str, Any]],
+ apply_all_rules: bool = False
+ ) -> List[Dict[str, Any]]:
+ if not tokens:
+ logger.warning("Empty token list provided")
+ return []
+
+ logger.info("Processing %d tokens", len(tokens))
processed_data = []
- for token in tokens:
- processed_token = self.apply_rules(token)
- processed_data.append(processed_token)
- return processed_data
-
- def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]:
- """
- 对单个token应用规则
- Args:
- token: 单个token
-
- Returns:
- 处理后的token
- """
- # 基础实现:直接返回token
- # 可以在此添加更多处理逻辑
+ for i, token in enumerate(tokens):
+ try:
+ processed_token = self.process_single_token(token, apply_all_rules)
+ processed_data.append(processed_token)
+ except Exception as e:
+ logger.error("Error processing token %d: %s", i, e)
+ # 发生错误时保留原始 token
+ processed_data.append(token)
+
+ logger.info("Successfully processed %d tokens", len(processed_data))
+ return processed_data
+
+ def process_single_token(
+ self,
+ token: Dict[str, Any],
+ apply_all_rules: bool = False
+ ) -> Dict[str, Any]:
processed = token.copy()
- # 添加处理时间戳
+ if self.rule_engine.rule_count() > 0:
+ processed = self.rule_engine.process(processed, apply_all_rules)
+
+ for processor in self.custom_processors:
+ try:
+ processed = processor(processed)
+ except Exception as e:
+ logger.error("Custom processor failed: %s", e)
+
if "timestamp" in processed:
processed["processed"] = True
return processed
-
- def generate_output(self, processed_data: List[Dict[str, Any]], format_type: str) -> str:
- """
- 生成指定格式的输出
+
+ def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]:
+ """Process one token via process_single_token with its default arguments."""
+ return self.process_single_token(token)
+
+ def generate_output(
+ self,
+ processed_data: List[Dict[str, Any]],
+ format_type: str
+ ) -> str:
+ logger.info("Generating %s output for %d items",
+ format_type, len(processed_data))
- Args:
- processed_data: 处理后的数据
- format_type: 输出格式 (json/html/markdown)
-
- Returns:
- 格式化后的字符串
-
- Raises:
- ValueError: 不支持的格式类型
- """
if format_type == "json":
return self.generate_json_output(processed_data)
elif format_type == "html":
@@ -71,20 +107,24 @@ class Processor:
return self.generate_markdown_output(processed_data)
else:
raise ValueError(f"Unsupported format type: {format_type}")
-
+
def generate_json_output(self, processed_data: List[Dict[str, Any]]) -> str:
- """生成JSON格式输出"""
import json
return json.dumps(processed_data, ensure_ascii=False, indent=2)
-
+
def generate_html_output(self, processed_data: List[Dict[str, Any]]) -> str:
- """生成HTML格式输出"""
return (
"<html><body>"
+ "".join(f"<p>{data}</p>" for data in processed_data)
+ "</body></html>"
)
-
+
def generate_markdown_output(self, processed_data: List[Dict[str, Any]]) -> str:
- """生成Markdown格式输出"""
return "\n".join(f"- {data}" for data in processed_data)
+
+ def get_statistics(self) -> Dict[str, Any]:
+ """Return a summary of the processor's configuration.
+
+ Keys: "rule_count" (rules held by the rule engine),
+ "custom_processor_count" (registered custom processors) and
+ "has_rules_config" (whether the rules mapping given at construction is
+ non-empty).
+ """
+ return {
+ "rule_count": self.rule_engine.rule_count(),
+ "custom_processor_count": len(self.custom_processors),
+ "has_rules_config": bool(self.rules),
+ }
diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py
new file mode 100644
index 0000000..f198d4e
--- /dev/null
+++ b/src/conventionalrp/core/rules.py
@@ -0,0 +1,238 @@
+from typing import Dict, Any, Callable, List, Optional
+from enum import Enum
+import re
+
+
+class RuleCondition(Enum):
+ """Rule condition types.
+
+ The member values are the same strings that Rule._check_field_condition
+ compares against a condition's "type" entry.
+ NOTE(review): nothing in the code visible here references this enum;
+ Rule compares against the raw strings directly.
+ """
+ EQUALS = "equals"  # field value == expected value
+ CONTAINS = "contains"  # expected value is a substring of str(field value)
+ MATCHES = "matches"  # precompiled regex found via search() in str(field value)
+ STARTS_WITH = "starts_with"  # str(field value) starts with expected value
+ ENDS_WITH = "ends_with"  # str(field value) ends with expected value
+ IN_LIST = "in_list"  # field value is a member of the expected list
+ GREATER_THAN = "greater_than"  # float(field value) > float(expected value)
+ LESS_THAN = "less_than"  # float(field value) < float(expected value)
+
+
+class Rule:
+    """A single condition/action rule applied to token dictionaries.
+
+    ``condition`` maps field names to either a literal (compared with ``==``)
+    or a spec dict ``{"type": ..., "value": ...}``; ``matches`` specs carry
+    their regular expression under ``"pattern"`` instead of ``"value"``.
+    Every field condition must hold for the rule to match, so an empty
+    condition dict matches any data.
+
+    ``action`` is a dict whose ``"type"`` selects one of ``set_field``,
+    ``add_field``, ``remove_field``, ``transform``, ``add_tag`` or
+    ``copy_field``. Unknown action types leave the data unchanged.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        condition: Dict[str, Any],
+        action: Dict[str, Any],
+        priority: int = 50
+    ):
+        """Store the rule definition and precompile its regex conditions.
+
+        Raises:
+            re.error: if a ``matches`` condition holds an invalid pattern.
+        """
+        self.name = name
+        self.condition = condition
+        self.action = action
+        self.priority = priority
+        # field name -> compiled regex for that field's "matches" condition
+        self._compiled_patterns = {}
+
+        self._precompile_patterns()
+
+    def _precompile_patterns(self):
+        """Compile the regex of every ``matches`` condition once, up front."""
+        if not isinstance(self.condition, dict):
+            return
+        for key, value in self.condition.items():
+            if isinstance(value, dict) and value.get("type") == "matches":
+                pattern = value.get("pattern")
+                if pattern:
+                    self._compiled_patterns[key] = re.compile(pattern)
+
+    def matches(self, data: Dict[str, Any]) -> bool:
+        """Return True when ``data`` satisfies every field condition.
+
+        A non-dict condition never matches.
+        """
+        if not isinstance(self.condition, dict):
+            return False
+        return all(
+            self._check_field_condition(data, field, spec)
+            for field, spec in self.condition.items()
+        )
+
+    def _check_field_condition(
+        self,
+        data: Dict[str, Any],
+        field: str,
+        condition: Any
+    ) -> bool:
+        """Evaluate one field's condition against ``data``.
+
+        Unknown condition types evaluate to False.
+        """
+        value = data.get(field)
+
+        # A bare literal is shorthand for an equality test.
+        if not isinstance(condition, dict):
+            return value == condition
+
+        condition_type = condition.get("type")
+        expected_value = condition.get("value")
+
+        if condition_type == "equals":
+            return value == expected_value
+
+        if condition_type == "in_list":
+            return isinstance(expected_value, list) and value in expected_value
+
+        if condition_type in ("greater_than", "less_than"):
+            try:
+                left, right = float(value), float(expected_value)
+            except (ValueError, TypeError):
+                # Non-numeric or missing operands simply do not match.
+                return False
+            return left > right if condition_type == "greater_than" else left < right
+
+        if condition_type in ("contains", "matches", "starts_with", "ends_with"):
+            # Only a missing (None) field is "absent". Falsy-but-present
+            # values such as 0 or "" are compared by their string form.
+            if value is None:
+                return False
+            text = str(value)
+
+            if condition_type == "matches":
+                pattern = self._compiled_patterns.get(field)
+                return pattern is not None and bool(pattern.search(text))
+
+            # A spec without "value" cannot match; previously this raised
+            # TypeError from the str operations below.
+            if expected_value is None:
+                return False
+            if condition_type == "contains":
+                return str(expected_value) in text
+
+            # startswith/endswith accept a tuple of alternatives; accept a
+            # list too and coerce every alternative to str, like ``value``.
+            if isinstance(expected_value, (list, tuple)):
+                affix = tuple(str(item) for item in expected_value)
+            else:
+                affix = str(expected_value)
+            if condition_type == "starts_with":
+                return text.startswith(affix)
+            return text.endswith(affix)
+
+        return False
+
+    def apply(self, data: Dict[str, Any]) -> Dict[str, Any]:
+        """Return a copy of ``data`` with this rule's action applied.
+
+        ``data`` itself is never modified, including the list stored under
+        ``"tags"``.
+        """
+        result = data.copy()
+
+        if not isinstance(self.action, dict):
+            return result
+
+        action_type = self.action.get("type")
+
+        if action_type == "set_field":
+            field = self.action.get("field")
+            value = self.action.get("value")
+            if field:
+                result[field] = value
+
+        elif action_type == "add_field":
+            # Like set_field, but never overwrites an existing field.
+            field = self.action.get("field")
+            value = self.action.get("value")
+            if field and field not in result:
+                result[field] = value
+
+        elif action_type == "remove_field":
+            field = self.action.get("field")
+            if field and field in result:
+                del result[field]
+
+        elif action_type == "transform":
+            field = self.action.get("field")
+            func_name = self.action.get("function")
+            if field and field in result and func_name:
+                result[field] = self._apply_transform(result[field], func_name)
+
+        elif action_type == "add_tag":
+            tag = self.action.get("tag")
+            if tag:
+                # data.copy() is shallow, so appending to the existing list
+                # would also change the caller's data; build a new list.
+                existing = result.get("tags")
+                if existing is None:
+                    tags = []
+                elif isinstance(existing, (list, tuple, set, frozenset)):
+                    tags = list(existing)
+                else:
+                    # A lone non-collection value becomes a one-item list.
+                    tags = [existing]
+                if tag not in tags:
+                    tags.append(tag)
+                result["tags"] = tags
+
+        elif action_type == "copy_field":
+            source = self.action.get("source")
+            target = self.action.get("target")
+            if source and target and source in result:
+                result[target] = result[source]
+
+        return result
+
+    def _apply_transform(self, value: Any, func_name: str) -> Any:
+        """Apply the named transform, returning ``value`` unchanged on failure.
+
+        Unknown transform names also return ``value`` unchanged.
+        """
+        transforms = {
+            "upper": lambda x: str(x).upper(),
+            "lower": lambda x: str(x).lower(),
+            "strip": lambda x: str(x).strip(),
+            "int": lambda x: int(x),
+            "float": lambda x: float(x),
+            "len": lambda x: len(x) if hasattr(x, '__len__') else 0,
+        }
+
+        func = transforms.get(func_name)
+        if func is None:
+            return value
+        try:
+            return func(value)
+        except Exception:
+            # Deliberate best-effort: a value that cannot be converted is
+            # passed through rather than aborting rule processing.
+            return value
+
+    def __repr__(self) -> str:
+        return f"Rule(name={self.name}, priority={self.priority})"
+
+
+class RuleEngine:
+    """
+    Rule engine: holds rules and applies them in descending priority order.
+    """
+
+    def __init__(self):
+        # Rules stay in insertion order until the next lookup sorts them.
+        self.rules: List[Rule] = []
+        self._sorted = False
+
+    def add_rule(self, rule: Rule):
+        """Register ``rule`` and mark the priority ordering as stale."""
+        self._sorted = False
+        self.rules.append(rule)
+
+    def add_rule_dict(
+        self,
+        name: str,
+        condition: Dict[str, Any],
+        action: Dict[str, Any],
+        priority: int = 50
+    ):
+        """
+        Build a Rule from its parts and register it.
+        """
+        self.add_rule(Rule(name, condition, action, priority))
+
+    def _ensure_sorted(self):
+        """Sort the rules in place, highest priority first, if stale."""
+        if self._sorted:
+            return
+        self.rules.sort(key=lambda r: r.priority, reverse=True)
+        self._sorted = True
+
+    def process(
+        self,
+        data: Dict[str, Any],
+        apply_all: bool = False
+    ) -> Dict[str, Any]:
+        """Apply matching rules to a copy of ``data`` and return the copy.
+
+        Rules are tried highest priority first. Only the first matching rule
+        is applied unless ``apply_all`` is true, in which case each later
+        rule is matched against the result produced so far.
+        """
+        self._ensure_sorted()
+        current = data.copy()
+
+        for candidate in self.rules:
+            if not candidate.matches(current):
+                continue
+            current = candidate.apply(current)
+            if not apply_all:
+                break
+
+        return current
+
+    def process_batch(
+        self,
+        data_list: List[Dict[str, Any]],
+        apply_all: bool = False
+    ) -> List[Dict[str, Any]]:
+        """Run ``process`` on every item and return the results in order."""
+        outputs = []
+        for item in data_list:
+            outputs.append(self.process(item, apply_all))
+        return outputs
+
+    def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]:
+        """Return all rules matching ``data``, highest priority first."""
+        self._ensure_sorted()
+        matched = []
+        for candidate in self.rules:
+            if candidate.matches(data):
+                matched.append(candidate)
+        return matched
+
+    def clear_rules(self):
+        """Remove every rule, emptying the existing list in place."""
+        del self.rules[:]
+        self._sorted = False
+
+    def rule_count(self) -> int:
+        """Return the number of registered rules."""
+        return len(self.rules)
+
+    def __repr__(self) -> str:
+        return f"RuleEngine(rules={len(self.rules)})"