From cbc653ffd0ea9abf4360623dc7a7651e1a49cc61 Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Sat, 25 Oct 2025 00:30:48 +0800 Subject: feat: Implement plugin system with combat tracker and dice analyzer - Added `plugin_system_demo.py` to demonstrate basic plugin usage, processing, and analysis. - Created `CombatTrackerPlugin` for tracking combat statistics including damage and healing. - Developed `DiceAnalyzerPlugin` for analyzing dice rolls and calculating success rates. - Introduced `renderer_demo.py` for rendering output in HTML, Markdown, and JSON formats. - Implemented `rule_system_demo.py` to showcase rule engine capabilities with various examples. - Established core rule engine functionality in `rules.py` with support for conditions and actions. - Enhanced base plugin structure in `base.py` to support different plugin types (Processor, Renderer, Analyzer). - Added custom exception handling in `exceptions.py` for better error management. - Configured logging setup in `logging_config.py` for improved logging capabilities. - Created unit tests in `test_rust_core.py` to validate core functionalities and performance. --- src/conventionalrp/__init__.py | 36 +- src/conventionalrp/__main__.py | 9 - src/conventionalrp/_core.pyi | 238 +++++++++++- src/conventionalrp/base.py | 0 src/conventionalrp/core/__init__.py | 9 +- src/conventionalrp/core/processor.py | 150 +++++--- src/conventionalrp/core/rules.py | 238 ++++++++++++ src/conventionalrp/extractors/__init__.py | 3 - src/conventionalrp/extractors/rule_extractor.py | 25 -- src/conventionalrp/html_renderer.py | 0 src/conventionalrp/json_renderer.py | 0 src/conventionalrp/markdown_renderer.py | 0 src/conventionalrp/plugins/__init__.py | 20 +- src/conventionalrp/plugins/base.py | 103 +++++ src/conventionalrp/plugins/plugin_manager.py | 209 ++++++++++- src/conventionalrp/renderers/__init__.py | 3 - src/conventionalrp/renderers/html_renderer.py | 433 +++++++++++++++++++++- src/conventionalrp/renderers/json_renderer.py | 77 +++- src/conventionalrp/renderers/markdown_renderer.py | 176 +++++++-- src/conventionalrp/tokenizer.py | 0 src/conventionalrp/utils/__init__.py | 33 +- src/conventionalrp/utils/exceptions.py | 104 ++++++ src/conventionalrp/utils/logging_config.py | 77 ++++ src/conventionalrp/utils/text_processing.py | 0 src/lib.rs | 252 ++++++++++++- 25 files changed, 2009 insertions(+), 186 deletions(-) delete mode 100644 src/conventionalrp/base.py create mode 100644 src/conventionalrp/core/rules.py delete mode 100644 src/conventionalrp/html_renderer.py delete mode 100644 src/conventionalrp/json_renderer.py delete mode 100644 src/conventionalrp/markdown_renderer.py create mode 100644 src/conventionalrp/plugins/base.py delete mode 100644 src/conventionalrp/tokenizer.py create mode 100644 src/conventionalrp/utils/exceptions.py create mode 100644 src/conventionalrp/utils/logging_config.py delete mode 100644 src/conventionalrp/utils/text_processing.py (limited to 'src') diff --git a/src/conventionalrp/__init__.py b/src/conventionalrp/__init__.py index 06dbd63..ab4b17d 100644 --- a/src/conventionalrp/__init__.py +++ b/src/conventionalrp/__init__.py @@ -1,15 +1,45 @@ +""" +Conventional Role Play SDK (ConventionalRP) +""" + import sys from importlib.metadata import version from . import _core +from .core import Parser, Processor, Rule, RuleEngine +from .utils import ( + setup_logging, + get_logger, + ConventionalRPError, + ParserError, + RuleError, + ProcessorError, + ValidationError, + ConfigurationError, +) -__all__ = ["_core", "__version__"] +__all__ = [ + "Parser", + "Processor", + "Rule", + "RuleEngine", + "setup_logging", + "get_logger", + "ConventionalRPError", + "ParserError", + "RuleError", + "ProcessorError", + "ValidationError", + "ConfigurationError", + "__version__", +] if sys.version_info >= (3, 8): - # For Python 3.8+ __version__ = version("conventionalrp") elif sys.version_info < (3, 8): from pkg_resources import get_distribution - # For Python < 3.8 __version__ = get_distribution("conventionalrp").version + +_default_logger = setup_logging(level="INFO") + diff --git a/src/conventionalrp/__main__.py b/src/conventionalrp/__main__.py index fa4f45e..e69de29 100644 --- a/src/conventionalrp/__main__.py +++ b/src/conventionalrp/__main__.py @@ -1,9 +0,0 @@ -from ._core import sum_as_string - - -def main(): - print(sum_as_string(1, 2)) - - -if __name__ == "__main__": - main() diff --git a/src/conventionalrp/_core.pyi b/src/conventionalrp/_core.pyi index 0805887..6f87269 100644 --- a/src/conventionalrp/_core.pyi +++ b/src/conventionalrp/_core.pyi @@ -1,3 +1,237 @@ -def sum_as_string(a: int, b: int) -> str: ... +""" +提供高性能的文本解析和匹配功能 +""" -class Base: ... +from typing import Dict, List, Optional, Tuple + +class Base: + """基础类(向后兼容)""" + ... + +class Token: + """ + 解析后的 Token + + 表示文本中的一个语义单元,包含类型、内容和元数据 + """ + + token_type: str + content: str + metadata: Dict[str, str] + + def __init__(self, token_type: str, content: str) -> None: + """ + 创建新的 Token + + Args: + token_type: Token 类型 + content: Token 内容 + """ + ... + + def add_metadata(self, key: str, value: str) -> None: + """ + 添加元数据 + + Args: + key: 元数据键 + value: 元数据值 + """ + ... + + def get_metadata(self, key: str) -> Optional[str]: + """ + 获取元数据 + + Args: + key: 元数据键 + + Returns: + 元数据值,如果不存在返回 None + """ + ... + + def to_dict(self) -> Dict[str, any]: + """ + 转换为 Python 字典 + + Returns: + 包含 Token 所有信息的字典 + """ + ... + + def __repr__(self) -> str: ... + +class RegexRule: + """ + 正则表达式规则 + + 用于文本匹配和提取 + """ + + rule_type: str + priority: int + + def __init__(self, pattern: str, rule_type: str, priority: int) -> None: + """ + 创建正则规则 + + Args: + pattern: 正则表达式模式 + rule_type: 规则类型 + priority: 优先级(数字越大优先级越高) + + Raises: + ValueError: 如果正则表达式无效 + """ + ... + + def matches(self, text: str) -> bool: + """ + 测试文本是否匹配 + + Args: + text: 待测试的文本 + + Returns: + 是否匹配 + """ + ... + + def extract(self, text: str) -> Optional[List[str]]: + """ + 提取匹配的捕获组 + + Args: + text: 待提取的文本 + + Returns: + 捕获组列表,如果不匹配返回 None + """ + ... + + def find_all(self, text: str) -> List[Tuple[int, int, str]]: + """ + 查找所有匹配 + + Args: + text: 待搜索的文本 + + Returns: + 列表,每个元素为 (start, end, matched_text) + """ + ... + +class TextParser: + """ + 高性能文本解析器 + + 支持多规则、优先级排序的文本解析 + """ + + def __init__(self) -> None: + """创建新的解析器""" + ... + + def add_rule(self, pattern: str, rule_type: str, priority: int) -> None: + """ + 添加解析规则 + + Args: + pattern: 正则表达式模式 + rule_type: 规则类型 + priority: 优先级 + + Raises: + ValueError: 如果正则表达式无效 + """ + ... + + def parse_line(self, text: str) -> List[Tuple[str, str, int, int]]: + """ + 解析单行文本 + + Args: + text: 待解析的文本 + + Returns: + 列表,每个元素为 (type, content, start, end) + """ + ... + + def parse_lines(self, lines: List[str]) -> List[List[Dict[str, any]]]: + """ + 批量解析多行文本 + + Args: + lines: 文本行列表 + + Returns: + 每行的解析结果列表 + """ + ... + + def clear_rules(self) -> None: + """清除所有规则""" + ... + + def rule_count(self) -> int: + """ + 获取规则数量 + + Returns: + 当前规则数量 + """ + ... + +class FastMatcher: + """ + 快速字符串匹配器 + + 用于高效的多模式字符串匹配 + """ + + def __init__(self, patterns: List[str]) -> None: + """ + 创建匹配器 + + Args: + patterns: 模式列表 + """ + ... + + def contains_any(self, text: str) -> bool: + """ + 检查文本是否包含任意模式 + + Args: + text: 待检查的文本 + + Returns: + 是否包含任意模式 + """ + ... + + def find_matches(self, text: str) -> List[str]: + """ + 查找所有匹配的模式 + + Args: + text: 待搜索的文本 + + Returns: + 匹配的模式列表 + """ + ... + + def count_matches(self, text: str) -> Dict[str, int]: + """ + 统计每个模式的出现次数 + + Args: + text: 待统计的文本 + + Returns: + 字典,键为模式,值为出现次数 + """ + ... diff --git a/src/conventionalrp/base.py b/src/conventionalrp/base.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/core/__init__.py b/src/conventionalrp/core/__init__.py index 91d0f8f..08829b8 100644 --- a/src/conventionalrp/core/__init__.py +++ b/src/conventionalrp/core/__init__.py @@ -1,3 +1,6 @@ -""" -This file initializes the core module of the conventionalrp SDK. -""" +from .parser import Parser +from .processor import Processor +from .rules import Rule, RuleEngine + +__all__ = ["Parser", "Processor", "Rule", "RuleEngine"] + diff --git a/src/conventionalrp/core/processor.py b/src/conventionalrp/core/processor.py index bc74ffb..12ca32b 100644 --- a/src/conventionalrp/core/processor.py +++ b/src/conventionalrp/core/processor.py @@ -1,68 +1,104 @@ -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Callable +import logging +from .rules import RuleEngine, Rule + +logger = logging.getLogger(__name__) class Processor: - """处理器,用于处理解析后的token""" - def __init__(self, rules: Optional[Dict[str, Any]] = None): - """ - 初始化处理器 - - Args: - rules: 处理规则(可选) - """ self.rules = rules or {} - - def process_tokens(self, tokens: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - 处理token列表 + self.rule_engine = RuleEngine() + self.custom_processors: List[Callable] = [] + + self._load_rules_to_engine() - Args: - tokens: 解析后的token列表 + logger.info("Processor initialized with %d rules", + self.rule_engine.rule_count()) + + def _load_rules_to_engine(self): + if not isinstance(self.rules, dict): + return + + rules_list = self.rules.get("rules", []) + for rule_dict in rules_list: + if not isinstance(rule_dict, dict): + continue - Returns: - 处理后的数据列表 - """ + try: + self.rule_engine.add_rule_dict( + name=rule_dict.get("name", "unnamed"), + condition=rule_dict.get("condition", {}), + action=rule_dict.get("action", {}), + priority=rule_dict.get("priority", 50) + ) + except Exception as e: + logger.warning("Failed to load rule: %s", e) + + def add_rule(self, rule: Rule): + self.rule_engine.add_rule(rule) + logger.debug("Added rule: %s", rule.name) + + def add_processor(self, processor: Callable[[Dict[str, Any]], Dict[str, Any]]): + self.custom_processors.append(processor) + logger.debug("Added custom processor") + + def process_tokens( + self, + tokens: List[Dict[str, Any]], + apply_all_rules: bool = False + ) -> List[Dict[str, Any]]: + if not tokens: + logger.warning("Empty token list provided") + return [] + + logger.info("Processing %d tokens", len(tokens)) processed_data = [] - for token in tokens: - processed_token = self.apply_rules(token) - processed_data.append(processed_token) - return processed_data - - def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]: - """ - 对单个token应用规则 - Args: - token: 单个token - - Returns: - 处理后的token - """ - # 基础实现:直接返回token - # 可以在此添加更多处理逻辑 + for i, token in enumerate(tokens): + try: + processed_token = self.process_single_token(token, apply_all_rules) + processed_data.append(processed_token) + except Exception as e: + logger.error("Error processing token %d: %s", i, e) + # 发生错误时保留原始 token + processed_data.append(token) + + logger.info("Successfully processed %d tokens", len(processed_data)) + return processed_data + + def process_single_token( + self, + token: Dict[str, Any], + apply_all_rules: bool = False + ) -> Dict[str, Any]: processed = token.copy() - # 添加处理时间戳 + if self.rule_engine.rule_count() > 0: + processed = self.rule_engine.process(processed, apply_all_rules) + + for processor in self.custom_processors: + try: + processed = processor(processed) + except Exception as e: + logger.error("Custom processor failed: %s", e) + if "timestamp" in processed: processed["processed"] = True return processed - - def generate_output(self, processed_data: List[Dict[str, Any]], format_type: str) -> str: - """ - 生成指定格式的输出 + + def apply_rules(self, token: Dict[str, Any]) -> Dict[str, Any]: + return self.process_single_token(token) + + def generate_output( + self, + processed_data: List[Dict[str, Any]], + format_type: str + ) -> str: + logger.info("Generating %s output for %d items", + format_type, len(processed_data)) - Args: - processed_data: 处理后的数据 - format_type: 输出格式 (json/html/markdown) - - Returns: - 格式化后的字符串 - - Raises: - ValueError: 不支持的格式类型 - """ if format_type == "json": return self.generate_json_output(processed_data) elif format_type == "html": @@ -71,20 +107,24 @@ class Processor: return self.generate_markdown_output(processed_data) else: raise ValueError(f"Unsupported format type: {format_type}") - + def generate_json_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成JSON格式输出""" import json return json.dumps(processed_data, ensure_ascii=False, indent=2) - + def generate_html_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成HTML格式输出""" return ( "" + "".join(f"
{data}
" for data in processed_data) + "" ) - + def generate_markdown_output(self, processed_data: List[Dict[str, Any]]) -> str: - """生成Markdown格式输出""" return "\n".join(f"- {data}" for data in processed_data) + + def get_statistics(self) -> Dict[str, Any]: + return { + "rule_count": self.rule_engine.rule_count(), + "custom_processor_count": len(self.custom_processors), + "has_rules_config": bool(self.rules), + } diff --git a/src/conventionalrp/core/rules.py b/src/conventionalrp/core/rules.py new file mode 100644 index 0000000..f198d4e --- /dev/null +++ b/src/conventionalrp/core/rules.py @@ -0,0 +1,238 @@ +from typing import Dict, Any, Callable, List, Optional +from enum import Enum +import re + + +class RuleCondition(Enum): + """规则条件类型""" + EQUALS = "equals" + CONTAINS = "contains" + MATCHES = "matches" + STARTS_WITH = "starts_with" + ENDS_WITH = "ends_with" + IN_LIST = "in_list" + GREATER_THAN = "greater_than" + LESS_THAN = "less_than" + + +class Rule: + def __init__( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + self.name = name + self.condition = condition + self.action = action + self.priority = priority + self._compiled_patterns = {} + + self._precompile_patterns() + + def _precompile_patterns(self): + """预编译正则表达式以提高性能""" + if isinstance(self.condition, dict): + for key, value in self.condition.items(): + if isinstance(value, dict) and value.get("type") == "matches": + pattern = value.get("pattern") + if pattern: + self._compiled_patterns[key] = re.compile(pattern) + + def matches(self, data: Dict[str, Any]) -> bool: + """ + 检查数据是否匹配规则条件 + """ + if not isinstance(self.condition, dict): + return False + + for field, condition_spec in self.condition.items(): + if not self._check_field_condition(data, field, condition_spec): + return False + + return True + + def _check_field_condition( + self, + data: Dict[str, Any], + field: str, + condition: Any + ) -> bool: + """检查单个字段的条件""" + value = data.get(field) + + if not isinstance(condition, dict): + return value == condition + + condition_type = condition.get("type") + expected_value = condition.get("value") + + if condition_type == "equals": + return value == expected_value + elif condition_type == "contains": + return expected_value in str(value) if value else False + elif condition_type == "matches": + if field in self._compiled_patterns: + pattern = self._compiled_patterns[field] + return bool(pattern.search(str(value))) if value else False + return False + elif condition_type == "starts_with": + return str(value).startswith(expected_value) if value else False + elif condition_type == "ends_with": + return str(value).endswith(expected_value) if value else False + elif condition_type == "in_list": + return value in expected_value if isinstance(expected_value, list) else False + elif condition_type == "greater_than": + try: + return float(value) > float(expected_value) + except (ValueError, TypeError): + return False + elif condition_type == "less_than": + try: + return float(value) < float(expected_value) + except (ValueError, TypeError): + return False + + return False + + def apply(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + 对匹配的数据应用规则动作 + """ + result = data.copy() + + if not isinstance(self.action, dict): + return result + + action_type = self.action.get("type") + + if action_type == "set_field": + field = self.action.get("field") + value = self.action.get("value") + if field: + result[field] = value + + elif action_type == "add_field": + field = self.action.get("field") + value = self.action.get("value") + if field and field not in result: + result[field] = value + + elif action_type == "remove_field": + field = self.action.get("field") + if field and field in result: + del result[field] + + elif action_type == "transform": + field = self.action.get("field") + func_name = self.action.get("function") + if field and field in result and func_name: + result[field] = self._apply_transform(result[field], func_name) + + elif action_type == "add_tag": + tag = self.action.get("tag") + if tag: + if "tags" not in result: + result["tags"] = [] + if tag not in result["tags"]: + result["tags"].append(tag) + + elif action_type == "copy_field": + source = self.action.get("source") + target = self.action.get("target") + if source and target and source in result: + result[target] = result[source] + + return result + + def _apply_transform(self, value: Any, func_name: str) -> Any: + transforms = { + "upper": lambda x: str(x).upper(), + "lower": lambda x: str(x).lower(), + "strip": lambda x: str(x).strip(), + "int": lambda x: int(x), + "float": lambda x: float(x), + "len": lambda x: len(x) if hasattr(x, '__len__') else 0, + } + + func = transforms.get(func_name) + if func: + try: + return func(value) + except Exception: + return value + return value + + def __repr__(self) -> str: + return f"Rule(name={self.name}, priority={self.priority})" + + +class RuleEngine: + """ + 规则引擎 + """ + + def __init__(self): + self.rules: List[Rule] = [] + self._sorted = False + + def add_rule(self, rule: Rule): + self.rules.append(rule) + self._sorted = False + + def add_rule_dict( + self, + name: str, + condition: Dict[str, Any], + action: Dict[str, Any], + priority: int = 50 + ): + """ + 从字典添加规则 + """ + rule = Rule(name, condition, action, priority) + self.add_rule(rule) + + def _ensure_sorted(self): + """确保规则按优先级排序""" + if not self._sorted: + self.rules.sort(key=lambda r: r.priority, reverse=True) + self._sorted = True + + def process( + self, + data: Dict[str, Any], + apply_all: bool = False + ) -> Dict[str, Any]: + self._ensure_sorted() + result = data.copy() + + for rule in self.rules: + if rule.matches(result): + result = rule.apply(result) + if not apply_all: + break + + return result + + def process_batch( + self, + data_list: List[Dict[str, Any]], + apply_all: bool = False + ) -> List[Dict[str, Any]]: + return [self.process(data, apply_all) for data in data_list] + + def find_matching_rules(self, data: Dict[str, Any]) -> List[Rule]: + self._ensure_sorted() + return [rule for rule in self.rules if rule.matches(data)] + + def clear_rules(self): + self.rules.clear() + self._sorted = False + + def rule_count(self) -> int: + return len(self.rules) + + def __repr__(self) -> str: + return f"RuleEngine(rules={len(self.rules)})" diff --git a/src/conventionalrp/extractors/__init__.py b/src/conventionalrp/extractors/__init__.py index e693e03..e69de29 100644 --- a/src/conventionalrp/extractors/__init__.py +++ b/src/conventionalrp/extractors/__init__.py @@ -1,3 +0,0 @@ -""" -This file initializes the extractors module. -""" diff --git a/src/conventionalrp/extractors/rule_extractor.py b/src/conventionalrp/extractors/rule_extractor.py index bfc60c8..12b9e4b 100644 --- a/src/conventionalrp/extractors/rule_extractor.py +++ b/src/conventionalrp/extractors/rule_extractor.py @@ -15,12 +15,6 @@ class RuleExtractor(BaseExtractor): """规则提取器,用于从配置文件加载解析规则""" def __init__(self, config_file: Optional[str] = None): - """ - 初始化规则提取器 - - Args: - config_file: 规则配置文件路径(可选) - """ self.config_file = config_file self.rules: Dict[str, Any] = {} if config_file: @@ -29,16 +23,6 @@ class RuleExtractor(BaseExtractor): def load_rules_from_file(self, config_file: str) -> Dict[str, Any]: """ 从文件加载规则 - - Args: - config_file: 规则配置文件路径 - - Returns: - 解析后的规则字典 - - Raises: - FileNotFoundError: 文件不存在 - ValueError: 文件内容为空或格式错误 """ if not Path(config_file).exists(): raise FileNotFoundError(f"Rule file not found: {config_file}") @@ -56,12 +40,6 @@ class RuleExtractor(BaseExtractor): def load_rules(self, config_file: str) -> Dict[str, Any]: """ 加载规则(兼容旧接口) - - Args: - config_file: 规则配置文件路径 - - Returns: - 解析后的规则字典 """ self.rules = self.load_rules_from_file(config_file) return self.rules @@ -69,8 +47,5 @@ class RuleExtractor(BaseExtractor): def extract(self) -> Dict[str, Any]: """ 提取规则 - - Returns: - 规则字典 """ return self.rules diff --git a/src/conventionalrp/html_renderer.py b/src/conventionalrp/html_renderer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/json_renderer.py b/src/conventionalrp/json_renderer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/markdown_renderer.py b/src/conventionalrp/markdown_renderer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/conventionalrp/plugins/__init__.py b/src/conventionalrp/plugins/__init__.py index c7bbee6..e759fd3 100644 --- a/src/conventionalrp/plugins/__init__.py +++ b/src/conventionalrp/plugins/__init__.py @@ -1,3 +1,17 @@ -""" -This file initializes the plugins module. -""" +from .base import ( + Plugin, + ParserPlugin, + ProcessorPlugin, + RendererPlugin, + AnalyzerPlugin, +) +from .plugin_manager import PluginManager + +__all__ = [ + "Plugin", + "ParserPlugin", + "ProcessorPlugin", + "RendererPlugin", + "AnalyzerPlugin", + "PluginManager", +] diff --git a/src/conventionalrp/plugins/base.py b/src/conventionalrp/plugins/base.py new file mode 100644 index 0000000..f9e92a7 --- /dev/null +++ b/src/conventionalrp/plugins/base.py @@ -0,0 +1,103 @@ +from typing import Any, Dict, List, Optional +from abc import ABC, abstractmethod +import logging + +logger = logging.getLogger(__name__) + + +class Plugin(ABC): + """ + 插件基类 + + 所有插件必须继承此类并实现必要的方法 + """ + + def __init__(self, name: str, version: str = "1.0.0"): + self.name = name + self.version = version + self.enabled = True + self.logger = logging.getLogger(f"conventionalrp.plugins.{name}") + + @abstractmethod + def initialize(self, config: Optional[Dict[str, Any]] = None): + pass + + @abstractmethod + def process(self, data: Any) -> Any: + pass + + def on_enable(self): + """插件启用时调用""" + self.logger.info(f"Plugin {self.name} enabled") + + def on_disable(self): + """插件禁用时调用""" + self.logger.info(f"Plugin {self.name} disabled") + + def get_metadata(self) -> Dict[str, Any]: + return { + "name": self.name, + "version": self.version, + "enabled": self.enabled, + "type": self.__class__.__name__, + } + + def __repr__(self) -> str: + return f"Plugin(name={self.name}, version={self.version}, enabled={self.enabled})" + + +class ParserPlugin(Plugin): + def __init__(self, name: str, version: str = "1.0.0"): + super().__init__(name, version) + self.priority = 50 + + @abstractmethod + def can_parse(self, text: str) -> bool: + """ + 判断是否可以解析给定文本 + """ + pass + + @abstractmethod + def parse(self, text: str) -> List[Dict[str, Any]]: + """ + 解析文本 + """ + pass + + +class ProcessorPlugin(Plugin): + @abstractmethod + def process_token(self, token: Dict[str, Any]) -> Dict[str, Any]: + """ + 处理单个 token + """ + pass + + def process(self, data: Any) -> Any: + """ + 处理数据(实现基类方法) + """ + if isinstance(data, dict): + return self.process_token(data) + elif isinstance(data, list): + return [self.process_token(token) for token in data] + return data + + +class RendererPlugin(Plugin): + @abstractmethod + def render(self, data: Any) -> str: + pass + + def process(self, data: Any) -> Any: + return self.render(data) + + +class AnalyzerPlugin(Plugin): + @abstractmethod + def analyze(self, data: Any) -> Dict[str, Any]: + pass + + def process(self, data: Any) -> Any: + return self.analyze(data) diff --git a/src/conventionalrp/plugins/plugin_manager.py b/src/conventionalrp/plugins/plugin_manager.py index 0d49a9c..0230bda 100644 --- a/src/conventionalrp/plugins/plugin_manager.py +++ b/src/conventionalrp/plugins/plugin_manager.py @@ -1,19 +1,204 @@ +""" +插件管理器 +""" + import os +import sys import importlib +import importlib.util +from pathlib import Path +from typing import Dict, List, Optional, Type, Any +import logging + +from .base import Plugin + +logger = logging.getLogger(__name__) class PluginManager: - def __init__(self, plugin_dir: str): - self.plugin_dir = plugin_dir - self.plugins = [] + def __init__(self, plugin_dirs: Optional[List[str]] = None): + self.plugin_dirs = plugin_dirs or [] + self.plugins: Dict[str, Plugin] = {} + self.plugin_classes: Dict[str, Type[Plugin]] = {} + + logger.info("PluginManager initialized with %d directories", + len(self.plugin_dirs)) + + def add_plugin_dir(self, directory: str): - def load_plugins(self): - for plugin in os.listdir(self.plugin_dir): - if plugin.endswith(".py"): - plugin_name = plugin.split(".")[0] - module = importlib.import_module(f"{self.plugin_dir}.{plugin_name}") - self.plugins.append(module) + if directory not in self.plugin_dirs: + self.plugin_dirs.append(directory) + logger.info(f"Added plugin directory: {directory}") + + def discover_plugins(self) -> List[str]: + discovered = [] + + for plugin_dir in self.plugin_dirs: + path = Path(plugin_dir) + if not path.exists(): + logger.warning(f"Plugin directory does not exist: {plugin_dir}") + continue + + if str(path.parent) not in sys.path: + sys.path.insert(0, str(path.parent)) - def run_plugins(self): - for plugin in self.plugins: - plugin.run() + for py_file in path.glob("*.py"): + if py_file.name.startswith("_"): + continue + + module_name = py_file.stem + discovered.append(module_name) + logger.debug(f"Discovered plugin module: {module_name}") + + logger.info(f"Discovered {len(discovered)} plugin modules") + return discovered + + def load_plugin_from_file(self, file_path: str) -> Optional[Type[Plugin]]: + try: + module_name = Path(file_path).stem + spec = importlib.util.spec_from_file_location(module_name, file_path) + + if spec is None or spec.loader is None: + logger.error(f"Failed to load plugin spec: {file_path}") + return None + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for name in dir(module): + obj = getattr(module, name) + if (isinstance(obj, type) and + issubclass(obj, Plugin) and + obj is not Plugin): + logger.info(f"Loaded plugin class: {name} from {file_path}") + return obj + + logger.warning(f"No Plugin subclass found in: {file_path}") + return None + + except Exception as e: + logger.error(f"Error loading plugin from {file_path}: {e}") + return None + + def load_plugin(self, module_name: str) -> Optional[str]: + try: + module = importlib.import_module(module_name) + + # 查找 Plugin 子类 + for name in dir(module): + obj = getattr(module, name) + if (isinstance(obj, type) and + issubclass(obj, Plugin) and + obj is not Plugin): + + plugin_class = obj + self.plugin_classes[name] = plugin_class + logger.info(f"Loaded plugin class: {name}") + return name + + logger.warning(f"No Plugin subclass found in module: {module_name}") + return None + + except Exception as e: + logger.error(f"Error loading plugin module {module_name}: {e}") + return None + + def register_plugin( + self, + plugin: Plugin, + replace: bool = False + ) -> bool: + if plugin.name in self.plugins and not replace: + logger.warning(f"Plugin {plugin.name} already registered") + return False + + self.plugins[plugin.name] = plugin + plugin.on_enable() + logger.info(f"Registered plugin: {plugin.name}") + return True + + def unregister_plugin(self, plugin_name: str) -> bool: + if plugin_name not in self.plugins: + logger.warning(f"Plugin {plugin_name} not found") + return False + + plugin = self.plugins[plugin_name] + plugin.on_disable() + del self.plugins[plugin_name] + logger.info(f"Unregistered plugin: {plugin_name}") + return True + + def get_plugin(self, plugin_name: str) -> Optional[Plugin]: + return self.plugins.get(plugin_name) + + def list_plugins(self) -> List[Dict[str, Any]]: + return [plugin.get_metadata() for plugin in self.plugins.values()] + + def enable_plugin(self, plugin_name: str) -> bool: + plugin = self.plugins.get(plugin_name) + if plugin is None: + logger.warning(f"Plugin {plugin_name} not found") + return False + + if not plugin.enabled: + plugin.enabled = True + plugin.on_enable() + logger.info(f"Enabled plugin: {plugin_name}") + + return True + + def disable_plugin(self, plugin_name: str) -> bool: + plugin = self.plugins.get(plugin_name) + if plugin is None: + logger.warning(f"Plugin {plugin_name} not found") + return False + + if plugin.enabled: + plugin.enabled = False + plugin.on_disable() + logger.info(f"Disabled plugin: {plugin_name}") + + return True + + def execute_plugins( + self, + data: Any, + plugin_type: Optional[Type[Plugin]] = None + ) -> Any: + result = data + + for plugin in self.plugins.values(): + if not plugin.enabled: + continue + + if plugin_type is not None and not isinstance(plugin, plugin_type): + continue + + try: + result = plugin.process(result) + logger.debug(f"Executed plugin: {plugin.name}") + except Exception as e: + logger.error(f"Error executing plugin {plugin.name}: {e}") + + return result + + def clear_plugins(self): + for plugin_name in list(self.plugins.keys()): + self.unregister_plugin(plugin_name) + + self.plugin_classes.clear() + logger.info("Cleared all plugins") + + def get_statistics(self) -> Dict[str, Any]: + enabled_count = sum(1 for p in self.plugins.values() if p.enabled) + + return { + "total_plugins": len(self.plugins), + "enabled_plugins": enabled_count, + "disabled_plugins": len(self.plugins) - enabled_count, + "plugin_classes": len(self.plugin_classes), + "plugin_directories": len(self.plugin_dirs), + } + + def __repr__(self) -> str: + return f"PluginManager(plugins={len(self.plugins)}, enabled={sum(1 for p in self.plugins.values() if p.enabled)})" diff --git a/src/conventionalrp/renderers/__init__.py b/src/conventionalrp/renderers/__init__.py index 6d5ac7a..e69de29 100644 --- a/src/conventionalrp/renderers/__init__.py +++ b/src/conventionalrp/renderers/__init__.py @@ -1,3 +0,0 @@ -""" -This file initializes the renderers module. -""" diff --git a/src/conventionalrp/renderers/html_renderer.py b/src/conventionalrp/renderers/html_renderer.py index 75efcd3..d435fc9 100644 --- a/src/conventionalrp/renderers/html_renderer.py +++ b/src/conventionalrp/renderers/html_renderer.py @@ -1,22 +1,423 @@ +from typing import Any, Dict, List, Optional from .base import BaseRenderer class HTMLRenderer(BaseRenderer): - def __init__(self): + THEMES = { + "light": { + "bg_color": "#ffffff", + "text_color": "#333333", + "header_bg": "#f5f5f5", + "border_color": "#e0e0e0", + "dialogue_color": "#4CAF50", + "dice_color": "#2196F3", + "narration_color": "#FF9800", + "system_color": "#9E9E9E", + "success_color": "#43a047", + "failure_color": "#e53935", + "code_bg": "#f5f5f5", + }, + "dark": { + "bg_color": "#1e1e1e", + "text_color": "#d4d4d4", + "header_bg": "#2d2d30", + "border_color": "#3e3e42", + "dialogue_color": "#6adb8d", + "dice_color": "#5fb3f5", + "narration_color": "#ffb74d", + "system_color": "#bdbdbd", + "success_color": "#66bb6a", + "failure_color": "#ef5350", + "code_bg": "#2d2d30", + }, + "fantasy": { + "bg_color": "#f9f6f1", + "text_color": "#3e2723", + "header_bg": "#d7ccc8", + "border_color": "#bcaaa4", + "dialogue_color": "#8d6e63", + "dice_color": "#5d4037", + "narration_color": "#795548", + "system_color": "#a1887f", + "success_color": "#7cb342", + "failure_color": "#c62828", + "code_bg": "#efebe9", + }, + } + + def __init__(self, theme: str = "light", custom_css: Optional[str] = None): super().__init__() - self.title = "TRPG Log Output" - - def render(self, data): - html_content = f"