diff options
Diffstat (limited to 'src/conventionalrp/plugins/plugin_manager.py')
| -rw-r--r-- | src/conventionalrp/plugins/plugin_manager.py | 209 |
1 files changed, 197 insertions, 12 deletions
diff --git a/src/conventionalrp/plugins/plugin_manager.py b/src/conventionalrp/plugins/plugin_manager.py index 0d49a9c..0230bda 100644 --- a/src/conventionalrp/plugins/plugin_manager.py +++ b/src/conventionalrp/plugins/plugin_manager.py @@ -1,19 +1,204 @@ +""" +插件管理器 +""" + import os +import sys import importlib +import importlib.util +from pathlib import Path +from typing import Dict, List, Optional, Type, Any +import logging + +from .base import Plugin + +logger = logging.getLogger(__name__) class PluginManager: - def __init__(self, plugin_dir: str): - self.plugin_dir = plugin_dir - self.plugins = [] + def __init__(self, plugin_dirs: Optional[List[str]] = None): + self.plugin_dirs = plugin_dirs or [] + self.plugins: Dict[str, Plugin] = {} + self.plugin_classes: Dict[str, Type[Plugin]] = {} + + logger.info("PluginManager initialized with %d directories", + len(self.plugin_dirs)) + + def add_plugin_dir(self, directory: str): - def load_plugins(self): - for plugin in os.listdir(self.plugin_dir): - if plugin.endswith(".py"): - plugin_name = plugin.split(".")[0] - module = importlib.import_module(f"{self.plugin_dir}.{plugin_name}") - self.plugins.append(module) + if directory not in self.plugin_dirs: + self.plugin_dirs.append(directory) + logger.info(f"Added plugin directory: {directory}") + + def discover_plugins(self) -> List[str]: + discovered = [] + + for plugin_dir in self.plugin_dirs: + path = Path(plugin_dir) + if not path.exists(): + logger.warning(f"Plugin directory does not exist: {plugin_dir}") + continue + + if str(path.parent) not in sys.path: + sys.path.insert(0, str(path.parent)) - def run_plugins(self): - for plugin in self.plugins: - plugin.run() + for py_file in path.glob("*.py"): + if py_file.name.startswith("_"): + continue + + module_name = py_file.stem + discovered.append(module_name) + logger.debug(f"Discovered plugin module: {module_name}") + + logger.info(f"Discovered {len(discovered)} plugin modules") + return discovered + + def load_plugin_from_file(self, file_path: str) -> Optional[Type[Plugin]]: + try: + module_name = Path(file_path).stem + spec = importlib.util.spec_from_file_location(module_name, file_path) + + if spec is None or spec.loader is None: + logger.error(f"Failed to load plugin spec: {file_path}") + return None + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for name in dir(module): + obj = getattr(module, name) + if (isinstance(obj, type) and + issubclass(obj, Plugin) and + obj is not Plugin): + logger.info(f"Loaded plugin class: {name} from {file_path}") + return obj + + logger.warning(f"No Plugin subclass found in: {file_path}") + return None + + except Exception as e: + logger.error(f"Error loading plugin from {file_path}: {e}") + return None + + def load_plugin(self, module_name: str) -> Optional[str]: + try: + module = importlib.import_module(module_name) + + # 查找 Plugin 子类 + for name in dir(module): + obj = getattr(module, name) + if (isinstance(obj, type) and + issubclass(obj, Plugin) and + obj is not Plugin): + + plugin_class = obj + self.plugin_classes[name] = plugin_class + logger.info(f"Loaded plugin class: {name}") + return name + + logger.warning(f"No Plugin subclass found in module: {module_name}") + return None + + except Exception as e: + logger.error(f"Error loading plugin module {module_name}: {e}") + return None + + def register_plugin( + self, + plugin: Plugin, + replace: bool = False + ) -> bool: + if plugin.name in self.plugins and not replace: + logger.warning(f"Plugin {plugin.name} already registered") + return False + + self.plugins[plugin.name] = plugin + plugin.on_enable() + logger.info(f"Registered plugin: {plugin.name}") + return True + + def unregister_plugin(self, plugin_name: str) -> bool: + if plugin_name not in self.plugins: + logger.warning(f"Plugin {plugin_name} not found") + return False + + plugin = self.plugins[plugin_name] + plugin.on_disable() + del self.plugins[plugin_name] + logger.info(f"Unregistered plugin: {plugin_name}") + return True + + def get_plugin(self, plugin_name: str) -> Optional[Plugin]: + return self.plugins.get(plugin_name) + + def list_plugins(self) -> List[Dict[str, Any]]: + return [plugin.get_metadata() for plugin in self.plugins.values()] + + def enable_plugin(self, plugin_name: str) -> bool: + plugin = self.plugins.get(plugin_name) + if plugin is None: + logger.warning(f"Plugin {plugin_name} not found") + return False + + if not plugin.enabled: + plugin.enabled = True + plugin.on_enable() + logger.info(f"Enabled plugin: {plugin_name}") + + return True + + def disable_plugin(self, plugin_name: str) -> bool: + plugin = self.plugins.get(plugin_name) + if plugin is None: + logger.warning(f"Plugin {plugin_name} not found") + return False + + if plugin.enabled: + plugin.enabled = False + plugin.on_disable() + logger.info(f"Disabled plugin: {plugin_name}") + + return True + + def execute_plugins( + self, + data: Any, + plugin_type: Optional[Type[Plugin]] = None + ) -> Any: + result = data + + for plugin in self.plugins.values(): + if not plugin.enabled: + continue + + if plugin_type is not None and not isinstance(plugin, plugin_type): + continue + + try: + result = plugin.process(result) + logger.debug(f"Executed plugin: {plugin.name}") + except Exception as e: + logger.error(f"Error executing plugin {plugin.name}: {e}") + + return result + + def clear_plugins(self): + for plugin_name in list(self.plugins.keys()): + self.unregister_plugin(plugin_name) + + self.plugin_classes.clear() + logger.info("Cleared all plugins") + + def get_statistics(self) -> Dict[str, Any]: + enabled_count = sum(1 for p in self.plugins.values() if p.enabled) + + return { + "total_plugins": len(self.plugins), + "enabled_plugins": enabled_count, + "disabled_plugins": len(self.plugins) - enabled_count, + "plugin_classes": len(self.plugin_classes), + "plugin_directories": len(self.plugin_dirs), + } + + def __repr__(self) -> str: + return f"PluginManager(plugins={len(self.plugins)}, enabled={sum(1 for p in self.plugins.values() if p.enabled)})" |
