aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/conventionalrp
diff options
context:
space:
mode:
author简律纯 <i@jyunko.cn>2025-10-25 00:30:48 +0800
committer简律纯 <i@jyunko.cn>2025-10-25 00:30:48 +0800
commitcbc653ffd0ea9abf4360623dc7a7651e1a49cc61 (patch)
treeea3c396148158077bae3e77eaa9341f8c1990636 /src/conventionalrp
parent08299b37dfda86e56e4f2b442f68ccd2da7a82e3 (diff)
downloadconventional_role_play-cbc653ffd0ea9abf4360623dc7a7651e1a49cc61.tar.gz
conventional_role_play-cbc653ffd0ea9abf4360623dc7a7651e1a49cc61.zip
feat: Implement plugin system with combat tracker and dice analyzer
- Added `plugin_system_demo.py` to demonstrate basic plugin usage, processing, and analysis. - Created `CombatTrackerPlugin` for tracking combat statistics including damage and healing. - Developed `DiceAnalyzerPlugin` for analyzing dice rolls and calculating success rates. - Introduced `renderer_demo.py` for rendering output in HTML, Markdown, and JSON formats. - Implemented `rule_system_demo.py` to showcase rule engine capabilities with various examples. - Established core rule engine functionality in `rules.py` with support for conditions and actions. - Enhanced base plugin structure in `base.py` to support different plugin types (Processor, Renderer, Analyzer). - Added custom exception handling in `exceptions.py` for better error management. - Configured logging setup in `logging_config.py` for improved logging capabilities. - Created unit tests in `test_rust_core.py` to validate core functionalities and performance.
Diffstat (limited to 'src/conventionalrp')
-rw-r--r--src/conventionalrp/__init__.py36
-rw-r--r--src/conventionalrp/__main__.py9
-rw-r--r--src/conventionalrp/_core.pyi238
-rw-r--r--src/conventionalrp/base.py0
-rw-r--r--src/conventionalrp/core/__init__.py9
-rw-r--r--src/conventionalrp/core/processor.py150
-rw-r--r--src/conventionalrp/core/rules.py238
-rw-r--r--src/conventionalrp/extractors/__init__.py3
-rw-r--r--src/conventionalrp/extractors/rule_extractor.py25
-rw-r--r--src/conventionalrp/html_renderer.py0
-rw-r--r--src/conventionalrp/json_renderer.py0
-rw-r--r--src/conventionalrp/markdown_renderer.py0
-rw-r--r--src/conventionalrp/plugins/__init__.py20
-rw-r--r--src/conventionalrp/plugins/base.py103
-rw-r--r--src/conventionalrp/plugins/plugin_manager.py209
-rw-r--r--src/conventionalrp/renderers/__init__.py3
-rw-r--r--src/conventionalrp/renderers/html_renderer.py433
-rw-r--r--src/conventionalrp/renderers/json_renderer.py77
-rw-r--r--src/conventionalrp/renderers/markdown_renderer.py176
-rw-r--r--src/conventionalrp/tokenizer.py0
-rw-r--r--src/conventionalrp/utils/__init__.py33
-rw-r--r--src/conventionalrp/utils/exceptions.py104
-rw-r--r--src/conventionalrp/utils/logging_config.py77
-rw-r--r--src/conventionalrp/utils/text_processing.py0
24 files changed, 1766 insertions, 177 deletions
diff --git a/src/conventionalrp/__init__.py b/src/conventionalrp/__init__.py
index 06dbd63..ab4b17d 100644
--- a/src/conventionalrp/__init__.py
+++ b/src/conventionalrp/__init__.py
@@ -1,15 +1,45 @@
+"""
+Conventional Role Play SDK (ConventionalRP)
+"""
+
import sys
from importlib.metadata import version
from . import _core
+from .core import Parser, Processor, Rule, RuleEngine
+from .utils import (
+ setup_logging,
+ get_logger,
+ ConventionalRPError,
+ ParserError,
+ RuleError,
+ ProcessorError,
+ ValidationError,
+ ConfigurationError,
+)
-__all__ = ["_core", "__version__"]
+__all__ = [
+ "Parser",
+ "Processor",
+ "Rule",
+ "RuleEngine",
+ "setup_logging",
+ "get_logger",
+ "ConventionalRPError",
+ "ParserError",
+ "RuleError",
+ "ProcessorError",
+ "ValidationError",
+ "ConfigurationError",
+ "__version__",
+]
if sys.version_info >= (3, 8):
- # For Python 3.8+
__version__ = version("conventionalrp")
elif sys.version_info < (3, 8):
from pkg_resources import get_distribution
- # For Python < 3.8
__version__ = get_distribution("conventionalrp").version
+
+_default_logger = setup_logging(level="INFO")
+
diff --git a/src/conventionalrp/__main__.py b/src/conventionalrp/__main__.py
index fa4f45e..e69de29 100644
--- a/src/conventionalrp/__main__.py
+++ b/src/conventionalrp/__main__.py
@@ -1,9 +0,0 @@
-from ._core import sum_as_string
-
-
-def main():
- print(sum_as_string(1, 2))
-
-
-if __name__ == "__main__":
- main()
diff --git a/src/conventionalrp/_core.pyi b/src/conventionalrp/_core.pyi
index 0805887..6f87269 100644
--- a/src/conventionalrp/_core.pyi
+++ b/src/conventionalrp/_core.pyi
@@ -1,3 +1,237 @@
-def sum_as_string(a: int, b: int) -> str: ...
+"""
+提供高性能的文本解析和匹配功能
+"""
-class Base: ...
+from typing import Dict, List, Optional, Tuple
+
+class Base:
+ """基础类(向后兼容)"""
+ ...
+
+class Token:
+ """
+ 解析后的 Token
+
+ 表示文本中的一个语义单元,包含类型、内容和元数据
+ """
+
+ token_type: str
+ content: str
+ metadata: Dict[str, str]
+
+ def __init__(self, token_type: str, content: str) -> None:
+ """
+ 创建新的 Token
+
+ Args:
+ token_type: Token 类型
+ content: Token 内容
+ """
+ ...
+
+ def add_metadata(self, key: str, value: str) -> None:
+ """
+ 添加元数据
+
+ Args:
+ key: 元数据键
+ value: 元数据值
+ """
+ ...
+
+ def get_metadata(self, key: str) -> Optional[str]:
+ """
+ 获取元数据
+
+ Args:
+ key: 元数据键
+
+ Returns:
+ 元数据值,如果不存在返回 None
+ """
+ ...
+
+ def to_dict(self) -> Dict[str, any]:
+ """
+ 转换为 Python 字典
+
+ Returns:
+ 包含 Token 所有信息的字典
+ """
+ ...
+
+ def __repr__(self) -> str: ...
+
+class RegexRule:
+ """
+ 正则表达式规则
+
+ 用于文本匹配和提取
+ """
+
+ rule_type: str
+ priority: int
+
+ def __init__(self, pattern: str, rule_type: str, priority: int) -> None:
+ """
+ 创建正则规则
+
+ Args:
+ pattern: 正则表达式模式
+ rule_type: 规则类型
+ priority: 优先级(数字越大优先级越高)
+
+ Raises:
+ ValueError: 如果正则表达式无效
+ """
+ ...
+
+ def matches(self, text: str) -> bool:
+ """
+ 测试文本是否匹配
+
+ Args:
+ text: 待测试的文本
+
+ Returns:
+ 是否匹配
+ """
+ ...
+
+ def extract(self, text: str) -> Optional[List[str]]:
+ """
+ 提取匹配的捕获组
+
+ Args:
+ text: 待提取的文本
+
+ Returns:
+ 捕获组列表,如果不匹配返回 None
+ """
+ ...
+
+ def find_all(self, text: str) -> List[Tuple[int, int, str]]:
+ """
+ 查找所有匹配
+
+ Args:
+ text: 待搜索的文本
+
+ Returns:
+ 列表,每个元素为 (start, end, matched_text)
+ """
+ ...
+
+class TextParser:
+ """
+ 高性能文本解析器
+
+ 支持多规则、优先级排序的文本解析
+ """
+
+ def __init__(self) -> None:
+ """创建新的解析器"""
+ ...
+
+ def add_rule(self, pattern: str, rule_type: str, priority: int) -> None:
+ """
+ 添加解析规则
+
+ Args:
+ pattern: 正则表达式模式
+ rule_type: 规则类型
+ priority: 优先级
+
+ Raises:
+ ValueError: 如果正则表达式无效
+ """
+ ...
+
+ def parse_line(self, text: str) -> List[Tuple[str, str, int, int]]:
+ """
+ 解析单行文本
+
+ Args:
+ text: 待解析的文本
+
+ Returns:
+ 列表,每个元素为 (type, content, start, end)
+ """
+ ...
+
+ def parse_lines(self, lines: List[str]) -> List[List[Dict[str, any]]]:
+ """
+ 批量解析多行文本
+
+ Args:
+ lines: 文本行列表
+
+ Returns:
+ 每行的解析结果列表
+ """
+ ...
+
+ def clear_rules(self) -> None:
+ """清除所有规则"""
+ ...
+
+ def rule_count(self) -> int:
+ """
+ 获取规则数量
+
+ Returns:
+ 当前规则数量
+ """
+ ...
+
+class FastMatcher:
+ """
+ 快速字符串匹配器
+
+ 用于高效的多模式字符串匹配
+ """
+
+ def __init__(self, patterns: List[str]) -> None:
+ """
+ 创建匹配器
+
+ Args:
+ patterns: 模式列表
+ """
+ ...
+
+ def contains_any(self, text: str) -> bool:
+ """
+ 检查文本是否包含任意模式
+
+ Args:
+ text: 待检查的文本
+
+ Returns:
+ 是否包含任意模式
+ """
+ ...
+
+ def find_matches(self, text: str) -> List[str]:
+ """
+ 查找所有匹配的模式
+
+ Args:
+ text: 待搜索的文本
+
+ Returns:
+ 匹配的模式列表
+ """
+ ...
+
+ def count_matches(self, text: str) -> Dict[str, int]:
+ """
+ 统计每个模式的出现次数
+
+ Args:
+ text: 待统计的文本
+
+ Returns:
+ 字典,键为模式,值为出现次数
+ """
+ ...
diff --git a/src/conventionalrp/base.py b/src/conventionalrp/base.py
deleted file mode 100644
index e69de29..0000000
--- a/src/conventionalrp/base.py
+++ /dev/null
diff --git a/src/conventionalrp/core/__init__.py b/src/conventionalrp/core/__init__.py
index 91d0f8f..08829b8 100644
--- a/src/conventionalrp/core/__init__.py
+++ b/src/conventionalrp/core/__init__.py
@@ -1,3 +1,6 @@
-"""
-This file initializes the core module of the conventionalrp SDK.
-"""
+from .parser import Parser
+from .processor import Processor
+from .rules import Rule, RuleEngine
+
+__all__ = ["Parser", "Processor", "Rule", "RuleEngine"]
+
diff --git a/src/conventionalrp/core/processor.py b/src/conventionalrp/core/processor.py
index bc74ffb..12ca32b 100644
--- a/src/conventionalrp/core/processor.py
+++ b/src/conventionalrp/core/processor.py
@@ -1,68 +1,104 @@
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Callable
+import logging
+from .rules import RuleEngine, Rule
+
+logger = logging.getLogger(__name__)
class Processor:
- """处理器,用于处理解析后的token"""
-
def __init__(self, rules: Optional[Dict[str, Any]] = None):
- """
- 初始化处理器
-
- Args:
- rules: 处理规则(可选)
- """
self.rules = rules or {}
-
- def process_tokens(self, tokens: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 处理token列表
+ self.rule_engine = RuleEngine()
+ self.custom_processors: List[Callable] = []
+
+ self._load_rules_to_engine()
- Args:
- tokens: 解析后的token列表
+ logger.info("Processor initialized with %d rules",
+ self.rule_engine.rule_count())
+
+ def _load_rules_to_engine(self):
+ if not isinstance(self.rules, dict):
+ return
+
+ rules_list = self.rules.get("rules", [])
+ for rule_dict in rules_list:
+ if not isinstance(rule_dict, dict):
+ continue
- Returns:
- 处理后的数据列表
- """
+ try:
+ self.rule_engine.add_rule_dict(
+ name=rule_dict.get("name", "unnamed"),
+ condition=rule_dict.get("condition", {}),
+ action=rule_dict.get("action", {}),
+ priority=rule_dict.get("priority", 50)
+ )
+ except Exception as e:
+ logger.warning("Failed to load rule: %s", e)
+
+ def add_rule(self, rule: Rule):
+ self.rule_engine.add_rule(rule)
+ logger.debug("Added rule: %s", rule.name)
+
+ def add_processor(self, processor: Callable[[Dict[str, Any]], Dict[str, Any]]):
+ self.custom_processors.append(processor)
+ logger.debug("Added custom processor")
+
+ def process_tokens(
+ self,
+ tokens: List[Dict[str, Any]],
+ apply_all_rules: bool = False
+ ) -> List[Dict[str, Any]]:
+ if not tokens:
+ logger.warning("Empty token list provided")
+ return []
+
+ logger.info("Processing %d tokens", len(tokens))
processed_data = []
- for token in tokens:
- processed_token = self.apply_rules(token)
- processed_data.append(processed_token)
- return processed_data
-
- def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]:
- """
- 对单个token应用规则
- Args:
- token: 单个token
-
- Returns:
- 处理后的token
- """
- # 基础实现:直接返回token
- # 可以在此添加更多处理逻辑
+ for i, token in enumerate(tokens):
+ try:
+ processed_token = self.process_single_token(token, apply_all_rules)
+ processed_data.append(processed_token)
+ except Exception as e:
+ logger.error("Error processing token %d: %s", i, e)
+ # 发生错误时保留原始 token
+ processed_data.append(token)
+
+ logger.info("Successfully processed %d tokens", len(processed_data))
+ return processed_data
+
+ def process_single_token(
+ self,
+ token: Dict[str, Any],
+ apply_all_rules: bool = False
+ ) -> Dict[str, Any]:
processed = token.copy()
- # 添加处理时间戳
+ if self.rule_engine.rule_count() > 0:
+ processed = self.rule_engine.process(processed, apply_all_rules)
+
+ for processor in self.custom_processors:
+ try:
+ processed = processor(processed)
+ except Exception as e:
+ logger.error("Custom processor failed: %s", e)
+
if "timestamp" in processed:
processed["processed"] = True
return processed
-
- def generate_output(self, processed_data: List[Dict[str, Any]], format_type: str) -> str:
- """
- 生成指定格式的输出
+
+ def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]:
+ return self.process_single_token(token)
+
+ def generate_output(
+ self,
+ processed_data: List[Dict[str, Any]],
+ format_type: str
+ ) -> str:
+ logger.info("Generating %s output for %d items",
+ format_type, len(processed_data))
- Args:
- processed_data: 处理后的数据
- format_type: 输出格式 (json/html/markdown)
-
- Returns:
- 格式化后的字符串
-
- Raises:
- ValueError: 不支持的格式类型
- """
if format_type == "json":
return self.generate_json_output(processed_data)
elif format_type == "html":
@@ -71,20 +107,24 @@ class Processor:
return self.generate_markdown_output(processed_data)
else:
raise ValueError(f"Unsupported format type: {format_type}")
-
+
def generate_json_output(self, processed_data: List[Dict[str, Any]]) -> str:
- """生成JSON格式输出"""
import json
return json.dumps(processed_data, ensure_ascii=False, indent=2)
-
+
def generate_html_output(self, processed_data: List[Dict[str, Any]]) -> str:
- """生成HTML格式输出"""
return (
"<html><body>"
+ "".join(f"<p>{data}</p>" for data in processed_data)
+ "</body></html>"
)
-
+
def generate_markdown_output(self, processed_data: List[Dict[str, Any]]) -> str:
- """生成Markdown格式输出"""
return "\n".join(f"- {data}" for data in processed_data)
+
+ def get_statistics(self) -> Dict[str, Any]:
+ return {
+ "rule_count": self.rule_engine.rule_count(),
+ "custom_processor_count": len(self.custom_processors),
+ "has_rules_config": bool(self.rules),
+ }
diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py
new file mode 100644
index 0000000..f198d4e
--- /dev/null
+++ b/src/conventionalrp/core/rules.py
@@ -0,0 +1,238 @@
+from typing import Dict, Any, Callable, List, Optional
+from enum import Enum
+import re
+
+
+class RuleCondition(Enum):
+ """规则条件类型"""
+ EQUALS = "equals"
+ CONTAINS = "contains"
+ MATCHES = "matches"
+ STARTS_WITH = "starts_with"
+ ENDS_WITH = "ends_with"
+ IN_LIST = "in_list"
+ GREATER_THAN = "greater_than"
+ LESS_THAN = "less_than"
+
+
+class Rule:
+ def __init__(
+ self,
+ name: str,
+ condition: Dict[str, Any],
+ action: Dict[str, Any],
+ priority: int = 50
+ ):
+ self.name = name
+ self.condition = condition
+ self.action = action
+ self.priority = priority
+ self._compiled_patterns = {}
+
+ self._precompile_patterns()
+
+ def _precompile_patterns(self):
+ """预编译正则表达式以提高性能"""
+ if isinstance(self.condition, dict):
+ for key, value in self.condition.items():
+ if isinstance(value, dict) and value.get("type") == "matches":
+ pattern = value.get("pattern")
+ if pattern:
+ self._compiled_patterns[key] = re.compile(pattern)
+
+ def matches(self, data: Dict[str, Any]) -> bool:
+ """
+ 检查数据是否匹配规则条件
+ """
+ if not isinstance(self.condition, dict):
+ return False
+
+ for field, condition_spec in self.condition.items():
+ if not self._check_field_condition(data, field, condition_spec):
+ return False
+
+ return True
+
+ def _check_field_condition(
+ self,
+ data: Dict[str, Any],
+ field: str,
+ condition: Any
+ ) -> bool:
+ """检查单个字段的条件"""
+ value = data.get(field)
+
+ if not isinstance(condition, dict):
+ return value == condition
+
+ condition_type = condition.get("type")
+ expected_value = condition.get("value")
+
+ if condition_type == "equals":
+ return value == expected_value
+ elif condition_type == "contains":
+ return expected_value in str(value) if value else False
+ elif condition_type == "matches":
+ if field in self._compiled_patterns:
+ pattern = self._compiled_patterns[field]
+ return bool(pattern.search(str(value))) if value else False
+ return False
+ elif condition_type == "starts_with":
+ return str(value).startswith(expected_value) if value else False
+ elif condition_type == "ends_with":
+ return str(value).endswith(expected_value) if value else False
+ elif condition_type == "in_list":
+ return value in expected_value if isinstance(expected_value, list) else False
+ elif condition_type == "greater_than":
+ try:
+ return float(value) > float(expected_value)
+ except (ValueError, TypeError):
+ return False
+ elif condition_type == "less_than":
+ try:
+ return float(value) < float(expected_value)
+ except (ValueError, TypeError):
+ return False
+
+ return False
+
+ def apply(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ 对匹配的数据应用规则动作
+ """
+ result = data.copy()
+
+ if not isinstance(self.action, dict):
+ return result
+
+ action_type = self.action.get("type")
+
+ if action_type == "set_field":
+ field = self.action.get("field")
+ value = self.action.get("value")
+ if field:
+ result[field] = value
+
+ elif action_type == "add_field":
+ field = self.action.get("field")
+ value = self.action.get("value")
+ if field and field not in result:
+ result[field] = value
+
+ elif action_type == "remove_field":
+ field = self.action.get("field")
+ if field and field in result:
+ del result[field]
+
+ elif action_type == "transform":
+ field = self.action.get("field")
+ func_name = self.action.get("function")
+ if field and field in result and func_name:
+ result[field] = self._apply_transform(result[field], func_name)
+
+ elif action_type == "add_tag":
+ tag = self.action.get("tag")
+ if tag:
+ if "tags" not in result:
+ result["tags"] = []
+ if tag not in result["tags"]:
+ result["tags"].append(tag)
+
+ elif action_type == "copy_field":
+ source = self.action.get("source")
+ target = self.action.get("target")
+ if source and target and source in result:
+ result[target] = result[source]
+
+ return result
+
+ def _apply_transform(self, value: Any, func_name: str) -> Any:
+ transforms = {
+ "upper": lambda x: str(x).upper(),
+ "lower": lambda x: str(x).lower(),
+ "strip": lambda x: str(x).strip(),
+ "int": lambda x: int(x),
+ "float": lambda x: float(x),
+ "len": lambda x: len(x) if hasattr(x, '__len__') else 0,
+ }
+
+ func = transforms.get(func_name)
+ if func:
+ try:
+ return func(value)
+ except Exception:
+ return value
+ return value
+
+ def __repr__(self) -> str:
+ return f"Rule(name={self.name}, priority={self.priority})"
+
+
+class RuleEngine:
+ """
+ 规则引擎
+ """
+
+ def __init__(self):
+ self.rules: List[Rule] = []
+ self._sorted = False
+
+ def add_rule(self, rule: Rule):
+ self.rules.append(rule)
+ self._sorted = False
+
+ def add_rule_dict(
+ self,
+ name: str,
+ condition: Dict[str, Any],
+ action: Dict[str, Any],
+ priority: int = 50
+ ):
+ """
+ 从字典添加规则
+ """
+ rule = Rule(name, condition, action, priority)
+ self.add_rule(rule)
+
+ def _ensure_sorted(self):
+ """确保规则按优先级排序"""
+ if not self._sorted:
+ self.rules.sort(key=lambda r: r.priority, reverse=True)
+ self._sorted = True
+
+ def process(
+ self,
+ data: Dict[str, Any],
+ apply_all: bool = False
+ ) -> Dict[str, Any]:
+ self._ensure_sorted()
+ result = data.copy()
+
+ for rule in self.rules:
+ if rule.matches(result):
+ result = rule.apply(result)
+ if not apply_all:
+ break
+
+ return result
+
+ def process_batch(
+ self,
+ data_list: List[Dict[str, Any]],
+ apply_all: bool = False
+ ) -> List[Dict[str, Any]]:
+ return [self.process(data, apply_all) for data in data_list]
+
+ def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]:
+ self._ensure_sorted()
+ return [rule for rule in self.rules if rule.matches(data)]
+
+ def clear_rules(self):
+ self.rules.clear()
+ self._sorted = False
+
+ def rule_count(self) -> int:
+ return len(self.rules)
+
+ def __repr__(self) -> str:
+ return f"RuleEngine(rules={len(self.rules)})"
diff --git a/src/conventionalrp/extractors/__init__.py b/src/conventionalrp/extractors/__init__.py
index e693e03..e69de29 100644
--- a/src/conventionalrp/extractors/__init__.py
+++ b/src/conventionalrp/extractors/__init__.py
@@ -1,3 +0,0 @@
-"""
-This file initializes the extractors module.
-"""
diff --git a/src/conventionalrp/extractors/rule_extractor.py b/src/conventionalrp/extractors/rule_extractor.py
index bfc60c8..12b9e4b 100644
--- a/src/conventionalrp/extractors/rule_extractor.py
+++ b/src/conventionalrp/extractors/rule_extractor.py
@@ -15,12 +15,6 @@ class RuleExtractor(BaseExtractor):
"""规则提取器,用于从配置文件加载解析规则"""
def __init__(self, config_file: Optional[str] = None):
- """
- 初始化规则提取器
-
- Args:
- config_file: 规则配置文件路径(可选)
- """
self.config_file = config_file
self.rules: Dict[str, Any] = {}
if config_file:
@@ -29,16 +23,6 @@ class RuleExtractor(BaseExtractor):
def load_rules_from_file(self, config_file: str) -> Dict[str, Any]:
"""
从文件加载规则
-
- Args:
- config_file: 规则配置文件路径
-
- Returns:
- 解析后的规则字典
-
- Raises:
- FileNotFoundError: 文件不存在
- ValueError: 文件内容为空或格式错误
"""
if not Path(config_file).exists():
raise FileNotFoundError(f"Rule file not found: {config_file}")
@@ -56,12 +40,6 @@ class RuleExtractor(BaseExtractor):
def load_rules(self, config_file: str) -> Dict[str, Any]:
"""
加载规则(兼容旧接口)
-
- Args:
- config_file: 规则配置文件路径
-
- Returns:
- 解析后的规则字典
"""
self.rules = self.load_rules_from_file(config_file)
return self.rules
@@ -69,8 +47,5 @@ class RuleExtractor(BaseExtractor):
def extract(self) -> Dict[str, Any]:
"""
提取规则
-
- Returns:
- 规则字典
"""
return self.rules
diff --git a/src/conventionalrp/html_renderer.py b/src/conventionalrp/html_renderer.py
deleted file mode 100644
index e69de29..0000000
--- a/src/conventionalrp/html_renderer.py
+++ /dev/null
diff --git a/src/conventionalrp/json_renderer.py b/src/conventionalrp/json_renderer.py
deleted file mode 100644
index e69de29..0000000
--- a/src/conventionalrp/json_renderer.py
+++ /dev/null
diff --git a/src/conventionalrp/markdown_renderer.py b/src/conventionalrp/markdown_renderer.py
deleted file mode 100644
index e69de29..0000000
--- a/src/conventionalrp/markdown_renderer.py
+++ /dev/null
diff --git a/src/conventionalrp/plugins/__init__.py b/src/conventionalrp/plugins/__init__.py
index c7bbee6..e759fd3 100644
--- a/src/conventionalrp/plugins/__init__.py
+++ b/src/conventionalrp/plugins/__init__.py
@@ -1,3 +1,17 @@
-"""
-This file initializes the plugins module.
-"""
+from .base import (
+ Plugin,
+ ParserPlugin,
+ ProcessorPlugin,
+ RendererPlugin,
+ AnalyzerPlugin,
+)
+from .plugin_manager import PluginManager
+
+__all__ = [
+ "Plugin",
+ "ParserPlugin",
+ "ProcessorPlugin",
+ "RendererPlugin",
+ "AnalyzerPlugin",
+ "PluginManager",
+]
diff --git a/src/conventionalrp/plugins/base.py b/src/conventionalrp/plugins/base.py
new file mode 100644
index 0000000..f9e92a7
--- /dev/null
+++ b/src/conventionalrp/plugins/base.py
@@ -0,0 +1,103 @@
+from typing import Any, Dict, List, Optional
+from abc import ABC, abstractmethod
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class Plugin(ABC):
+ """
+ 插件基类
+
+ 所有插件必须继承此类并实现必要的方法
+ """
+
+ def __init__(self, name: str, version: str = "1.0.0"):
+ self.name = name
+ self.version = version
+ self.enabled = True
+ self.logger = logging.getLogger(f"conventionalrp.plugins.{name}")
+
+ @abstractmethod
+ def initialize(self, config: Optional[Dict[str, Any]] = None):
+ pass
+
+ @abstractmethod
+ def process(self, data: Any) -> Any:
+ pass
+
+ def on_enable(self):
+ """插件启用时调用"""
+ self.logger.info(f"Plugin {self.name} enabled")
+
+ def on_disable(self):
+ """插件禁用时调用"""
+ self.logger.info(f"Plugin {self.name} disabled")
+
+ def get_metadata(self) -> Dict[str, Any]:
+ return {
+ "name": self.name,
+ "version": self.version,
+ "enabled": self.enabled,
+ "type": self.__class__.__name__,
+ }
+
+ def __repr__(self) -> str:
+ return f"Plugin(name={self.name}, version={self.version}, enabled={self.enabled})"
+
+
+class ParserPlugin(Plugin):
+ def __init__(self, name: str, version: str = "1.0.0"):
+ super().__init__(name, version)
+ self.priority = 50
+
+ @abstractmethod
+ def can_parse(self, text: str) -> bool:
+ """
+ 判断是否可以解析给定文本
+ """
+ pass
+
+ @abstractmethod
+ def parse(self, text: str) -> List[Dict[str, Any]]:
+ """
+ 解析文本
+ """
+ pass
+
+
+class ProcessorPlugin(Plugin):
+ @abstractmethod
+ def process_token(self, token: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ 处理单个 token
+ """
+ pass
+
+ def process(self, data: Any) -> Any:
+ """
+ 处理数据(实现基类方法)
+ """
+ if isinstance(data, dict):
+ return self.process_token(data)
+ elif isinstance(data, list):
+ return [self.process_token(token) for token in data]
+ return data
+
+
+class RendererPlugin(Plugin):
+ @abstractmethod
+ def render(self, data: Any) -> str:
+ pass
+
+ def process(self, data: Any) -> Any:
+ return self.render(data)
+
+
+class AnalyzerPlugin(Plugin):
+ @abstractmethod
+ def analyze(self, data: Any) -> Dict[str, Any]:
+ pass
+
+ def process(self, data: Any) -> Any:
+ return self.analyze(data)
diff --git a/src/conventionalrp/plugins/plugin_manager.py b/src/conventionalrp/plugins/plugin_manager.py
index 0d49a9c..0230bda 100644
--- a/src/conventionalrp/plugins/plugin_manager.py
+++ b/src/conventionalrp/plugins/plugin_manager.py
@@ -1,19 +1,204 @@
+"""
+插件管理器
+"""
+
import os
+import sys
import importlib
+import importlib.util
+from pathlib import Path
+from typing import Dict, List, Optional, Type, Any
+import logging
+
+from .base import Plugin
+
+logger = logging.getLogger(__name__)
class PluginManager:
- def __init__(self, plugin_dir: str):
- self.plugin_dir = plugin_dir
- self.plugins = []
+ def __init__(self, plugin_dirs: Optional[List[str]] = None):
+ self.plugin_dirs = plugin_dirs or []
+ self.plugins: Dict[str, Plugin] = {}
+ self.plugin_classes: Dict[str, Type[Plugin]] = {}
+
+ logger.info("PluginManager initialized with %d directories",
+ len(self.plugin_dirs))
+
+ def add_plugin_dir(self, directory: str):
- def load_plugins(self):
- for plugin in os.listdir(self.plugin_dir):
- if plugin.endswith(".py"):
- plugin_name = plugin.split(".")[0]
- module = importlib.import_module(f"{self.plugin_dir}.{plugin_name}")
- self.plugins.append(module)
+ if directory not in self.plugin_dirs:
+ self.plugin_dirs.append(directory)
+ logger.info(f"Added plugin directory: {directory}")
+
+ def discover_plugins(self) -> List[str]:
+ discovered = []
+
+ for plugin_dir in self.plugin_dirs:
+ path = Path(plugin_dir)
+ if not path.exists():
+ logger.warning(f"Plugin directory does not exist: {plugin_dir}")
+ continue
+
+ if str(path.parent) not in sys.path:
+ sys.path.insert(0, str(path.parent))
- def run_plugins(self):
- for plugin in self.plugins:
- plugin.run()
+ for py_file in path.glob("*.py"):
+ if py_file.name.startswith("_"):
+ continue
+
+ module_name = py_file.stem
+ discovered.append(module_name)
+ logger.debug(f"Discovered plugin module: {module_name}")
+
+ logger.info(f"Discovered {len(discovered)} plugin modules")
+ return discovered
+
+ def load_plugin_from_file(self, file_path: str) -> Optional[Type[Plugin]]:
+ try:
+ module_name = Path(file_path).stem
+ spec = importlib.util.spec_from_file_location(module_name, file_path)
+
+ if spec is None or spec.loader is None:
+ logger.error(f"Failed to load plugin spec: {file_path}")
+ return None
+
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+
+ for name in dir(module):
+ obj = getattr(module, name)
+ if (isinstance(obj, type) and
+ issubclass(obj, Plugin) and
+ obj is not Plugin):
+ logger.info(f"Loaded plugin class: {name} from {file_path}")
+ return obj
+
+ logger.warning(f"No Plugin subclass found in: {file_path}")
+ return None
+
+ except Exception as e:
+ logger.error(f"Error loading plugin from {file_path}: {e}")
+ return None
+
+ def load_plugin(self, module_name: str) -> Optional[str]:
+ try:
+ module = importlib.import_module(module_name)
+
+ # 查找 Plugin 子类
+ for name in dir(module):
+ obj = getattr(module, name)
+ if (isinstance(obj, type) and
+ issubclass(obj, Plugin) and
+ obj is not Plugin):
+
+ plugin_class = obj
+ self.plugin_classes[name] = plugin_class
+ logger.info(f"Loaded plugin class: {name}")
+ return name
+
+ logger.warning(f"No Plugin subclass found in module: {module_name}")
+ return None
+
+ except Exception as e:
+ logger.error(f"Error loading plugin module {module_name}: {e}")
+ return None
+
+ def register_plugin(
+ self,
+ plugin: Plugin,
+ replace: bool = False
+ ) -> bool:
+ if plugin.name in self.plugins and not replace:
+ logger.warning(f"Plugin {plugin.name} already registered")
+ return False
+
+ self.plugins[plugin.name] = plugin
+ plugin.on_enable()
+ logger.info(f"Registered plugin: {plugin.name}")
+ return True
+
+ def unregister_plugin(self, plugin_name: str) -> bool:
+ if plugin_name not in self.plugins:
+ logger.warning(f"Plugin {plugin_name} not found")
+ return False
+
+ plugin = self.plugins[plugin_name]
+ plugin.on_disable()
+ del self.plugins[plugin_name]
+ logger.info(f"Unregistered plugin: {plugin_name}")
+ return True
+
+ def get_plugin(self, plugin_name: str) -> Optional[Plugin]:
+ return self.plugins.get(plugin_name)
+
+ def list_plugins(self) -> List[Dict[str, Any]]:
+ return [plugin.get_metadata() for plugin in self.plugins.values()]
+
+ def enable_plugin(self, plugin_name: str) -> bool:
+ plugin = self.plugins.get(plugin_name)
+ if plugin is None:
+ logger.warning(f"Plugin {plugin_name} not found")
+ return False
+
+ if not plugin.enabled:
+ plugin.enabled = True
+ plugin.on_enable()
+ logger.info(f"Enabled plugin: {plugin_name}")
+
+ return True
+
+ def disable_plugin(self, plugin_name: str) -> bool:
+ plugin = self.plugins.get(plugin_name)
+ if plugin is None:
+ logger.warning(f"Plugin {plugin_name} not found")
+ return False
+
+ if plugin.enabled:
+ plugin.enabled = False
+ plugin.on_disable()
+ logger.info(f"Disabled plugin: {plugin_name}")
+
+ return True
+
+ def execute_plugins(
+ self,
+ data: Any,
+ plugin_type: Optional[Type[Plugin]] = None
+ ) -> Any:
+ result = data
+
+ for plugin in self.plugins.values():
+ if not plugin.enabled:
+ continue
+
+ if plugin_type is not None and not isinstance(plugin, plugin_type):
+ continue
+
+ try:
+ result = plugin.process(result)
+ logger.debug(f"Executed plugin: {plugin.name}")
+ except Exception as e:
+ logger.error(f"Error executing plugin {plugin.name}: {e}")
+
+ return result
+
+ def clear_plugins(self):
+ for plugin_name in list(self.plugins.keys()):
+ self.unregister_plugin(plugin_name)
+
+ self.plugin_classes.clear()
+ logger.info("Cleared all plugins")
+
+ def get_statistics(self) -> Dict[str, Any]:
+ enabled_count = sum(1 for p in self.plugins.values() if p.enabled)
+
+ return {
+ "total_plugins": len(self.plugins),
+ "enabled_plugins": enabled_count,
+ "disabled_plugins": len(self.plugins) - enabled_count,
+ "plugin_classes": len(self.plugin_classes),
+ "plugin_directories": len(self.plugin_dirs),
+ }
+
+ def __repr__(self) -> str:
+ return f"PluginManager(plugins={len(self.plugins)}, enabled={sum(1 for p in self.plugins.values() if p.enabled)})"
diff --git a/src/conventionalrp/renderers/__init__.py b/src/conventionalrp/renderers/__init__.py
index 6d5ac7a..e69de29 100644
--- a/src/conventionalrp/renderers/__init__.py
+++ b/src/conventionalrp/renderers/__init__.py
@@ -1,3 +0,0 @@
-"""
-This file initializes the renderers module.
-"""
diff --git a/src/conventionalrp/renderers/html_renderer.py b/src/conventionalrp/renderers/html_renderer.py
index 75efcd3..d435fc9 100644
--- a/src/conventionalrp/renderers/html_renderer.py
+++ b/src/conventionalrp/renderers/html_renderer.py
@@ -1,22 +1,423 @@
+from typing import Any, Dict, List, Optional
from .base import BaseRenderer
class HTMLRenderer(BaseRenderer):
- def __init__(self):
+ THEMES = {
+ "light": {
+ "bg_color": "#ffffff",
+ "text_color": "#333333",
+ "header_bg": "#f5f5f5",
+ "border_color": "#e0e0e0",
+ "dialogue_color": "#4CAF50",
+ "dice_color": "#2196F3",
+ "narration_color": "#FF9800",
+ "system_color": "#9E9E9E",
+ "success_color": "#43a047",
+ "failure_color": "#e53935",
+ "code_bg": "#f5f5f5",
+ },
+ "dark": {
+ "bg_color": "#1e1e1e",
+ "text_color": "#d4d4d4",
+ "header_bg": "#2d2d30",
+ "border_color": "#3e3e42",
+ "dialogue_color": "#6adb8d",
+ "dice_color": "#5fb3f5",
+ "narration_color": "#ffb74d",
+ "system_color": "#bdbdbd",
+ "success_color": "#66bb6a",
+ "failure_color": "#ef5350",
+ "code_bg": "#2d2d30",
+ },
+ "fantasy": {
+ "bg_color": "#f9f6f1",
+ "text_color": "#3e2723",
+ "header_bg": "#d7ccc8",
+ "border_color": "#bcaaa4",
+ "dialogue_color": "#8d6e63",
+ "dice_color": "#5d4037",
+ "narration_color": "#795548",
+ "system_color": "#a1887f",
+ "success_color": "#7cb342",
+ "failure_color": "#c62828",
+ "code_bg": "#efebe9",
+ },
+ }
+
+ def __init__(self, theme: str = "light", custom_css: Optional[str] = None):
super().__init__()
- self.title = "TRPG Log Output"
-
- def render(self, data):
- html_content = f"<html><head><title>{self.title}</title></head><body>"
- html_content += "<h1>TRPG Log Output</h1>"
- html_content += "<ul>"
-
- for entry in data:
- html_content += f"<li>{entry}</li>"
-
- html_content += "</ul></body></html>"
- return html_content
-
+ self.title = "Rendered Log"
+ self.theme = theme if theme in self.THEMES else "light"
+ self.custom_css = custom_css
+
+ def _get_css(self) -> str:
+ colors = self.THEMES[self.theme]
+
+ css = f"""
+ * {{
+ margin: 0;
+ padding: 0;
+ box-sizing: border-box;
+ }}
+
+ body {{
+ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
+ background-color: {colors['bg_color']};
+ color: {colors['text_color']};
+ line-height: 1.6;
+ padding: 20px;
+ max-width: 1200px;
+ margin: 0 auto;
+ }}
+
+ header {{
+ background: linear-gradient(135deg, {colors['header_bg']} 0%, {colors['border_color']} 100%);
+ padding: 30px;
+ border-radius: 10px;
+ margin-bottom: 30px;
+ box-shadow: 0 4px 6px rgba(0,0,0,0.1);
+ }}
+
+ h1 {{
+ font-size: 2.5em;
+ font-weight: 700;
+ margin-bottom: 10px;
+ color: {colors['text_color']};
+ }}
+
+ .subtitle {{
+ font-size: 1.1em;
+ opacity: 0.8;
+ }}
+
+ .stats {{
+ display: flex;
+ gap: 20px;
+ margin-top: 20px;
+ flex-wrap: wrap;
+ }}
+
+ .stat-item {{
+ background: {colors['bg_color']};
+ padding: 10px 20px;
+ border-radius: 5px;
+ font-size: 0.9em;
+ border: 1px solid {colors['border_color']};
+ }}
+
+ .stat-label {{
+ opacity: 0.7;
+ margin-right: 5px;
+ }}
+
+ .stat-value {{
+ font-weight: bold;
+ }}
+
+ .content-wrapper {{
+ background: {colors['bg_color']};
+ border-radius: 10px;
+ padding: 20px;
+ box-shadow: 0 2px 4px rgba(0,0,0,0.05);
+ }}
+
+ .token {{
+ margin: 15px 0;
+ padding: 15px;
+ border-left: 4px solid {colors['border_color']};
+ border-radius: 5px;
+ background: {colors['header_bg']};
+ transition: transform 0.2s, box-shadow 0.2s;
+ }}
+
+ .token:hover {{
+ transform: translateX(5px);
+ box-shadow: 0 4px 8px rgba(0,0,0,0.1);
+ }}
+
+ .token.dialogue {{
+ border-left-color: {colors['dialogue_color']};
+ }}
+
+ .token.dice {{
+ border-left-color: {colors['dice_color']};
+ }}
+
+ .token.narration {{
+ border-left-color: {colors['narration_color']};
+ }}
+
+ .token.system {{
+ border-left-color: {colors['system_color']};
+ }}
+
+ .token.success {{
+ border-left-color: {colors['success_color']};
+ background: {colors['success_color']}15;
+ }}
+
+ .token.failure {{
+ border-left-color: {colors['failure_color']};
+ background: {colors['failure_color']}15;
+ }}
+
+ .token-header {{
+ display: flex;
+ align-items: center;
+ gap: 10px;
+ margin-bottom: 8px;
+ }}
+
+ .type-badge {{
+ display: inline-block;
+ padding: 3px 10px;
+ border-radius: 12px;
+ font-size: 0.75em;
+ font-weight: 600;
+ text-transform: uppercase;
+ letter-spacing: 0.5px;
+ }}
+
+ .type-badge.dialogue {{
+ background: {colors['dialogue_color']};
+ color: white;
+ }}
+
+ .type-badge.dice {{
+ background: {colors['dice_color']};
+ color: white;
+ }}
+
+ .type-badge.narration {{
+ background: {colors['narration_color']};
+ color: white;
+ }}
+
+ .type-badge.system {{
+ background: {colors['system_color']};
+ color: white;
+ }}
+
+ .speaker {{
+ font-weight: 700;
+ font-size: 1.1em;
+ color: {colors['text_color']};
+ }}
+
+ .content {{
+ margin-top: 8px;
+ line-height: 1.8;
+ font-size: 1em;
+ }}
+
+ .metadata {{
+ display: flex;
+ gap: 15px;
+ margin-top: 10px;
+ padding-top: 10px;
+ border-top: 1px solid {colors['border_color']};
+ font-size: 0.85em;
+ opacity: 0.7;
+ flex-wrap: wrap;
+ }}
+
+ .metadata-item {{
+ display: flex;
+ align-items: center;
+ gap: 5px;
+ }}
+
+ .tags {{
+ display: flex;
+ gap: 5px;
+ flex-wrap: wrap;
+ }}
+
+ .tag {{
+ background: {colors['code_bg']};
+ padding: 2px 8px;
+ border-radius: 3px;
+ font-size: 0.85em;
+ border: 1px solid {colors['border_color']};
+ }}
+
+ code {{
+ background: {colors['code_bg']};
+ padding: 2px 6px;
+ border-radius: 3px;
+ font-family: 'Courier New', monospace;
+ font-size: 0.9em;
+ }}
+
+ .dice-result {{
+ display: inline-block;
+ background: {colors['dice_color']};
+ color: white;
+ padding: 3px 10px;
+ border-radius: 5px;
+ font-weight: bold;
+ margin-left: 5px;
+ }}
+
+ footer {{
+ margin-top: 40px;
+ padding: 20px;
+ text-align: center;
+ opacity: 0.6;
+ font-size: 0.9em;
+ }}
+
+ @media (max-width: 768px) {{
+ body {{
+ padding: 10px;
+ }}
+
+ h1 {{
+ font-size: 1.8em;
+ }}
+
+ .token {{
+ padding: 10px;
+ }}
+ }}
+ """
+
+ if self.custom_css:
+ css += "\n" + self.custom_css
+
+ return css
+
+ def render(self, data: List[Dict[str, Any]]) -> str:
+ if data and not isinstance(data[0], dict):
+ data = [{"type": "text", "content": str(item)} for item in data]
+
+ stats = self._calculate_stats(data)
+
+ html = f"""<!DOCTYPE html>
+<html lang="zh-CN">
+<head>
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>{self.title}</title>
+ <style>{self._get_css()}</style>
+</head>
+<body>
+ <header>
+ <h1>🎲 {self.title}</h1>
+ <div class="subtitle">完整的游戏日志与数据分析</div>
+ <div class="stats">
+ <div class="stat-item">
+ <span class="stat-label">总条目:</span>
+ <span class="stat-value">{stats['total']}</span>
+ </div>
+ <div class="stat-item">
+ <span class="stat-label">对话:</span>
+ <span class="stat-value">{stats['dialogue']}</span>
+ </div>
+ <div class="stat-item">
+ <span class="stat-label">骰子:</span>
+ <span class="stat-value">{stats['dice']}</span>
+ </div>
+ <div class="stat-item">
+ <span class="stat-label">旁白:</span>
+ <span class="stat-value">{stats['narration']}</span>
+ </div>
+ <div class="stat-item">
+ <span class="stat-label">系统:</span>
+ <span class="stat-value">{stats['system']}</span>
+ </div>
+ </div>
+ </header>
+
+ <div class="content-wrapper">
+"""
+
+ for item in data:
+ html += self._render_token(item)
+
+ html += """ </div>
+
+ <footer>
+ <p>Generated by ConventionalRP HTML Renderer</p>
+ <p>Theme: """ + self.theme.capitalize() + """</p>
+ </footer>
+</body>
+</html>"""
+
+ return html
+
+ def _render_token(self, token: Dict[str, Any]) -> str:
+ token_type = token.get("type", "unknown")
+ speaker = token.get("speaker", "")
+ content = str(token.get("content", ""))
+
+ content = (content
+ .replace("&", "&amp;")
+ .replace("<", "&lt;")
+ .replace(">", "&gt;")
+ .replace('"', "&quot;"))
+
+ if token_type == "dice" and "result" in token:
+ content += f' <span class="dice-result">{token["result"]}</span>'
+
+ html = f' <div class="token {token_type}">\n'
+ html += ' <div class="token-header">\n'
+ html += f' <span class="type-badge {token_type}">{token_type}</span>\n'
+
+ if speaker:
+ html += f' <span class="speaker">{speaker}</span>\n'
+
+ html += ' </div>\n'
+ html += f' <div class="content">{content}</div>\n'
+
+ metadata_items = []
+
+ if "timestamp" in token:
+ metadata_items.append(f'⏰ {token["timestamp"]}')
+
+ if "tags" in token and token["tags"]:
+ tags_html = '<div class="tags">'
+ for tag in token["tags"]:
+ tags_html += f'<span class="tag">{tag}</span>'
+ tags_html += '</div>'
+ metadata_items.append(tags_html)
+
+ if "combat_data" in token:
+ combat = token["combat_data"]
+ if combat.get("type") == "damage":
+ metadata_items.append(f'⚔️ 伤害: {combat["amount"]}')
+ elif combat.get("type") == "healing":
+ metadata_items.append(f'💚 治疗: {combat["amount"]}')
+
+ if metadata_items:
+ html += ' <div class="metadata">\n'
+ for item in metadata_items:
+ html += f' <div class="metadata-item">{item}</div>\n'
+ html += ' </div>\n'
+
+ html += ' </div>\n'
+
+ return html
+
+ def _calculate_stats(self, data: List[Dict[str, Any]]) -> Dict[str, int]:
+ stats = {
+ "total": len(data),
+ "dialogue": 0,
+ "dice": 0,
+ "narration": 0,
+ "system": 0,
+ }
+
+ for item in data:
+ token_type = item.get("type", "unknown")
+ if token_type in stats:
+ stats[token_type] += 1
+
+ return stats
+
def set_style(self, style):
- # Implement style setting if needed
- pass
+ """设置样式(向后兼容)"""
+ if style in self.THEMES:
+ self.theme = style
+
diff --git a/src/conventionalrp/renderers/json_renderer.py b/src/conventionalrp/renderers/json_renderer.py
index 8dcdd6d..42d5fd4 100644
--- a/src/conventionalrp/renderers/json_renderer.py
+++ b/src/conventionalrp/renderers/json_renderer.py
@@ -1,11 +1,78 @@
+import json
+from typing import Any, Dict, List
from .base import BaseRenderer
class JSONRenderer(BaseRenderer):
- def render(self, data):
- import json
-
- return json.dumps(data, ensure_ascii=False, indent=4)
+ def __init__(self, pretty: bool = True, indent: int = 2, sort_keys: bool = False):
+ super().__init__()
+ self.pretty = pretty
+ self.indent = indent if pretty else None
+ self.sort_keys = sort_keys
+ self.style = {}
+
+ def render(self, data: Any) -> str:
+ if self.pretty and isinstance(data, list):
+ output = {
+ "metadata": {
+ "total_entries": len(data),
+ "renderer": "ConventionalRP JSONRenderer",
+ "version": "1.0.0",
+ },
+ "statistics": self._calculate_stats(data),
+ "data": data,
+ }
+ else:
+ output = data
+
+ return json.dumps(
+ output,
+ ensure_ascii=False,
+ indent=self.indent,
+ sort_keys=self.sort_keys,
+ )
+
+ def _calculate_stats(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
+ stats = {
+ "dialogue": 0,
+ "dice": 0,
+ "narration": 0,
+ "system": 0,
+ "success": 0,
+ "failure": 0,
+ "other": 0,
+ }
+
+ speakers = set()
+
+ for item in data:
+ if not isinstance(item, dict):
+ continue
+
+ token_type = item.get("type", "unknown")
+ if token_type in stats:
+ stats[token_type] += 1
+ else:
+ stats["other"] += 1
+
+ if "speaker" in item and item["speaker"]:
+ speakers.add(item["speaker"])
+
+ stats["unique_speakers"] = len(speakers)
+ stats["speakers"] = sorted(list(speakers))
+
+ return stats
def set_style(self, style):
- self.style = style # Placeholder for potential styling options in the future
+ self.style = style
+
+ # 从 style 中提取设置
+ if isinstance(style, dict):
+ if "pretty" in style:
+ self.pretty = style["pretty"]
+ self.indent = 2 if self.pretty else None
+ if "indent" in style:
+ self.indent = style["indent"]
+ if "sort_keys" in style:
+ self.sort_keys = style["sort_keys"]
+
diff --git a/src/conventionalrp/renderers/markdown_renderer.py b/src/conventionalrp/renderers/markdown_renderer.py
index 9df59a2..7080913 100644
--- a/src/conventionalrp/renderers/markdown_renderer.py
+++ b/src/conventionalrp/renderers/markdown_renderer.py
@@ -1,18 +1,19 @@
+"""
+Markdown 渲染器
+"""
+
+from typing import Any, Dict, List, Union
from .base import BaseRenderer
-from typing import List, Dict, Any, Union
class MarkdownRenderer(BaseRenderer):
+ def __init__(self, enable_syntax_hints: bool = True, enable_emoji: bool = True):
+ super().__init__()
+ self.enable_syntax_hints = enable_syntax_hints
+ self.enable_emoji = enable_emoji
+ self.style = {}
+
def render(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> str:
- """
- Renders the given data in Markdown format.
-
- Args:
- data: The data to render (can be list or dict).
-
- Returns:
- str: The rendered Markdown string.
- """
if isinstance(data, list):
return self._render_list(data)
elif isinstance(data, dict):
@@ -21,40 +22,141 @@ class MarkdownRenderer(BaseRenderer):
return str(data)
def _render_list(self, data: List[Dict[str, Any]]) -> str:
- """渲染列表数据为 Markdown"""
- markdown_output = "# TRPG Log\n\n"
-
- for i, entry in enumerate(data, 1):
- if entry.get("type") == "metadata":
- markdown_output += f"## Entry {i}\n\n"
- markdown_output += f"**Timestamp**: {entry.get('timestamp', 'N/A')} \n"
- markdown_output += f"**Speaker**: {entry.get('speaker', 'N/A')} \n\n"
-
- content_items = entry.get("content", [])
- if content_items:
- markdown_output += "**Content**:\n\n"
- for content in content_items:
- content_type = content.get("type", "unknown")
- content_text = content.get("content", "")
- markdown_output += f"- [{content_type}] {content_text}\n"
- markdown_output += "\n"
- else:
- markdown_output += f"- {entry}\n"
+ if data and not isinstance(data[0], dict):
+ data = [{"type": "text", "content": str(item)} for item in data]
- return markdown_output
+ stats = self._calculate_stats(data)
+
+ emoji_prefix = "🎲 " if self.enable_emoji else ""
+ md = f"# {emoji_prefix}\n\n"
+
+ md += "## Statistics\n\n"
+ md += "| Type | Count |\n"
+ md += "|------|------|\n"
+ md += f"| Total | {stats['total']} |\n"
+ md += f"| Dialogue | {stats['dialogue']} |\n"
+ md += f"| Dice | {stats['dice']} |\n"
+ md += f"| Narration | {stats['narration']} |\n"
+ md += f"| System | {stats['system']} |\n"
+ md += f"| Metadata | {stats['metadata']} |\n\n"
+
+ md += "---\n\n"
+ md += "## Detailed Log\n\n"
+
+ # Render each token
+ for i, item in enumerate(data, 1):
+ md += self._render_token(item, i)
+ md += "\n"
+
+ md += "---\n\n"
+ md += "_Generated by ConventionalRP Markdown Renderer_\n"
+
+ return md
+
+ def _render_token(self, token: Dict[str, Any], index: int) -> str:
+ token_type = token.get("type", "unknown")
+
+ # 处理元数据类型(向后兼容)
+ if token_type == "metadata":
+ return self._render_metadata_token(token, index)
+
+ speaker = token.get("speaker", "")
+ content = str(token.get("content", ""))
+
+ type_emojis = {
+ "dialogue": "💬",
+ "dice": "🎲",
+ "narration": "📖",
+ "system": "⚙️",
+ "success": "✅",
+ "failure": "❌",
+ "text": "📄",
+ "unknown": "❓",
+ }
+
+ emoji = type_emojis.get(token_type, "•") if self.enable_emoji else "-"
+
+ md = f"### {emoji} [{index}] {token_type.upper()}\n\n"
+
+ if speaker:
+ md += f"**说话者:** {speaker}\n\n"
+
+ if token_type == "dice" and self.enable_syntax_hints:
+ md += f"```dice\n{content}\n```\n\n"
+
+ if "result" in token:
+ md += f"**结果:** `{token['result']}`\n\n"
+ else:
+ md += f"{content}\n\n"
+
+ metadata_lines = []
+
+ if "timestamp" in token:
+ metadata_lines.append(f"Timestamp: `{token['timestamp']}`")
+
+ if "tags" in token and token["tags"]:
+ tags_str = " ".join([f"`{tag}`" for tag in token["tags"]])
+ metadata_lines.append(f"Tags: {tags_str}")
+
+ if "combat_data" in token:
+ combat = token["combat_data"]
+ if combat.get("type") == "damage":
+ metadata_lines.append(f"Damage: **{combat['amount']}** (Total: {combat.get('total_damage', '?')})")
+ elif combat.get("type") == "healing":
+ metadata_lines.append(f"Healing: **{combat['amount']}** (Total: {combat.get('total_healing', '?')})")
+
+ if metadata_lines:
+ md += "> " + "\n> ".join(metadata_lines) + "\n\n"
+
+ return md
+
+ def _render_metadata_token(self, token: Dict[str, Any], index: int) -> str:
+ """渲染元数据类型的 token(向后兼容)"""
+ md = f"### Entry {index}\n\n"
+ md += f"**Timestamp**: {token.get('timestamp', 'N/A')} \n"
+ md += f"**Speaker**: {token.get('speaker', 'N/A')} \n\n"
+
+ content_items = token.get("content", [])
+ if content_items:
+ md += "**Content**:\n\n"
+ for content in content_items:
+ content_type = content.get("type", "unknown")
+ content_text = content.get("content", "")
+ md += f"- [{content_type}] {content_text}\n"
+ md += "\n"
+
+ return md
def _render_dict(self, data: Dict[str, Any]) -> str:
- """渲染字典数据为 Markdown"""
markdown_output = ""
for key, value in data.items():
markdown_output += f"## {key}\n\n{value}\n\n"
return markdown_output
+
+ def _calculate_stats(self, data: List[Dict[str, Any]]) -> Dict[str, int]:
+ stats = {
+ "total": len(data),
+ "dialogue": 0,
+ "dice": 0,
+ "narration": 0,
+ "system": 0,
+ "metadata": 0,
+ }
+
+ for item in data:
+ token_type = item.get("type", "unknown")
+ if token_type in stats:
+ stats[token_type] += 1
+
+ return stats
def set_style(self, style):
- """
- Sets the style for the Markdown renderer.
+ self.style = style
+
+ # 从 style 中提取设置
+ if isinstance(style, dict):
+ if "emoji" in style:
+ self.enable_emoji = style["emoji"]
+ if "syntax_hints" in style:
+ self.enable_syntax_hints = style["syntax_hints"]
- Args:
- style (dict): A dictionary of style options.
- """
- self.style = style # Currently, Markdown does not support styling, but this can be extended.
diff --git a/src/conventionalrp/tokenizer.py b/src/conventionalrp/tokenizer.py
deleted file mode 100644
index e69de29..0000000
--- a/src/conventionalrp/tokenizer.py
+++ /dev/null
diff --git a/src/conventionalrp/utils/__init__.py b/src/conventionalrp/utils/__init__.py
index 53b7600..dadb3cd 100644
--- a/src/conventionalrp/utils/__init__.py
+++ b/src/conventionalrp/utils/__init__.py
@@ -1 +1,32 @@
-"""This file initializes the utils module."""
+from .exceptions import (
+ ConventionalRPError,
+ ParserError,
+ RuleError,
+ ProcessorError,
+ ValidationError,
+ ConfigurationError,
+ safe_execute,
+ format_error,
+ validate_not_none,
+ validate_type,
+ validate_not_empty,
+)
+from .logging_config import setup_logging, get_logger, LogContext
+
+__all__ = [
+ "ConventionalRPError",
+ "ParserError",
+ "RuleError",
+ "ProcessorError",
+ "ValidationError",
+ "ConfigurationError",
+ "safe_execute",
+ "format_error",
+ "validate_not_none",
+ "validate_type",
+ "validate_not_empty",
+ "setup_logging",
+ "get_logger",
+ "LogContext",
+]
+
diff --git a/src/conventionalrp/utils/exceptions.py b/src/conventionalrp/utils/exceptions.py
new file mode 100644
index 0000000..28fa179
--- /dev/null
+++ b/src/conventionalrp/utils/exceptions.py
@@ -0,0 +1,104 @@
+from typing import Optional, Any, Dict
+import traceback
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class ConventionalRPError(Exception):
+ """基础异常类"""
+
+ def __init__(
+ self,
+ message: str,
+ details: Optional[Dict[str, Any]] = None,
+ cause: Optional[Exception] = None
+ ):
+ super().__init__(message)
+ self.message = message
+ self.details = details or {}
+ self.cause = cause
+
+ def __str__(self) -> str:
+ result = self.message
+ if self.details:
+ details_str = ", ".join(f"{k}={v}" for k, v in self.details.items())
+ result += f" ({details_str})"
+ if self.cause:
+ result += f"\nCaused by: {str(self.cause)}"
+ return result
+
+
+class ParserError(ConventionalRPError):
+ """解析错误"""
+ pass
+
+
+class RuleError(ConventionalRPError):
+ """规则相关错误"""
+ pass
+
+
+class ProcessorError(ConventionalRPError):
+ """处理器错误"""
+ pass
+
+
+class ValidationError(ConventionalRPError):
+ """验证错误"""
+ pass
+
+
+class ConfigurationError(ConventionalRPError):
+ """配置错误"""
+ pass
+
+
+def safe_execute(func, *args, default=None, error_msg="Operation failed", **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ logger.error(f"{error_msg}: {e}")
+ return default
+
+
+def format_error(error: Exception, include_traceback: bool = False) -> str:
+ error_type = type(error).__name__
+ error_msg = str(error)
+
+ result = f"[{error_type}] {error_msg}"
+
+ if include_traceback:
+ tb = traceback.format_exc()
+ result += f"\n\nTraceback:\n{tb}"
+
+ return result
+
+
+def validate_not_none(value: Any, name: str):
+ if value is None:
+ raise ValidationError(
+ f"{name} cannot be None",
+ details={"parameter": name}
+ )
+
+
+def validate_type(value: Any, expected_type: type, name: str):
+ if not isinstance(value, expected_type):
+ raise ValidationError(
+ f"{name} must be of type {expected_type.__name__}, "
+ f"got {type(value).__name__}",
+ details={
+ "parameter": name,
+ "expected_type": expected_type.__name__,
+ "actual_type": type(value).__name__
+ }
+ )
+
+
+def validate_not_empty(value: Any, name: str):
+ if not value:
+ raise ValidationError(
+ f"{name} cannot be empty",
+ details={"parameter": name, "value": value}
+ )
diff --git a/src/conventionalrp/utils/logging_config.py b/src/conventionalrp/utils/logging_config.py
new file mode 100644
index 0000000..956d6d4
--- /dev/null
+++ b/src/conventionalrp/utils/logging_config.py
@@ -0,0 +1,77 @@
+import logging
+import sys
+from pathlib import Path
+from typing import Optional
+from datetime import datetime
+
+
+def setup_logging(
+ level: str = "INFO",
+ log_file: Optional[str] = None,
+ format_string: Optional[str] = None,
+ include_timestamp: bool = True,
+ include_module: bool = True
+) -> logging.Logger:
+ log_level = getattr(logging, level.upper(), logging.INFO)
+
+ if format_string is None:
+ parts = []
+ if include_timestamp:
+ parts.append("%(asctime)s")
+ parts.append("[%(levelname)s]")
+ if include_module:
+ parts.append("%(name)s")
+ parts.append("%(message)s")
+ format_string = " - ".join(parts)
+
+ logging.basicConfig(
+ level=log_level,
+ format=format_string,
+ datefmt="%Y-%m-%d %H:%M:%S",
+ handlers=[]
+ )
+
+ logger = logging.getLogger("conventionalrp")
+ logger.setLevel(log_level)
+
+ logger.handlers.clear()
+
+ console_handler = logging.StreamHandler(sys.stdout)
+ console_handler.setLevel(log_level)
+ console_formatter = logging.Formatter(format_string, datefmt="%Y-%m-%d %H:%M:%S")
+ console_handler.setFormatter(console_formatter)
+ logger.addHandler(console_handler)
+
+ if log_file:
+ log_path = Path(log_file)
+ log_path.parent.mkdir(parents=True, exist_ok=True)
+
+ file_handler = logging.FileHandler(log_file, encoding="utf-8")
+ file_handler.setLevel(log_level)
+ file_handler.setFormatter(console_formatter)
+ logger.addHandler(file_handler)
+
+ logger.info(f"Logging to file: {log_file}")
+
+ logger.info(f"Logging configured at {level} level")
+ return logger
+
+
+def get_logger(name: str) -> logging.Logger:
+ return logging.getLogger(f"conventionalrp.{name}")
+
+
+class LogContext:
+ def __init__(self, logger: logging.Logger, level: str):
+ self.logger = logger
+ self.level = getattr(logging, level.upper())
+ self.original_level = None
+
+ def __enter__(self):
+ self.original_level = self.logger.level
+ self.logger.setLevel(self.level)
+ return self.logger
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.logger.setLevel(self.original_level)
+ return False
diff --git a/src/conventionalrp/utils/text_processing.py b/src/conventionalrp/utils/text_processing.py
deleted file mode 100644
index e69de29..0000000
--- a/src/conventionalrp/utils/text_processing.py
+++ /dev/null