author    简律纯 <i@jyunko.cn>  2025-10-29 22:30:10 +0800
committer 简律纯 <i@jyunko.cn>  2025-10-29 22:30:10 +0800
commit    75ef2a5677eddc14917c738815982c60c197e43c (patch)
tree      1aea25d83e3eb95261d3baff8bb704a6f37785fb /src/conventionalrp
parent    dcaf3a04f343f9f01191056fff9eb43f04d2b45f (diff)
feat: add AutoParser class for automated content classification
Diffstat (limited to 'src/conventionalrp')
-rw-r--r--  src/conventionalrp/__init__.py           3
-rw-r--r--  src/conventionalrp/core/__init__.py      3
-rw-r--r--  src/conventionalrp/core/auto_parser.py   467
3 files changed, 471 insertions, 2 deletions
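
Usage note: a minimal sketch of the new AutoParser API as added in this diff
(assumes thulac is installed; the sample lines and keywords below are
illustrative only, not fixtures from the repository):

    from conventionalrp import AutoParser

    parser = AutoParser()

    # Optional: bias classification with custom keywords
    parser.load_custom_keywords(ooc_words=["OOC"])

    # Classify a batch of role-play log lines
    results = parser.parse_text_batch([
        "我喜欢你",          # pronoun/particle heavy -> likely dialogue
        "他缓缓拔出长剑",    # verb heavy -> likely action
    ])

    for r in results:
        print(r["line_number"], r["content_type"], round(r["confidence"], 2))

    # Group results by type and inspect parsing statistics
    grouped = parser.group_by_type(results)
    print(len(grouped["dialogue"]), parser.get_statistics()["success_rate"])
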
diff --git a/src/conventionalrp/__init__.py b/src/conventionalrp/__init__.py
index ab4b17d..d5ff537 100644
--- a/src/conventionalrp/__init__.py
+++ b/src/conventionalrp/__init__.py
@@ -6,7 +6,7 @@ import sys
from importlib.metadata import version
from . import _core
-from .core import Parser, Processor, Rule, RuleEngine
+from .core import Parser, Processor, Rule, RuleEngine, AutoParser
from .utils import (
setup_logging,
get_logger,
@@ -23,6 +23,7 @@ __all__ = [
"Processor",
"Rule",
"RuleEngine",
+ "AutoParser",
"setup_logging",
"get_logger",
"ConventionalRPError",
diff --git a/src/conventionalrp/core/__init__.py b/src/conventionalrp/core/__init__.py
index 08829b8..d9fb94d 100644
--- a/src/conventionalrp/core/__init__.py
+++ b/src/conventionalrp/core/__init__.py
@@ -1,6 +1,7 @@
from .parser import Parser
from .processor import Processor
from .rules import Rule, RuleEngine
+from .auto_parser import AutoParser
-__all__ = ["Parser", "Processor", "Rule", "RuleEngine"]
+__all__ = ["Parser", "Processor", "Rule", "RuleEngine", "AutoParser"]
diff --git a/src/conventionalrp/core/auto_parser.py b/src/conventionalrp/core/auto_parser.py
new file mode 100644
index 0000000..0cb07e3
--- /dev/null
+++ b/src/conventionalrp/core/auto_parser.py
@@ -0,0 +1,467 @@
+"""
+自动分类解析器 (Auto Parser)
+"""
+
+from typing import List, Dict, Optional, Union, Set
+from pathlib import Path
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+try:
+ import thulac
+ THULAC_AVAILABLE = True
+except ImportError:
+ THULAC_AVAILABLE = False
+ logger.warning("THULAC not installed. Please install with: pip install thulac")
+
+
+class AutoParser:
+
+ CONTENT_TYPES = [
+ "dialogue", # 对话
+ "action", # 动作
+ "narration", # 旁白
+ "unknown", # 未知
+ "ooc", # 场外讨论
+ ]
+
+ # THULAC 词性标注说明
+ # n/名词 np/人名 ns/地名 ni/机构名 nz/其它专名
+ # m/数词 q/量词 mq/数量词 t/时间词 f/方位词 s/处所词
+ # v/动词 a/形容词 d/副词
+ # h/前接成分 k/后接成分
+ # i/习语 j/简称 r/代词 c/连词 p/介词
+ # u/助词 y/语气助词 e/叹词 o/拟声词
+ # g/语素 w/标点 x/其它
+
+ # 词性到内容类型的映射策略
+ POS_WEIGHTS = {
+ # 动词相关 - 倾向于动作
+ 'v': {'action': 0.8, 'narration': 0.2},
+
+ # 名词相关 - 倾向于旁白
+ 'n': {'narration': 0.7, 'dialogue': 0.3},
+ 'np': {'narration': 0.6, 'dialogue': 0.4}, # 人名可能出现在对话中
+ 'ns': {'narration': 0.8, 'dialogue': 0.2}, # 地名
+ 'ni': {'narration': 0.8, 'dialogue': 0.2}, # 机构名
+ 'nz': {'narration': 0.7, 'dialogue': 0.3}, # 其它专名
+
+ # 代词 - 倾向于对话
+ 'r': {'dialogue': 0.7, 'narration': 0.3},
+
+ # 形容词 - 倾向于旁白或对话
+ 'a': {'narration': 0.5, 'dialogue': 0.4, 'action': 0.1},
+
+ # 副词 - 可以是任何类型
+ 'd': {'dialogue': 0.4, 'narration': 0.4, 'action': 0.2},
+
+ # 助词、语气词 - 倾向于对话
+ 'u': {'dialogue': 0.8, 'narration': 0.2},
+ 'y': {'dialogue': 0.9, 'narration': 0.1}, # 语气助词
+ 'e': {'dialogue': 0.8, 'action': 0.2}, # 叹词
+
+ # 量词、数词 - 倾向于旁白
+ 'm': {'narration': 0.8, 'dialogue': 0.2},
+ 'q': {'narration': 0.7, 'dialogue': 0.3},
+ 'mq': {'narration': 0.8, 'dialogue': 0.2},
+
+ # 时间、方位、处所 - 倾向于旁白
+ 't': {'narration': 0.8, 'dialogue': 0.2},
+ 'f': {'narration': 0.7, 'dialogue': 0.3},
+ 's': {'narration': 0.8, 'dialogue': 0.2},
+ }
+
+ def __init__(self, seg_only: bool = False, user_dict: str = None):
+ """
+ 初始化自动解析器
+
+ Args:
+ seg_only: 是否只进行分词(不标注词性)
+ user_dict: 用户自定义词典路径
+ """
+ if not THULAC_AVAILABLE:
+ raise ImportError(
+ "THULAC is not installed. Please install with:\n"
+ "pip install thulac\n"
+ "Note: First installation may take a few minutes to download models."
+ )
+
+ self.thulac = thulac.thulac(
+ seg_only=seg_only,
+ user_dict=user_dict if user_dict else None
+ )
+
+ # 可选的自定义关键词列表
+ self.custom_action_words: Set[str] = set()
+ self.custom_dialogue_words: Set[str] = set()
+ self.custom_narration_words: Set[str] = set()
+ self.custom_ooc_words: Set[str] = set()
+
+ # 统计信息
+ self.statistics = {
+ "total_lines": 0,
+ "parsed_lines": 0,
+ "error_lines": 0,
+ }
+
+ # 初始化每种类型的计数
+ for content_type in self.CONTENT_TYPES:
+ self.statistics[f"{content_type}_count"] = 0
+
+ logger.info(f"AutoParser initialized with THULAC seg_only={seg_only}")
+
+ def load_custom_keywords(self,
+ action_words: Optional[List[str]] = None,
+ dialogue_words: Optional[List[str]] = None,
+ narration_words: Optional[List[str]] = None,
+ ooc_words: Optional[List[str]] = None):
+ """
+ 加载自定义关键词列表
+
+ Args:
+ action_words: 动作关键词列表
+ dialogue_words: 对话关键词列表
+ narration_words: 旁白关键词列表
+ ooc_words: OOC关键词列表
+ """
+ if action_words:
+ self.custom_action_words.update(action_words)
+ logger.info(f"Loaded {len(action_words)} custom action words")
+
+ if dialogue_words:
+ self.custom_dialogue_words.update(dialogue_words)
+ logger.info(f"Loaded {len(dialogue_words)} custom dialogue words")
+
+ if narration_words:
+ self.custom_narration_words.update(narration_words)
+ logger.info(f"Loaded {len(narration_words)} custom narration words")
+
+ if ooc_words:
+ self.custom_ooc_words.update(ooc_words)
+ logger.info(f"Loaded {len(ooc_words)} custom ooc words")
+
+ def load_keywords_from_file(self, file_path: Union[str, Path],
+ content_type: str):
+ """
+ 从文件加载关键词列表
+
+ Args:
+ file_path: 关键词文件路径,每行一个关键词
+ content_type: 内容类型 ('action', 'dialogue', 'narration')
+ """
+ file_path = Path(file_path)
+
+ if not file_path.exists():
+ raise FileNotFoundError(f"Keywords file not found: {file_path}")
+
+ with open(file_path, 'r', encoding='utf-8') as f:
+ keywords = [line.strip() for line in f if line.strip()]
+
+ if content_type == 'action':
+ self.custom_action_words.update(keywords)
+ elif content_type == 'dialogue':
+ self.custom_dialogue_words.update(keywords)
+ elif content_type == 'narration':
+ self.custom_narration_words.update(keywords)
+ elif content_type == 'ooc':
+ self.custom_ooc_words.update(keywords)
+ else:
+ raise ValueError(f"Unknown content type: {content_type}")
+
+ logger.info(f"Loaded {len(keywords)} {content_type} keywords from {file_path}")
+
+ def parse_line(self, line: str, line_number: int = None) -> Dict:
+ """
+ 解析单行日志,返回包含分类信息的字典
+
+ Args:
+ line: 要解析的文本行
+ line_number: 行号(可选)
+
+ Returns:
+ {
+ "line_number": 1,
+ "raw_text": "原始文本",
+ "content": "内容文本",
+ "content_type": "dialogue",
+ "words": ["我", "喜欢", "你"],
+ "pos_tags": ["r", "v", "r"],
+ "confidence": 0.85,
+ "analysis": {} # 词法分析详情
+ }
+ """
+ self.statistics["total_lines"] += 1
+
+ # 基础结果结构
+ result = {
+ "line_number": line_number,
+ "raw_text": line,
+ "content": "",
+ "content_type": "unknown",
+ "words": [],
+ "pos_tags": [],
+ "confidence": 0.0,
+ "analysis": {}
+ }
+
+ # 空行处理
+ if not line or not line.strip():
+ result["content_type"] = "unknown"
+ return result
+
+ text = line.strip()
+ result["content"] = text
+
+ # 使用 THULAC 进行词法分析
+ try:
+ # THULAC 返回 [(word, pos), (word, pos), ...]
+ lac_result = self.thulac.cut(text, text=False)
+
+ # 分离词和词性
+ words = [item[0] for item in lac_result]
+ pos_tags = [item[1] for item in lac_result]
+
+ result["words"] = words
+ result["pos_tags"] = pos_tags
+
+ # 基于词法分析结果分类
+ content_type, confidence, analysis = self._classify_by_thulac(words, pos_tags)
+
+ result["content_type"] = content_type
+ result["confidence"] = confidence
+ result["analysis"] = analysis
+
+ # 更新统计
+ self.statistics["parsed_lines"] += 1
+ self.statistics[f"{content_type}_count"] += 1
+
+ except Exception as e:
+ logger.error(f"Error analyzing line {line_number}: {e}")
+ self.statistics["error_lines"] += 1
+ result["analysis"]["error"] = str(e)
+
+ return result
+
+ def _classify_by_thulac(self, words: List[str], pos_tags: List[str]) -> tuple:
+ """
+ 基于 THULAC 词法分析结果进行分类
+
+ Args:
+ words: 分词结果
+ pos_tags: 词性标注结果
+
+ Returns:
+ (content_type, confidence, analysis_dict)
+ """
+ if not words or not pos_tags:
+ return "unknown", 0.0, {}
+
+ # 初始化类型分数(排除 unknown)
+ type_scores = {content_type: 0.0 for content_type in self.CONTENT_TYPES if content_type != 'unknown'}
+
+ # 分析详情
+ analysis = {
+ "word_count": len(words),
+ "pos_distribution": {},
+ "custom_matches": []
+ }
+
+ # 统计词性分布
+ for pos in pos_tags:
+ if pos != 'w': # 忽略标点
+ analysis["pos_distribution"][pos] = analysis["pos_distribution"].get(pos, 0) + 1
+
+ # 基于词性加权计算类型分数
+ for i, (word, pos) in enumerate(zip(words, pos_tags)):
+ # 跳过标点
+ if pos == 'w':
+ continue
+
+ # 检查自定义关键词(优先级最高)
+ if word in self.custom_action_words:
+ type_scores['action'] += 1.0
+ analysis["custom_matches"].append({"word": word, "type": "action"})
+ elif word in self.custom_dialogue_words:
+ type_scores['dialogue'] += 1.0
+ analysis["custom_matches"].append({"word": word, "type": "dialogue"})
+ elif word in self.custom_narration_words:
+ type_scores['narration'] += 1.0
+ analysis["custom_matches"].append({"word": word, "type": "narration"})
+ elif word in self.custom_ooc_words:
+ type_scores['ooc'] += 1.0
+ analysis["custom_matches"].append({"word": word, "type": "ooc"})
+
+ # 应用词性权重
+ if pos in self.POS_WEIGHTS:
+ weights = self.POS_WEIGHTS[pos]
+ for content_type, weight in weights.items():
+ type_scores[content_type] += weight
+
+ # 归一化分数
+ total_score = sum(type_scores.values())
+ if total_score > 0:
+ for content_type in type_scores:
+ type_scores[content_type] /= total_score
+
+ # 选择得分最高的类型
+ if type_scores:
+ content_type = max(type_scores.items(), key=lambda x: x[1])
+ analysis["type_scores"] = type_scores
+
+ # 如果最高分太低,标记为 unknown
+ if content_type[1] < 0.3:
+ return "unknown", content_type[1], analysis
+
+ return content_type[0], content_type[1], analysis
+
+ return "unknown", 0.0, analysis
+
+ def parse_log_file(self, file_path: Union[str, Path]) -> List[Dict]:
+ """
+ 批处理方法:按行解析日志文件
+
+ Args:
+ file_path: 日志文件路径
+
+ Returns:
+ 包含所有解析结果的列表,每个元素都是一个 dict
+ """
+ file_path = Path(file_path)
+
+ if not file_path.exists():
+ raise FileNotFoundError(f"Log file not found: {file_path}")
+
+ logger.info(f"Parsing log file: {file_path}")
+
+ results = []
+
+ try:
+ with open(file_path, 'r', encoding='utf-8') as f:
+ lines = f.readlines()
+
+ # 批量处理以提高效率
+ texts = [line.strip() for line in lines]
+
+ for line_num, (line, text) in enumerate(zip(lines, texts), start=1):
+ if not text:
+ # 跳过空行
+ continue
+
+ try:
+ result = self.parse_line(text, line_number=line_num)
+ results.append(result)
+ except Exception as e:
+ logger.error(f"Error parsing line {line_num}: {e}")
+ self.statistics["error_lines"] += 1
+ # 添加错误记录
+ results.append({
+ "line_number": line_num,
+ "raw_text": line.strip(),
+ "content": text,
+ "content_type": "unknown",
+ "words": [],
+ "pos_tags": [],
+ "confidence": 0.0,
+ "analysis": {"error": str(e)}
+ })
+
+ except Exception as e:
+ logger.error(f"Error reading file {file_path}: {e}")
+ raise
+
+ logger.info(f"Successfully parsed {len(results)} lines from {file_path}")
+ return results
+
+ def parse_text_batch(self, lines: List[str]) -> List[Dict]:
+ """
+ 批处理方法:解析文本行列表
+
+ Args:
+ lines: 文本行列表
+
+ Returns:
+ 包含所有解析结果的列表
+ """
+ results = []
+
+ for line_num, line in enumerate(lines, start=1):
+ try:
+ result = self.parse_line(line, line_number=line_num)
+ results.append(result)
+ except Exception as e:
+ logger.error(f"Error parsing line {line_num}: {e}")
+ self.statistics["error_lines"] += 1
+ results.append({
+ "line_number": line_num,
+ "raw_text": line,
+ "content": line.strip(),
+ "content_type": "unknown",
+ "words": [],
+ "pos_tags": [],
+ "confidence": 0.0,
+ "analysis": {"error": str(e)}
+ })
+
+ return results
+
+ def get_statistics(self) -> Dict:
+ """获取解析统计信息"""
+ stats = self.statistics.copy()
+
+ # 计算成功率
+ if stats["total_lines"] > 0:
+ stats["success_rate"] = stats["parsed_lines"] / stats["total_lines"]
+ stats["error_rate"] = stats["error_lines"] / stats["total_lines"]
+ else:
+ stats["success_rate"] = 0.0
+ stats["error_rate"] = 0.0
+
+ return stats
+
+ def reset_statistics(self):
+ """重置统计信息"""
+ for key in self.statistics:
+ self.statistics[key] = 0
+
+ def get_content_types(self) -> List[str]:
+ """获取所有支持的内容类型"""
+ return self.CONTENT_TYPES.copy()
+
+ def filter_by_type(self, parsed_data: List[Dict],
+ content_type: str) -> List[Dict]:
+ """
+ 按内容类型过滤解析结果
+
+ Args:
+ parsed_data: 解析结果列表
+ content_type: 要过滤的内容类型
+
+ Returns:
+ 过滤后的结果列表
+ """
+ if content_type not in self.CONTENT_TYPES:
+ logger.warning(f"Unknown content type: {content_type}")
+ return []
+
+ return [item for item in parsed_data if item["content_type"] == content_type]
+
+ def group_by_type(self, parsed_data: List[Dict]) -> Dict[str, List[Dict]]:
+ """
+ 按内容类型分组
+
+ Args:
+ parsed_data: 解析结果列表
+
+ Returns:
+ 按类型分组的字典
+ """
+ grouped = {content_type: [] for content_type in self.CONTENT_TYPES}
+
+ for item in parsed_data:
+ content_type = item.get("content_type", "unknown")
+ if content_type in grouped:
+ grouped[content_type].append(item)
+
+ return grouped
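
Scoring walkthrough: a rough sketch of how _classify_by_thulac combines and
normalizes the POS weights, assuming THULAC tags 我/r 喜欢/v 你/r (the tags
here are an assumption for illustration, not recorded library output):

    # Pronouns (r) weigh dialogue 0.7 / narration 0.3; verbs (v) weigh
    # action 0.8 / narration 0.2. No custom keywords match in this example.
    weights = {'r': {'dialogue': 0.7, 'narration': 0.3},
               'v': {'action': 0.8, 'narration': 0.2}}
    scores = {'dialogue': 0.0, 'action': 0.0, 'narration': 0.0, 'ooc': 0.0}
    for pos in ['r', 'v', 'r']:
        for ctype, w in weights[pos].items():
            scores[ctype] += w

    # Raw: dialogue 1.4, narration 0.8, action 0.8, ooc 0.0; total 3.0
    total = sum(scores.values())
    normalized = {k: v / total for k, v in scores.items()}

    # dialogue ~= 0.47 tops the list and clears the 0.3 floor, so the
    # line is classified as dialogue with confidence ~0.47.
    best = max(normalized, key=normalized.get)
    print(best, round(normalized[best], 2))  # dialogue 0.47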