From cbc653ffd0ea9abf4360623dc7a7651e1a49cc61 Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Sat, 25 Oct 2025 00:30:48 +0800 Subject: feat: Implement plugin system with combat tracker and dice analyzer - Added `plugin_system_demo.py` to demonstrate basic plugin usage, processing, and analysis. - Created `CombatTrackerPlugin` for tracking combat statistics including damage and healing. - Developed `DiceAnalyzerPlugin` for analyzing dice rolls and calculating success rates. - Introduced `renderer_demo.py` for rendering output in HTML, Markdown, and JSON formats. - Implemented `rule_system_demo.py` to showcase rule engine capabilities with various examples. - Established core rule engine functionality in `rules.py` with support for conditions and actions. - Enhanced base plugin structure in `base.py` to support different plugin types (Processor, Renderer, Analyzer). - Added custom exception handling in `exceptions.py` for better error management. - Configured logging setup in `logging_config.py` for improved logging capabilities. - Created unit tests in `test_rust_core.py` to validate core functionalities and performance. --- src/conventionalrp/__init__.py | 36 +- src/conventionalrp/__main__.py | 9 - src/conventionalrp/_core.pyi | 238 +++++++++++- src/conventionalrp/base.py | 0 src/conventionalrp/core/__init__.py | 9 +- src/conventionalrp/core/processor.py | 150 +++++--- src/conventionalrp/core/rules.py | 238 ++++++++++++ src/conventionalrp/extractors/__init__.py | 3 - src/conventionalrp/extractors/rule_extractor.py | 25 -- src/conventionalrp/html_renderer.py | 0 src/conventionalrp/json_renderer.py | 0 src/conventionalrp/markdown_renderer.py | 0 src/conventionalrp/plugins/__init__.py | 20 +- src/conventionalrp/plugins/base.py | 103 +++++ src/conventionalrp/plugins/plugin_manager.py | 209 ++++++++++- src/conventionalrp/renderers/__init__.py | 3 - src/conventionalrp/renderers/html_renderer.py | 433 +++++++++++++++++++++- src/conventionalrp/renderers/json_renderer.py | 77 +++- src/conventionalrp/renderers/markdown_renderer.py | 176 +++++++-- src/conventionalrp/tokenizer.py | 0 src/conventionalrp/utils/__init__.py | 33 +- src/conventionalrp/utils/exceptions.py | 104 ++++++ src/conventionalrp/utils/logging_config.py | 77 ++++ src/conventionalrp/utils/text_processing.py | 0 src/lib.rs | 252 ++++++++++++- 25 files changed, 2009 insertions(+), 186 deletions(-) delete mode 100644 src/conventionalrp/base.py create mode 100644 src/conventionalrp/core/rules.py delete mode 100644 src/conventionalrp/html_renderer.py delete mode 100644 src/conventionalrp/json_renderer.py delete mode 100644 src/conventionalrp/markdown_renderer.py create mode 100644 src/conventionalrp/plugins/base.py delete mode 100644 src/conventionalrp/tokenizer.py create mode 100644 src/conventionalrp/utils/exceptions.py create mode 100644 src/conventionalrp/utils/logging_config.py delete mode 100644 src/conventionalrp/utils/text_processing.py (limited to 'src') diff --git a/src/conventionalrp/__init__.py b/src/conventionalrp/__init__.py index 06dbd63..ab4b17d 100644 --- a/src/conventionalrp/__init__.py +++ b/src/conventionalrp/__init__.py @@ -1,15 +1,45 @@ +""" +Conventional Role Play SDK (ConventionalRP) +""" + import sys from importlib.metadata import version from . import _core +from .core import Parser, Processor, Rule, RuleEngine +from .utils import ( + setup_logging, + get_logger, + ConventionalRPError, + ParserError, + RuleError, + ProcessorError, + ValidationError, + ConfigurationError, +) -__all__ = ["_core", "__version__"] +__all__ = [ + "Parser", + "Processor", + "Rule", + "RuleEngine", + "setup_logging", + "get_logger", + "ConventionalRPError", + "ParserError", + "RuleError", + "ProcessorError", + "ValidationError", + "ConfigurationError", + "__version__", +] if sys.version_info >= (3, 8): - # For Python 3.8+ __version__ = version("conventionalrp") elif sys.version_info < (3, 8): from pkg_resources import get_distribution - # For Python < 3.8 __version__ = get_distribution("conventionalrp").version + +_default_logger = setup_logging(level="INFO") + diff --git a/src/conventionalrp/__main__.py b/src/conventionalrp/__main__.py index fa4f45e..e69de29 100644 --- a/src/conventionalrp/__main__.py +++ b/src/conventionalrp/__main__.py @@ -1,9 +0,0 @@ -from ._core import sum_as_string - - -def main(): - print(sum_as_string(1, 2)) - - -if __name__ == "__main__": - main() diff --git a/src/conventionalrp/_core.pyi b/src/conventionalrp/_core.pyi index 0805887..6f87269 100644 --- a/src/conventionalrp/_core.pyi +++ b/src/conventionalrp/_core.pyi @@ -1,3 +1,237 @@ -def sum_as_string(a: int, b: int) -> str: ... +""" +提供高性能的文本解析和匹配功能 +""" -class Base: ... +from typing import Dict, List, Optional, Tuple + +class Base: + """基础类(向后兼容)""" + ... + +class Token: + """ + 解析后的 Token + + 表示文本中的一个语义单元,包含类型、内容和元数据 + """ + + token_type: str + content: str + metadata: Dict[str, str] + + def __init__(self, token_type: str, content: str) -> None: + """ + 创建新的 Token + + Args: + token_type: Token 类型 + content: Token 内容 + """ + ... + + def add_metadata(self, key: str, value: str) -> None: + """ + 添加元数据 + + Args: + key: 元数据键 + value: 元数据值 + """ + ... + + def get_metadata(self, key: str) -> Optional[str]: + """ + 获取元数据 + + Args: + key: 元数据键 + + Returns: + 元数据值,如果不存在返回 None + """ + ... + + def to_dict(self) -> Dict[str, any]: + """ + 转换为 Python 字典 + + Returns: + 包含 Token 所有信息的字典 + """ + ... + + def __repr__(self) -> str: ... + +class RegexRule: + """ + 正则表达式规则 + + 用于文本匹配和提取 + """ + + rule_type: str + priority: int + + def __init__(self, pattern: str, rule_type: str, priority: int) -> None: + """ + 创建正则规则 + + Args: + pattern: 正则表达式模式 + rule_type: 规则类型 + priority: 优先级(数字越大优先级越高) + + Raises: + ValueError: 如果正则表达式无效 + """ + ... + + def matches(self, text: str) -> bool: + """ + 测试文本是否匹配 + + Args: + text: 待测试的文本 + + Returns: + 是否匹配 + """ + ... + + def extract(self, text: str) -> Optional[List[str]]: + """ + 提取匹配的捕获组 + + Args: + text: 待提取的文本 + + Returns: + 捕获组列表,如果不匹配返回 None + """ + ... + + def find_all(self, text: str) -> List[Tuple[int, int, str]]: + """ + 查找所有匹配 + + Args: + text: 待搜索的文本 + + Returns: + 列表,每个元素为 (start, end, matched_text) + """ + ... + +class TextParser: + """ + 高性能文本解析器 + + 支持多规则、优先级排序的文本解析 + """ + + def __init__(self) -> None: + """创建新的解析器""" + ... + + def add_rule(self, pattern: str, rule_type: str, priority: int) -> None: + """ + 添加解析规则 + + Args: + pattern: 正则表达式模式 + rule_type: 规则类型 + priority: 优先级 + + Raises: + ValueError: 如果正则表达式无效 + """ + ... + + def parse_line(self, text: str) -> List[Tuple[str, str, int, int]]: + """ + 解析单行文本 + + Args: + text: 待解析的文本 + + Returns: + 列表,每个元素为 (type, content, start, end) + """ + ... + + def parse_lines(self, lines: List[str]) -> List[List[Dict[str, any]]]: + """ + 批量解析多行文本 + + Args: + lines: 文本行列表 + + Returns: + 每行的解析结果列表 + """ + ... + + def clear_rules(self) -> None: + """清除所有规则""" + ... + + def rule_count(self) -> int: + """ + 获取规则数量 + + Returns: + 当前规则数量 + """ + ... + +class FastMatcher: + """ + 快速字符串匹配器 + + 用于高效的多模式字符串匹配 + """ + + def __init__(self, patterns: List[str]) -> None: + """ + 创建匹配器 + + Args: + patterns: 模式列表 + """ + ... + + def contains_any(self, text: str) -> bool: + """ + 检查文本是否包含任意模式 + + Args: + text: 待检查的文本 + + Returns: + 是否包含任意模式 + """ + ... + + def find_matches(self, text: str) -> List[str]: + """ + 查找所有匹配的模式 + + Args: + text: 待搜索的文本 + + Returns: + 匹配的模式列表 + """ + ... + + def count_matches(self, text: str) -> Dict[str, int]: + """ + 统计每个模式的出现次数 + + Args: + text: 待统计的文本 + + Returns: + 字典,键为模式,值为出现次数 + """ + ... diff --git a/src/conventionalrp/base.py b/src/conventionalrp/base.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/core/__init__.py b/src/conventionalrp/core/__init__.py index 91d0f8f..08829b8 100644 --- a/src/conventionalrp/core/__init__.py +++ b/src/conventionalrp/core/__init__.py @@ -1,3 +1,6 @@ -""" -This file initializes the core module of the conventionalrp SDK. -""" +from .parser import Parser +from .processor import Processor +from .rules import Rule, RuleEngine + +__all__ = ["Parser", "Processor", "Rule", "RuleEngine"] + diff --git a/src/conventionalrp/core/processor.py b/src/conventionalrp/core/processor.py index bc74ffb..12ca32b 100644 --- a/src/conventionalrp/core/processor.py +++ b/src/conventionalrp/core/processor.py @@ -1,68 +1,104 @@ -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Callable +import logging +from .rules import RuleEngine, Rule + +logger = logging.getLogger(__name__) class Processor: - """处理器,用于处理解析后的token""" - def __init__(self, rules: Optional[Dict[str, Any]] = None): - """ - 初始化处理器 - - Args: - rules: 处理规则(可选) - """ self.rules = rules or {} - - def process_tokens(self, tokens: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - 处理token列表 + self.rule_engine = RuleEngine() + self.custom_processors: List[Callable] = [] + + self._load_rules_to_engine() - Args: - tokens: 解析后的token列表 + logger.info("Processor initialized with %d rules", + self.rule_engine.rule_count()) + + def _load_rules_to_engine(self): + if not isinstance(self.rules, dict): + return + + rules_list = self.rules.get("rules", []) + for rule_dict in rules_list: + if not isinstance(rule_dict, dict): + continue - Returns: - 处理后的数据列表 - """ + try: + self.rule_engine.add_rule_dict( + name=rule_dict.get("name", "unnamed"), + condition=rule_dict.get("condition", {}), + action=rule_dict.get("action", {}), + priority=rule_dict.get("priority", 50) + ) + except Exception as e: + logger.warning("Failed to load rule: %s", e) + + def add_rule(self, rule: Rule): + self.rule_engine.add_rule(rule) + logger.debug("Added rule: %s", rule.name) + + def add_processor(self, processor: Callable[[Dict[str, Any]], Dict[str, Any]]): + self.custom_processors.append(processor) + logger.debug("Added custom processor") + + def process_tokens( + self, + tokens: List[Dict[str, Any]], + apply_all_rules: bool = False + ) -> List[Dict[str, Any]]: + if not tokens: + logger.warning("Empty token list provided") + return [] + + logger.info("Processing %d tokens", len(tokens)) processed_data = [] - for token in tokens: - processed_token = self.apply_rules(token) - processed_data.append(processed_token) - return processed_data - - def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]: - """ - 对单个token应用规则 - Args: - token: 单个token - - Returns: - 处理后的token - """ - # 基础实现:直接返回token - # 可以在此添加更多处理逻辑 + for i, token in enumerate(tokens): + try: + processed_token = self.process_single_token(token, apply_all_rules) + processed_data.append(processed_token) + except Exception as e: + logger.error("Error processing token %d: %s", i, e) + # 发生错误时保留原始 token + processed_data.append(token) + + logger.info("Successfully processed %d tokens", len(processed_data)) + return processed_data + + def process_single_token( + self, + token: Dict[str, Any], + apply_all_rules: bool = False + ) -> Dict[str, Any]: processed = token.copy() - # 添加处理时间戳 + if self.rule_engine.rule_count() > 0: + processed = self.rule_engine.process(processed, apply_all_rules) + + for processor in self.custom_processors: + try: + processed = processor(processed) + except Exception as e: + logger.error("Custom processor failed: %s", e) + if "timestamp" in processed: processed["processed"] = True return processed - - def generate_output(self, processed_data: List[Dict[str, Any]], format_type: str) -> str: - """ - 生成指定格式的输出 + + def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]: + return self.process_single_token(token) + + def generate_output( + self, + processed_data: List[Dict[str, Any]], + format_type: str + ) -> str: + logger.info("Generating %s output for %d items", + format_type, len(processed_data)) - Args: - processed_data: 处理后的数据 - format_type: 输出格式 (json/html/markdown) - - Returns: - 格式化后的字符串 - - Raises: - ValueError: 不支持的格式类型 - """ if format_type == "json": return self.generate_json_output(processed_data) elif format_type == "html": @@ -71,20 +107,24 @@ class Processor: return self.generate_markdown_output(processed_data) else: raise ValueError(f"Unsupported format type: {format_type}") - + def generate_json_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成JSON格式输出""" import json return json.dumps(processed_data, ensure_ascii=False, indent=2) - + def generate_html_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成HTML格式输出""" return ( "" + "".join(f"

{data}

" for data in processed_data) + "" ) - + def generate_markdown_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成Markdown格式输出""" return "\n".join(f"- {data}" for data in processed_data) + + def get_statistics(self) -> Dict[str, Any]: + return { + "rule_count": self.rule_engine.rule_count(), + "custom_processor_count": len(self.custom_processors), + "has_rules_config": bool(self.rules), + } diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py new file mode 100644 index 0000000..f198d4e --- /dev/null +++ b/src/conventionalrp/core/rules.py @@ -0,0 +1,238 @@ +from typing import Dict, Any, Callable, List, Optional +from enum import Enum +import re + + +class RuleCondition(Enum): + """规则条件类型""" + EQUALS = "equals" + CONTAINS = "contains" + MATCHES = "matches" + STARTS_WITH = "starts_with" + ENDS_WITH = "ends_with" + IN_LIST = "in_list" + GREATER_THAN = "greater_than" + LESS_THAN = "less_than" + + +class Rule: + def __init__( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + self.name = name + self.condition = condition + self.action = action + self.priority = priority + self._compiled_patterns = {} + + self._precompile_patterns() + + def _precompile_patterns(self): + """预编译正则表达式以提高性能""" + if isinstance(self.condition, dict): + for key, value in self.condition.items(): + if isinstance(value, dict) and value.get("type") == "matches": + pattern = value.get("pattern") + if pattern: + self._compiled_patterns[key] = re.compile(pattern) + + def matches(self, data: Dict[str, Any]) -> bool: + """ + 检查数据是否匹配规则条件 + """ + if not isinstance(self.condition, dict): + return False + + for field, condition_spec in self.condition.items(): + if not self._check_field_condition(data, field, condition_spec): + return False + + return True + + def _check_field_condition( + self, + data: Dict[str, Any], + field: str, + condition: Any + ) -> bool: + """检查单个字段的条件""" + value = data.get(field) + + if not isinstance(condition, dict): + return value == condition + + condition_type = condition.get("type") + expected_value = condition.get("value") + + if condition_type == "equals": + return value == expected_value + elif condition_type == "contains": + return expected_value in str(value) if value else False + elif condition_type == "matches": + if field in self._compiled_patterns: + pattern = self._compiled_patterns[field] + return bool(pattern.search(str(value))) if value else False + return False + elif condition_type == "starts_with": + return str(value).startswith(expected_value) if value else False + elif condition_type == "ends_with": + return str(value).endswith(expected_value) if value else False + elif condition_type == "in_list": + return value in expected_value if isinstance(expected_value, list) else False + elif condition_type == "greater_than": + try: + return float(value) > float(expected_value) + except (ValueError, TypeError): + return False + elif condition_type == "less_than": + try: + return float(value) < float(expected_value) + except (ValueError, TypeError): + return False + + return False + + def apply(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + 对匹配的数据应用规则动作 + """ + result = data.copy() + + if not isinstance(self.action, dict): + return result + + action_type = self.action.get("type") + + if action_type == "set_field": + field = self.action.get("field") + value = self.action.get("value") + if field: + result[field] = value + + elif action_type == "add_field": + field = self.action.get("field") + value = self.action.get("value") + if field and field not in result: + result[field] = value + + elif action_type == "remove_field": + field = self.action.get("field") + if field and field in result: + del result[field] + + elif action_type == "transform": + field = self.action.get("field") + func_name = self.action.get("function") + if field and field in result and func_name: + result[field] = self._apply_transform(result[field], func_name) + + elif action_type == "add_tag": + tag = self.action.get("tag") + if tag: + if "tags" not in result: + result["tags"] = [] + if tag not in result["tags"]: + result["tags"].append(tag) + + elif action_type == "copy_field": + source = self.action.get("source") + target = self.action.get("target") + if source and target and source in result: + result[target] = result[source] + + return result + + def _apply_transform(self, value: Any, func_name: str) -> Any: + transforms = { + "upper": lambda x: str(x).upper(), + "lower": lambda x: str(x).lower(), + "strip": lambda x: str(x).strip(), + "int": lambda x: int(x), + "float": lambda x: float(x), + "len": lambda x: len(x) if hasattr(x, '__len__') else 0, + } + + func = transforms.get(func_name) + if func: + try: + return func(value) + except Exception: + return value + return value + + def __repr__(self) -> str: + return f"Rule(name={self.name}, priority={self.priority})" + + +class RuleEngine: + """ + 规则引擎 + """ + + def __init__(self): + self.rules: List[Rule] = [] + self._sorted = False + + def add_rule(self, rule: Rule): + self.rules.append(rule) + self._sorted = False + + def add_rule_dict( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + """ + 从字典添加规则 + """ + rule = Rule(name, condition, action, priority) + self.add_rule(rule) + + def _ensure_sorted(self): + """确保规则按优先级排序""" + if not self._sorted: + self.rules.sort(key=lambda r: r.priority, reverse=True) + self._sorted = True + + def process( + self, + data: Dict[str, Any], + apply_all: bool = False + ) -> Dict[str, Any]: + self._ensure_sorted() + result = data.copy() + + for rule in self.rules: + if rule.matches(result): + result = rule.apply(result) + if not apply_all: + break + + return result + + def process_batch( + self, + data_list: List[Dict[str, Any]], + apply_all: bool = False + ) -> List[Dict[str, Any]]: + return [self.process(data, apply_all) for data in data_list] + + def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]: + self._ensure_sorted() + return [rule for rule in self.rules if rule.matches(data)] + + def clear_rules(self): + self.rules.clear() + self._sorted = False + + def rule_count(self) -> int: + return len(self.rules) + + def __repr__(self) -> str: + return f"RuleEngine(rules={len(self.rules)})" diff --git a/src/conventionalrp/extractors/__init__.py b/src/conventionalrp/extractors/__init__.py index e693e03..e69de29 100644 --- a/src/conventionalrp/extractors/__init__.py +++ b/src/conventionalrp/extractors/__init__.py @@ -1,3 +0,0 @@ -""" -This file initializes the extractors module. -""" diff --git a/src/conventionalrp/extractors/rule_extractor.py b/src/conventionalrp/extractors/rule_extractor.py index bfc60c8..12b9e4b 100644 --- a/src/conventionalrp/extractors/rule_extractor.py +++ b/src/conventionalrp/extractors/rule_extractor.py @@ -15,12 +15,6 @@ class RuleExtractor(BaseExtractor): """规则提取器,用于从配置文件加载解析规则""" def __init__(self, config_file: Optional[str] = None): - """ - 初始化规则提取器 - - Args: - config_file: 规则配置文件路径(可选) - """ self.config_file = config_file self.rules: Dict[str, Any] = {} if config_file: @@ -29,16 +23,6 @@ class RuleExtractor(BaseExtractor): def load_rules_from_file(self, config_file: str) -> Dict[str, Any]: """ 从文件加载规则 - - Args: - config_file: 规则配置文件路径 - - Returns: - 解析后的规则字典 - - Raises: - FileNotFoundError: 文件不存在 - ValueError: 文件内容为空或格式错误 """ if not Path(config_file).exists(): raise FileNotFoundError(f"Rule file not found: {config_file}") @@ -56,12 +40,6 @@ class RuleExtractor(BaseExtractor): def load_rules(self, config_file: str) -> Dict[str, Any]: """ 加载规则(兼容旧接口) - - Args: - config_file: 规则配置文件路径 - - Returns: - 解析后的规则字典 """ self.rules = self.load_rules_from_file(config_file) return self.rules @@ -69,8 +47,5 @@ class RuleExtractor(BaseExtractor): def extract(self) -> Dict[str, Any]: """ 提取规则 - - Returns: - 规则字典 """ return self.rules diff --git a/src/conventionalrp/html_renderer.py b/src/conventionalrp/html_renderer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/json_renderer.py b/src/conventionalrp/json_renderer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/markdown_renderer.py b/src/conventionalrp/markdown_renderer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/plugins/__init__.py b/src/conventionalrp/plugins/__init__.py index c7bbee6..e759fd3 100644 --- a/src/conventionalrp/plugins/__init__.py +++ b/src/conventionalrp/plugins/__init__.py @@ -1,3 +1,17 @@ -""" -This file initializes the plugins module. -""" +from .base import ( + Plugin, + ParserPlugin, + ProcessorPlugin, + RendererPlugin, + AnalyzerPlugin, +) +from .plugin_manager import PluginManager + +__all__ = [ + "Plugin", + "ParserPlugin", + "ProcessorPlugin", + "RendererPlugin", + "AnalyzerPlugin", + "PluginManager", +] diff --git a/src/conventionalrp/plugins/base.py b/src/conventionalrp/plugins/base.py new file mode 100644 index 0000000..f9e92a7 --- /dev/null +++ b/src/conventionalrp/plugins/base.py @@ -0,0 +1,103 @@ +from typing import Any, Dict, List, Optional +from abc import ABC, abstractmethod +import logging + +logger = logging.getLogger(__name__) + + +class Plugin(ABC): + """ + 插件基类 + + 所有插件必须继承此类并实现必要的方法 + """ + + def __init__(self, name: str, version: str = "1.0.0"): + self.name = name + self.version = version + self.enabled = True + self.logger = logging.getLogger(f"conventionalrp.plugins.{name}") + + @abstractmethod + def initialize(self, config: Optional[Dict[str, Any]] = None): + pass + + @abstractmethod + def process(self, data: Any) -> Any: + pass + + def on_enable(self): + """插件启用时调用""" + self.logger.info(f"Plugin {self.name} enabled") + + def on_disable(self): + """插件禁用时调用""" + self.logger.info(f"Plugin {self.name} disabled") + + def get_metadata(self) -> Dict[str, Any]: + return { + "name": self.name, + "version": self.version, + "enabled": self.enabled, + "type": self.__class__.__name__, + } + + def __repr__(self) -> str: + return f"Plugin(name={self.name}, version={self.version}, enabled={self.enabled})" + + +class ParserPlugin(Plugin): + def __init__(self, name: str, version: str = "1.0.0"): + super().__init__(name, version) + self.priority = 50 + + @abstractmethod + def can_parse(self, text: str) -> bool: + """ + 判断是否可以解析给定文本 + """ + pass + + @abstractmethod + def parse(self, text: str) -> List[Dict[str, Any]]: + """ + 解析文本 + """ + pass + + +class ProcessorPlugin(Plugin): + @abstractmethod + def process_token(self, token: Dict[str, Any]) -> Dict[str, Any]: + """ + 处理单个 token + """ + pass + + def process(self, data: Any) -> Any: + """ + 处理数据(实现基类方法) + """ + if isinstance(data, dict): + return self.process_token(data) + elif isinstance(data, list): + return [self.process_token(token) for token in data] + return data + + +class RendererPlugin(Plugin): + @abstractmethod + def render(self, data: Any) -> str: + pass + + def process(self, data: Any) -> Any: + return self.render(data) + + +class AnalyzerPlugin(Plugin): + @abstractmethod + def analyze(self, data: Any) -> Dict[str, Any]: + pass + + def process(self, data: Any) -> Any: + return self.analyze(data) diff --git a/src/conventionalrp/plugins/plugin_manager.py b/src/conventionalrp/plugins/plugin_manager.py index 0d49a9c..0230bda 100644 --- a/src/conventionalrp/plugins/plugin_manager.py +++ b/src/conventionalrp/plugins/plugin_manager.py @@ -1,19 +1,204 @@ +""" +插件管理器 +""" + import os +import sys import importlib +import importlib.util +from pathlib import Path +from typing import Dict, List, Optional, Type, Any +import logging + +from .base import Plugin + +logger = logging.getLogger(__name__) class PluginManager: - def __init__(self, plugin_dir: str): - self.plugin_dir = plugin_dir - self.plugins = [] + def __init__(self, plugin_dirs: Optional[List[str]] = None): + self.plugin_dirs = plugin_dirs or [] + self.plugins: Dict[str, Plugin] = {} + self.plugin_classes: Dict[str, Type[Plugin]] = {} + + logger.info("PluginManager initialized with %d directories", + len(self.plugin_dirs)) + + def add_plugin_dir(self, directory: str): - def load_plugins(self): - for plugin in os.listdir(self.plugin_dir): - if plugin.endswith(".py"): - plugin_name = plugin.split(".")[0] - module = importlib.import_module(f"{self.plugin_dir}.{plugin_name}") - self.plugins.append(module) + if directory not in self.plugin_dirs: + self.plugin_dirs.append(directory) + logger.info(f"Added plugin directory: {directory}") + + def discover_plugins(self) -> List[str]: + discovered = [] + + for plugin_dir in self.plugin_dirs: + path = Path(plugin_dir) + if not path.exists(): + logger.warning(f"Plugin directory does not exist: {plugin_dir}") + continue + + if str(path.parent) not in sys.path: + sys.path.insert(0, str(path.parent)) - def run_plugins(self): - for plugin in self.plugins: - plugin.run() + for py_file in path.glob("*.py"): + if py_file.name.startswith("_"): + continue + + module_name = py_file.stem + discovered.append(module_name) + logger.debug(f"Discovered plugin module: {module_name}") + + logger.info(f"Discovered {len(discovered)} plugin modules") + return discovered + + def load_plugin_from_file(self, file_path: str) -> Optional[Type[Plugin]]: + try: + module_name = Path(file_path).stem + spec = importlib.util.spec_from_file_location(module_name, file_path) + + if spec is None or spec.loader is None: + logger.error(f"Failed to load plugin spec: {file_path}") + return None + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for name in dir(module): + obj = getattr(module, name) + if (isinstance(obj, type) and + issubclass(obj, Plugin) and + obj is not Plugin): + logger.info(f"Loaded plugin class: {name} from {file_path}") + return obj + + logger.warning(f"No Plugin subclass found in: {file_path}") + return None + + except Exception as e: + logger.error(f"Error loading plugin from {file_path}: {e}") + return None + + def load_plugin(self, module_name: str) -> Optional[str]: + try: + module = importlib.import_module(module_name) + + # 查找 Plugin 子类 + for name in dir(module): + obj = getattr(module, name) + if (isinstance(obj, type) and + issubclass(obj, Plugin) and + obj is not Plugin): + + plugin_class = obj + self.plugin_classes[name] = plugin_class + logger.info(f"Loaded plugin class: {name}") + return name + + logger.warning(f"No Plugin subclass found in module: {module_name}") + return None + + except Exception as e: + logger.error(f"Error loading plugin module {module_name}: {e}") + return None + + def register_plugin( + self, + plugin: Plugin, + replace: bool = False + ) -> bool: + if plugin.name in self.plugins and not replace: + logger.warning(f"Plugin {plugin.name} already registered") + return False + + self.plugins[plugin.name] = plugin + plugin.on_enable() + logger.info(f"Registered plugin: {plugin.name}") + return True + + def unregister_plugin(self, plugin_name: str) -> bool: + if plugin_name not in self.plugins: + logger.warning(f"Plugin {plugin_name} not found") + return False + + plugin = self.plugins[plugin_name] + plugin.on_disable() + del self.plugins[plugin_name] + logger.info(f"Unregistered plugin: {plugin_name}") + return True + + def get_plugin(self, plugin_name: str) -> Optional[Plugin]: + return self.plugins.get(plugin_name) + + def list_plugins(self) -> List[Dict[str, Any]]: + return [plugin.get_metadata() for plugin in self.plugins.values()] + + def enable_plugin(self, plugin_name: str) -> bool: + plugin = self.plugins.get(plugin_name) + if plugin is None: + logger.warning(f"Plugin {plugin_name} not found") + return False + + if not plugin.enabled: + plugin.enabled = True + plugin.on_enable() + logger.info(f"Enabled plugin: {plugin_name}") + + return True + + def disable_plugin(self, plugin_name: str) -> bool: + plugin = self.plugins.get(plugin_name) + if plugin is None: + logger.warning(f"Plugin {plugin_name} not found") + return False + + if plugin.enabled: + plugin.enabled = False + plugin.on_disable() + logger.info(f"Disabled plugin: {plugin_name}") + + return True + + def execute_plugins( + self, + data: Any, + plugin_type: Optional[Type[Plugin]] = None + ) -> Any: + result = data + + for plugin in self.plugins.values(): + if not plugin.enabled: + continue + + if plugin_type is not None and not isinstance(plugin, plugin_type): + continue + + try: + result = plugin.process(result) + logger.debug(f"Executed plugin: {plugin.name}") + except Exception as e: + logger.error(f"Error executing plugin {plugin.name}: {e}") + + return result + + def clear_plugins(self): + for plugin_name in list(self.plugins.keys()): + self.unregister_plugin(plugin_name) + + self.plugin_classes.clear() + logger.info("Cleared all plugins") + + def get_statistics(self) -> Dict[str, Any]: + enabled_count = sum(1 for p in self.plugins.values() if p.enabled) + + return { + "total_plugins": len(self.plugins), + "enabled_plugins": enabled_count, + "disabled_plugins": len(self.plugins) - enabled_count, + "plugin_classes": len(self.plugin_classes), + "plugin_directories": len(self.plugin_dirs), + } + + def __repr__(self) -> str: + return f"PluginManager(plugins={len(self.plugins)}, enabled={sum(1 for p in self.plugins.values() if p.enabled)})" diff --git a/src/conventionalrp/renderers/__init__.py b/src/conventionalrp/renderers/__init__.py index 6d5ac7a..e69de29 100644 --- a/src/conventionalrp/renderers/__init__.py +++ b/src/conventionalrp/renderers/__init__.py @@ -1,3 +0,0 @@ -""" -This file initializes the renderers module. -""" diff --git a/src/conventionalrp/renderers/html_renderer.py b/src/conventionalrp/renderers/html_renderer.py index 75efcd3..d435fc9 100644 --- a/src/conventionalrp/renderers/html_renderer.py +++ b/src/conventionalrp/renderers/html_renderer.py @@ -1,22 +1,423 @@ +from typing import Any, Dict, List, Optional from .base import BaseRenderer class HTMLRenderer(BaseRenderer): - def __init__(self): + THEMES = { + "light": { + "bg_color": "#ffffff", + "text_color": "#333333", + "header_bg": "#f5f5f5", + "border_color": "#e0e0e0", + "dialogue_color": "#4CAF50", + "dice_color": "#2196F3", + "narration_color": "#FF9800", + "system_color": "#9E9E9E", + "success_color": "#43a047", + "failure_color": "#e53935", + "code_bg": "#f5f5f5", + }, + "dark": { + "bg_color": "#1e1e1e", + "text_color": "#d4d4d4", + "header_bg": "#2d2d30", + "border_color": "#3e3e42", + "dialogue_color": "#6adb8d", + "dice_color": "#5fb3f5", + "narration_color": "#ffb74d", + "system_color": "#bdbdbd", + "success_color": "#66bb6a", + "failure_color": "#ef5350", + "code_bg": "#2d2d30", + }, + "fantasy": { + "bg_color": "#f9f6f1", + "text_color": "#3e2723", + "header_bg": "#d7ccc8", + "border_color": "#bcaaa4", + "dialogue_color": "#8d6e63", + "dice_color": "#5d4037", + "narration_color": "#795548", + "system_color": "#a1887f", + "success_color": "#7cb342", + "failure_color": "#c62828", + "code_bg": "#efebe9", + }, + } + + def __init__(self, theme: str = "light", custom_css: Optional[str] = None): super().__init__() - self.title = "TRPG Log Output" - - def render(self, data): - html_content = f"{self.title}" - html_content += "

TRPG Log Output

" - html_content += "" - return html_content - + self.title = "Rendered Log" + self.theme = theme if theme in self.THEMES else "light" + self.custom_css = custom_css + + def _get_css(self) -> str: + colors = self.THEMES[self.theme] + + css = f""" + * {{ + margin: 0; + padding: 0; + box-sizing: border-box; + }} + + body {{ + font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; + background-color: {colors['bg_color']}; + color: {colors['text_color']}; + line-height: 1.6; + padding: 20px; + max-width: 1200px; + margin: 0 auto; + }} + + header {{ + background: linear-gradient(135deg, {colors['header_bg']} 0%, {colors['border_color']} 100%); + padding: 30px; + border-radius: 10px; + margin-bottom: 30px; + box-shadow: 0 4px 6px rgba(0,0,0,0.1); + }} + + h1 {{ + font-size: 2.5em; + font-weight: 700; + margin-bottom: 10px; + color: {colors['text_color']}; + }} + + .subtitle {{ + font-size: 1.1em; + opacity: 0.8; + }} + + .stats {{ + display: flex; + gap: 20px; + margin-top: 20px; + flex-wrap: wrap; + }} + + .stat-item {{ + background: {colors['bg_color']}; + padding: 10px 20px; + border-radius: 5px; + font-size: 0.9em; + border: 1px solid {colors['border_color']}; + }} + + .stat-label {{ + opacity: 0.7; + margin-right: 5px; + }} + + .stat-value {{ + font-weight: bold; + }} + + .content-wrapper {{ + background: {colors['bg_color']}; + border-radius: 10px; + padding: 20px; + box-shadow: 0 2px 4px rgba(0,0,0,0.05); + }} + + .token {{ + margin: 15px 0; + padding: 15px; + border-left: 4px solid {colors['border_color']}; + border-radius: 5px; + background: {colors['header_bg']}; + transition: transform 0.2s, box-shadow 0.2s; + }} + + .token:hover {{ + transform: translateX(5px); + box-shadow: 0 4px 8px rgba(0,0,0,0.1); + }} + + .token.dialogue {{ + border-left-color: {colors['dialogue_color']}; + }} + + .token.dice {{ + border-left-color: {colors['dice_color']}; + }} + + .token.narration {{ + border-left-color: {colors['narration_color']}; + }} + + .token.system {{ + border-left-color: {colors['system_color']}; + }} + + .token.success {{ + border-left-color: {colors['success_color']}; + background: {colors['success_color']}15; + }} + + .token.failure {{ + border-left-color: {colors['failure_color']}; + background: {colors['failure_color']}15; + }} + + .token-header {{ + display: flex; + align-items: center; + gap: 10px; + margin-bottom: 8px; + }} + + .type-badge {{ + display: inline-block; + padding: 3px 10px; + border-radius: 12px; + font-size: 0.75em; + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.5px; + }} + + .type-badge.dialogue {{ + background: {colors['dialogue_color']}; + color: white; + }} + + .type-badge.dice {{ + background: {colors['dice_color']}; + color: white; + }} + + .type-badge.narration {{ + background: {colors['narration_color']}; + color: white; + }} + + .type-badge.system {{ + background: {colors['system_color']}; + color: white; + }} + + .speaker {{ + font-weight: 700; + font-size: 1.1em; + color: {colors['text_color']}; + }} + + .content {{ + margin-top: 8px; + line-height: 1.8; + font-size: 1em; + }} + + .metadata {{ + display: flex; + gap: 15px; + margin-top: 10px; + padding-top: 10px; + border-top: 1px solid {colors['border_color']}; + font-size: 0.85em; + opacity: 0.7; + flex-wrap: wrap; + }} + + .metadata-item {{ + display: flex; + align-items: center; + gap: 5px; + }} + + .tags {{ + display: flex; + gap: 5px; + flex-wrap: wrap; + }} + + .tag {{ + background: {colors['code_bg']}; + padding: 2px 8px; + border-radius: 3px; + font-size: 0.85em; + border: 1px solid {colors['border_color']}; + }} + + code {{ + background: {colors['code_bg']}; + padding: 2px 6px; + border-radius: 3px; + font-family: 'Courier New', monospace; + font-size: 0.9em; + }} + + .dice-result {{ + display: inline-block; + background: {colors['dice_color']}; + color: white; + padding: 3px 10px; + border-radius: 5px; + font-weight: bold; + margin-left: 5px; + }} + + footer {{ + margin-top: 40px; + padding: 20px; + text-align: center; + opacity: 0.6; + font-size: 0.9em; + }} + + @media (max-width: 768px) {{ + body {{ + padding: 10px; + }} + + h1 {{ + font-size: 1.8em; + }} + + .token {{ + padding: 10px; + }} + }} + """ + + if self.custom_css: + css += "\n" + self.custom_css + + return css + + def render(self, data: List[Dict[str, Any]]) -> str: + if data and not isinstance(data[0], dict): + data = [{"type": "text", "content": str(item)} for item in data] + + stats = self._calculate_stats(data) + + html = f""" + + + + + {self.title} + + + +
+

🎲 {self.title}

+
完整的游戏日志与数据分析
+
+
+ 总条目: + {stats['total']} +
+
+ 对话: + {stats['dialogue']} +
+
+ 骰子: + {stats['dice']} +
+
+ 旁白: + {stats['narration']} +
+
+ 系统: + {stats['system']} +
+
+
+ +
+""" + + for item in data: + html += self._render_token(item) + + html += """
+ + + +""" + + return html + + def _render_token(self, token: Dict[str, Any]) -> str: + token_type = token.get("type", "unknown") + speaker = token.get("speaker", "") + content = str(token.get("content", "")) + + content = (content + .replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """)) + + if token_type == "dice" and "result" in token: + content += f' {token["result"]}' + + html = f'
\n' + html += '
\n' + html += f' {token_type}\n' + + if speaker: + html += f' {speaker}\n' + + html += '
\n' + html += f'
{content}
\n' + + metadata_items = [] + + if "timestamp" in token: + metadata_items.append(f'⏰ {token["timestamp"]}') + + if "tags" in token and token["tags"]: + tags_html = '
' + for tag in token["tags"]: + tags_html += f'{tag}' + tags_html += '
' + metadata_items.append(tags_html) + + if "combat_data" in token: + combat = token["combat_data"] + if combat.get("type") == "damage": + metadata_items.append(f'⚔️ 伤害: {combat["amount"]}') + elif combat.get("type") == "healing": + metadata_items.append(f'💚 治疗: {combat["amount"]}') + + if metadata_items: + html += ' \n' + + html += '
\n' + + return html + + def _calculate_stats(self, data: List[Dict[str, Any]]) -> Dict[str, int]: + stats = { + "total": len(data), + "dialogue": 0, + "dice": 0, + "narration": 0, + "system": 0, + } + + for item in data: + token_type = item.get("type", "unknown") + if token_type in stats: + stats[token_type] += 1 + + return stats + def set_style(self, style): - # Implement style setting if needed - pass + """设置样式(向后兼容)""" + if style in self.THEMES: + self.theme = style + diff --git a/src/conventionalrp/renderers/json_renderer.py b/src/conventionalrp/renderers/json_renderer.py index 8dcdd6d..42d5fd4 100644 --- a/src/conventionalrp/renderers/json_renderer.py +++ b/src/conventionalrp/renderers/json_renderer.py @@ -1,11 +1,78 @@ +import json +from typing import Any, Dict, List from .base import BaseRenderer class JSONRenderer(BaseRenderer): - def render(self, data): - import json - - return json.dumps(data, ensure_ascii=False, indent=4) + def __init__(self, pretty: bool = True, indent: int = 2, sort_keys: bool = False): + super().__init__() + self.pretty = pretty + self.indent = indent if pretty else None + self.sort_keys = sort_keys + self.style = {} + + def render(self, data: Any) -> str: + if self.pretty and isinstance(data, list): + output = { + "metadata": { + "total_entries": len(data), + "renderer": "ConventionalRP JSONRenderer", + "version": "1.0.0", + }, + "statistics": self._calculate_stats(data), + "data": data, + } + else: + output = data + + return json.dumps( + output, + ensure_ascii=False, + indent=self.indent, + sort_keys=self.sort_keys, + ) + + def _calculate_stats(self, data: List[Dict[str, Any]]) -> Dict[str, Any]: + stats = { + "dialogue": 0, + "dice": 0, + "narration": 0, + "system": 0, + "success": 0, + "failure": 0, + "other": 0, + } + + speakers = set() + + for item in data: + if not isinstance(item, dict): + continue + + token_type = item.get("type", "unknown") + if token_type in stats: + stats[token_type] += 1 + else: + stats["other"] += 1 + + if "speaker" in item and item["speaker"]: + speakers.add(item["speaker"]) + + stats["unique_speakers"] = len(speakers) + stats["speakers"] = sorted(list(speakers)) + + return stats def set_style(self, style): - self.style = style # Placeholder for potential styling options in the future + self.style = style + + # 从 style 中提取设置 + if isinstance(style, dict): + if "pretty" in style: + self.pretty = style["pretty"] + self.indent = 2 if self.pretty else None + if "indent" in style: + self.indent = style["indent"] + if "sort_keys" in style: + self.sort_keys = style["sort_keys"] + diff --git a/src/conventionalrp/renderers/markdown_renderer.py b/src/conventionalrp/renderers/markdown_renderer.py index 9df59a2..7080913 100644 --- a/src/conventionalrp/renderers/markdown_renderer.py +++ b/src/conventionalrp/renderers/markdown_renderer.py @@ -1,18 +1,19 @@ +""" +Markdown 渲染器 +""" + +from typing import Any, Dict, List, Union from .base import BaseRenderer -from typing import List, Dict, Any, Union class MarkdownRenderer(BaseRenderer): + def __init__(self, enable_syntax_hints: bool = True, enable_emoji: bool = True): + super().__init__() + self.enable_syntax_hints = enable_syntax_hints + self.enable_emoji = enable_emoji + self.style = {} + def render(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> str: - """ - Renders the given data in Markdown format. - - Args: - data: The data to render (can be list or dict). - - Returns: - str: The rendered Markdown string. - """ if isinstance(data, list): return self._render_list(data) elif isinstance(data, dict): @@ -21,40 +22,141 @@ class MarkdownRenderer(BaseRenderer): return str(data) def _render_list(self, data: List[Dict[str, Any]]) -> str: - """渲染列表数据为 Markdown""" - markdown_output = "# TRPG Log\n\n" - - for i, entry in enumerate(data, 1): - if entry.get("type") == "metadata": - markdown_output += f"## Entry {i}\n\n" - markdown_output += f"**Timestamp**: {entry.get('timestamp', 'N/A')} \n" - markdown_output += f"**Speaker**: {entry.get('speaker', 'N/A')} \n\n" - - content_items = entry.get("content", []) - if content_items: - markdown_output += "**Content**:\n\n" - for content in content_items: - content_type = content.get("type", "unknown") - content_text = content.get("content", "") - markdown_output += f"- [{content_type}] {content_text}\n" - markdown_output += "\n" - else: - markdown_output += f"- {entry}\n" + if data and not isinstance(data[0], dict): + data = [{"type": "text", "content": str(item)} for item in data] - return markdown_output + stats = self._calculate_stats(data) + + emoji_prefix = "🎲 " if self.enable_emoji else "" + md = f"# {emoji_prefix}\n\n" + + md += "## Statistics\n\n" + md += "| Type | Count |\n" + md += "|------|------|\n" + md += f"| Total | {stats['total']} |\n" + md += f"| Dialogue | {stats['dialogue']} |\n" + md += f"| Dice | {stats['dice']} |\n" + md += f"| Narration | {stats['narration']} |\n" + md += f"| System | {stats['system']} |\n" + md += f"| Metadata | {stats['metadata']} |\n\n" + + md += "---\n\n" + md += "## Detailed Log\n\n" + + # Render each token + for i, item in enumerate(data, 1): + md += self._render_token(item, i) + md += "\n" + + md += "---\n\n" + md += "_Generated by ConventionalRP Markdown Renderer_\n" + + return md + + def _render_token(self, token: Dict[str, Any], index: int) -> str: + token_type = token.get("type", "unknown") + + # 处理元数据类型(向后兼容) + if token_type == "metadata": + return self._render_metadata_token(token, index) + + speaker = token.get("speaker", "") + content = str(token.get("content", "")) + + type_emojis = { + "dialogue": "💬", + "dice": "🎲", + "narration": "📖", + "system": "⚙️", + "success": "✅", + "failure": "❌", + "text": "📄", + "unknown": "❓", + } + + emoji = type_emojis.get(token_type, "•") if self.enable_emoji else "-" + + md = f"### {emoji} [{index}] {token_type.upper()}\n\n" + + if speaker: + md += f"**说话者:** {speaker}\n\n" + + if token_type == "dice" and self.enable_syntax_hints: + md += f"```dice\n{content}\n```\n\n" + + if "result" in token: + md += f"**结果:** `{token['result']}`\n\n" + else: + md += f"{content}\n\n" + + metadata_lines = [] + + if "timestamp" in token: + metadata_lines.append(f"Timestamp: `{token['timestamp']}`") + + if "tags" in token and token["tags"]: + tags_str = " ".join([f"`{tag}`" for tag in token["tags"]]) + metadata_lines.append(f"Tags: {tags_str}") + + if "combat_data" in token: + combat = token["combat_data"] + if combat.get("type") == "damage": + metadata_lines.append(f"Damage: **{combat['amount']}** (Total: {combat.get('total_damage', '?')})") + elif combat.get("type") == "healing": + metadata_lines.append(f"Healing: **{combat['amount']}** (Total: {combat.get('total_healing', '?')})") + + if metadata_lines: + md += "> " + "\n> ".join(metadata_lines) + "\n\n" + + return md + + def _render_metadata_token(self, token: Dict[str, Any], index: int) -> str: + """渲染元数据类型的 token(向后兼容)""" + md = f"### Entry {index}\n\n" + md += f"**Timestamp**: {token.get('timestamp', 'N/A')} \n" + md += f"**Speaker**: {token.get('speaker', 'N/A')} \n\n" + + content_items = token.get("content", []) + if content_items: + md += "**Content**:\n\n" + for content in content_items: + content_type = content.get("type", "unknown") + content_text = content.get("content", "") + md += f"- [{content_type}] {content_text}\n" + md += "\n" + + return md def _render_dict(self, data: Dict[str, Any]) -> str: - """渲染字典数据为 Markdown""" markdown_output = "" for key, value in data.items(): markdown_output += f"## {key}\n\n{value}\n\n" return markdown_output + + def _calculate_stats(self, data: List[Dict[str, Any]]) -> Dict[str, int]: + stats = { + "total": len(data), + "dialogue": 0, + "dice": 0, + "narration": 0, + "system": 0, + "metadata": 0, + } + + for item in data: + token_type = item.get("type", "unknown") + if token_type in stats: + stats[token_type] += 1 + + return stats def set_style(self, style): - """ - Sets the style for the Markdown renderer. + self.style = style + + # 从 style 中提取设置 + if isinstance(style, dict): + if "emoji" in style: + self.enable_emoji = style["emoji"] + if "syntax_hints" in style: + self.enable_syntax_hints = style["syntax_hints"] - Args: - style (dict): A dictionary of style options. - """ - self.style = style # Currently, Markdown does not support styling, but this can be extended. diff --git a/src/conventionalrp/tokenizer.py b/src/conventionalrp/tokenizer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/utils/__init__.py b/src/conventionalrp/utils/__init__.py index 53b7600..dadb3cd 100644 --- a/src/conventionalrp/utils/__init__.py +++ b/src/conventionalrp/utils/__init__.py @@ -1 +1,32 @@ -"""This file initializes the utils module.""" +from .exceptions import ( + ConventionalRPError, + ParserError, + RuleError, + ProcessorError, + ValidationError, + ConfigurationError, + safe_execute, + format_error, + validate_not_none, + validate_type, + validate_not_empty, +) +from .logging_config import setup_logging, get_logger, LogContext + +__all__ = [ + "ConventionalRPError", + "ParserError", + "RuleError", + "ProcessorError", + "ValidationError", + "ConfigurationError", + "safe_execute", + "format_error", + "validate_not_none", + "validate_type", + "validate_not_empty", + "setup_logging", + "get_logger", + "LogContext", +] + diff --git a/src/conventionalrp/utils/exceptions.py b/src/conventionalrp/utils/exceptions.py new file mode 100644 index 0000000..28fa179 --- /dev/null +++ b/src/conventionalrp/utils/exceptions.py @@ -0,0 +1,104 @@ +from typing import Optional, Any, Dict +import traceback +import logging + +logger = logging.getLogger(__name__) + + +class ConventionalRPError(Exception): + """基础异常类""" + + def __init__( + self, + message: str, + details: Optional[Dict[str, Any]] = None, + cause: Optional[Exception] = None + ): + super().__init__(message) + self.message = message + self.details = details or {} + self.cause = cause + + def __str__(self) -> str: + result = self.message + if self.details: + details_str = ", ".join(f"{k}={v}" for k, v in self.details.items()) + result += f" ({details_str})" + if self.cause: + result += f"\nCaused by: {str(self.cause)}" + return result + + +class ParserError(ConventionalRPError): + """解析错误""" + pass + + +class RuleError(ConventionalRPError): + """规则相关错误""" + pass + + +class ProcessorError(ConventionalRPError): + """处理器错误""" + pass + + +class ValidationError(ConventionalRPError): + """验证错误""" + pass + + +class ConfigurationError(ConventionalRPError): + """配置错误""" + pass + + +def safe_execute(func, *args, default=None, error_msg="Operation failed", **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"{error_msg}: {e}") + return default + + +def format_error(error: Exception, include_traceback: bool = False) -> str: + error_type = type(error).__name__ + error_msg = str(error) + + result = f"[{error_type}] {error_msg}" + + if include_traceback: + tb = traceback.format_exc() + result += f"\n\nTraceback:\n{tb}" + + return result + + +def validate_not_none(value: Any, name: str): + if value is None: + raise ValidationError( + f"{name} cannot be None", + details={"parameter": name} + ) + + +def validate_type(value: Any, expected_type: type, name: str): + if not isinstance(value, expected_type): + raise ValidationError( + f"{name} must be of type {expected_type.__name__}, " + f"got {type(value).__name__}", + details={ + "parameter": name, + "expected_type": expected_type.__name__, + "actual_type": type(value).__name__ + } + ) + + +def validate_not_empty(value: Any, name: str): + if not value: + raise ValidationError( + f"{name} cannot be empty", + details={"parameter": name, "value": value} + ) diff --git a/src/conventionalrp/utils/logging_config.py b/src/conventionalrp/utils/logging_config.py new file mode 100644 index 0000000..956d6d4 --- /dev/null +++ b/src/conventionalrp/utils/logging_config.py @@ -0,0 +1,77 @@ +import logging +import sys +from pathlib import Path +from typing import Optional +from datetime import datetime + + +def setup_logging( + level: str = "INFO", + log_file: Optional[str] = None, + format_string: Optional[str] = None, + include_timestamp: bool = True, + include_module: bool = True +) -> logging.Logger: + log_level = getattr(logging, level.upper(), logging.INFO) + + if format_string is None: + parts = [] + if include_timestamp: + parts.append("%(asctime)s") + parts.append("[%(levelname)s]") + if include_module: + parts.append("%(name)s") + parts.append("%(message)s") + format_string = " - ".join(parts) + + logging.basicConfig( + level=log_level, + format=format_string, + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[] + ) + + logger = logging.getLogger("conventionalrp") + logger.setLevel(log_level) + + logger.handlers.clear() + + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(log_level) + console_formatter = logging.Formatter(format_string, datefmt="%Y-%m-%d %H:%M:%S") + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) + + if log_file: + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + file_handler = logging.FileHandler(log_file, encoding="utf-8") + file_handler.setLevel(log_level) + file_handler.setFormatter(console_formatter) + logger.addHandler(file_handler) + + logger.info(f"Logging to file: {log_file}") + + logger.info(f"Logging configured at {level} level") + return logger + + +def get_logger(name: str) -> logging.Logger: + return logging.getLogger(f"conventionalrp.{name}") + + +class LogContext: + def __init__(self, logger: logging.Logger, level: str): + self.logger = logger + self.level = getattr(logging, level.upper()) + self.original_level = None + + def __enter__(self): + self.original_level = self.logger.level + self.logger.setLevel(self.level) + return self.logger + + def __exit__(self, exc_type, exc_val, exc_tb): + self.logger.setLevel(self.original_level) + return False diff --git a/src/conventionalrp/utils/text_processing.py b/src/conventionalrp/utils/text_processing.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/lib.rs b/src/lib.rs index 8f19266..684001c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,253 @@ use pyo3::prelude::*; +use pyo3::types::{PyDict, PyList}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; -#[pyfunction] -fn sum_as_string(a: usize, b: usize) -> PyResult { - Ok((a + b).to_string()) +#[derive(Debug, Clone, Serialize, Deserialize)] +#[pyclass] +pub enum TokenType { + Metadata, + DiceRoll, + Dialogue, + Action, + Ooc, + System, + Text, + Unknown, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[pyclass] +pub struct Token { + #[pyo3(get, set)] + pub token_type: String, + #[pyo3(get, set)] + pub content: String, + #[pyo3(get, set)] + pub metadata: HashMap, +} + +#[pymethods] +impl Token { + #[new] + fn new(token_type: String, content: String) -> Self { + Token { + token_type, + content, + metadata: HashMap::new(), + } + } + + fn add_metadata(&mut self, key: String, value: String) { + self.metadata.insert(key, value); + } + + fn get_metadata(&self, key: &str) -> Option { + self.metadata.get(key).cloned() + } + fn to_dict(&self, py: Python) -> PyResult> { + let dict = PyDict::new(py); + dict.set_item("type", &self.token_type)?; + dict.set_item("content", &self.content)?; + + let metadata_dict = PyDict::new(py); + for (k, v) in &self.metadata { + metadata_dict.set_item(k, v)?; + } + dict.set_item("metadata", metadata_dict)?; + + Ok(dict.into()) + } + + fn __repr__(&self) -> String { + format!("Token(type={}, content={})", self.token_type, self.content) + } +} + +#[pyclass] +pub struct RegexRule { + pattern: Regex, + #[pyo3(get, set)] + pub rule_type: String, + #[pyo3(get, set)] + pub priority: i32, +} + +#[pymethods] +impl RegexRule { + #[new] + fn new(pattern: String, rule_type: String, priority: i32) -> PyResult { + let regex = Regex::new(&pattern) + .map_err(|e| PyErr::new::( + format!("Invalid regex pattern: {}", e) + ))?; + + Ok(RegexRule { + pattern: regex, + rule_type, + priority, + }) + } + + fn matches(&self, text: &str) -> bool { + self.pattern.is_match(text) + } + + + fn extract(&self, text: &str) -> Option> { + self.pattern.captures(text).map(|caps| { + caps.iter() + .skip(1) // 跳过完整匹配 + .filter_map(|m| m.map(|m| m.as_str().to_string())) + .collect() + }) + } + + fn find_all(&self, text: &str, py: Python) -> PyResult> { + let matches: Vec<(usize, usize, String)> = self.pattern + .find_iter(text) + .map(|m| (m.start(), m.end(), m.as_str().to_string())) + .collect(); + + let list = PyList::empty(py); + for (start, end, matched) in matches { + let dict = PyDict::new(py); + dict.set_item("start", start)?; + dict.set_item("end", end)?; + dict.set_item("text", matched)?; + list.append(dict)?; + } + + Ok(list.into()) + } +} + +#[pyclass] +pub struct TextParser { + rules: Vec<(Regex, String, i32)>, // (pattern, type, priority) +} + +#[pymethods] +impl TextParser { + #[new] + fn new() -> Self { + TextParser { rules: Vec::new() } + } + + fn add_rule(&mut self, pattern: String, rule_type: String, priority: i32) -> PyResult<()> { + let regex = Regex::new(&pattern) + .map_err(|e| PyErr::new::( + format!("Invalid regex: {}", e) + ))?; + + self.rules.push((regex, rule_type, priority)); + self.rules.sort_by(|a, b| b.2.cmp(&a.2)); + + Ok(()) + } + + fn parse_line(&self, text: &str) -> Vec<(String, String, usize, usize)> { + let mut results = Vec::new(); + let mut processed_ranges: Vec<(usize, usize)> = Vec::new(); + for (pattern, rule_type, _priority) in &self.rules { + for mat in pattern.find_iter(text) { + let start = mat.start(); + let end = mat.end(); + + let overlaps = processed_ranges.iter().any(|(s, e)| { + (start >= *s && start < *e) || (end > *s && end <= *e) || (start <= *s && end >= *e) + }); + + if !overlaps { + results.push(( + rule_type.clone(), + mat.as_str().to_string(), + start, + end, + )); + processed_ranges.push((start, end)); + } + } + } + + results.sort_by_key(|r| r.2); + results + } + fn parse_lines(&self, lines: Vec, py: Python) -> PyResult> { + let list = PyList::empty(py); + + for line in lines { + let parsed = self.parse_line(&line); + let line_result = PyList::empty(py); + + for (rule_type, content, start, end) in parsed { + let dict = PyDict::new(py); + dict.set_item("type", rule_type)?; + dict.set_item("content", content)?; + dict.set_item("start", start)?; + dict.set_item("end", end)?; + line_result.append(dict)?; + } + + list.append(line_result)?; + } + + Ok(list.into()) + } + + fn clear_rules(&mut self) { + self.rules.clear(); + } + + fn rule_count(&self) -> usize { + self.rules.len() + } } #[pyclass] -pub struct Base {} +pub struct FastMatcher { + patterns: Vec, +} + +#[pymethods] +impl FastMatcher { + #[new] + fn new(patterns: Vec) -> Self { + FastMatcher { patterns } + } + + + fn contains_any(&self, text: &str) -> bool { + self.patterns.iter().any(|p| text.contains(p)) + } + + + fn find_matches(&self, text: &str) -> Vec { + self.patterns + .iter() + .filter(|p| text.contains(*p)) + .cloned() + .collect() + } + + fn count_matches(&self, text: &str, py: Python) -> PyResult> { + let dict = PyDict::new(py); + for pattern in &self.patterns { + let count = text.matches(pattern.as_str()).count(); + dict.set_item(pattern, count)?; + } + Ok(dict.into()) + } +} -/// A Python module implemented in Rust. The name of this function must match -/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to -/// import the module. #[pymodule] fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { - m.add_function(wrap_pyfunction!(sum_as_string, m)?)?; - m.add_class::()?; + // 添加类 + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + Ok(()) } \ No newline at end of file -- cgit v1.2.3-70-g09d2