Diffstat (limited to 'src')
 src/conventionalrp/core/thulac_parser.py | 523
 1 file changed, 523 insertions, 0 deletions
diff --git a/src/conventionalrp/core/thulac_parser.py b/src/conventionalrp/core/thulac_parser.py
new file mode 100644
index 0000000..075c3bd
--- /dev/null
+++ b/src/conventionalrp/core/thulac_parser.py
@@ -0,0 +1,523 @@
+"""
+Intelligent parser built on Tsinghua's THULAC.
+
+Uses THULAC (THU Lexical Analyzer for Chinese) to perform Chinese lexical
+analysis and automatically classify TRPG log content as dialogue, action,
+narration, etc., which greatly simplifies rule configuration.
+
+THULAC is a Chinese lexical-analysis toolkit developed by the Natural
+Language Processing and Social Humanities Computing Lab at Tsinghua
+University. It provides Chinese word segmentation and part-of-speech
+tagging.
+
+Part-of-speech tags:
+- n/noun  np/person name  ns/place name  ni/organization name  nz/other proper noun
+- m/numeral  q/measure word  mq/quantifier  t/time word  f/locative  s/place word
+- v/verb  a/adjective  d/adverb
+- h/prefix component  k/suffix component
+- i/idiom  j/abbreviation  r/pronoun  c/conjunction  p/preposition
+- u/particle  y/modal particle  e/interjection  o/onomatopoeia
+- g/morpheme  w/punctuation  x/other
+"""
+
+import re
+import logging
+from typing import Dict, List, Optional
+
+logger = logging.getLogger(__name__)
+
+try:
+    import thulac
+    THULAC_AVAILABLE = True
+except ImportError:
+    THULAC_AVAILABLE = False
+    logger.warning("THULAC not installed. Please install with: pip install thulac")
+
+
+class THULACParser:
+    """
+    Intelligent TRPG log parser based on THULAC.
+
+    Features:
+    1. Chinese word segmentation and POS tagging via THULAC
+    2. Automatic content-type detection from POS tags and context
+    3. Minimal rule configuration (only delimiters need to be set up)
+    4. Support for a user-defined dictionary
+    """
+
+    # Default delimiter configuration (can be overridden via load_rules)
+    DEFAULT_DELIMITERS = {
+        "dialogue": [
+            ('"', '"'),      # ASCII double quotes
+            ("“", "”"),      # Chinese double quotes
+            ("'", "'"),      # single quotes
+        ],
+        "thought": [
+            ("【", "】"),    # Chinese lenticular brackets
+            ("[", "]"),      # ASCII square brackets
+        ],
+        "action": [
+            ("(", ")"),    # Chinese parentheses
+            ("(", ")"),      # ASCII parentheses
+            ("**", "**"),    # double asterisks (longer matches win, see below)
+            ("*", "*"),      # single asterisks
+        ],
+        "ooc": [
+            ("//", "\n"),    # double slash to end of line
+            (">>", "\n"),    # double angle brackets to end of line
+        ]
+    }
+
+    # Mapping from THULAC POS tags to content types
+    POS_TYPE_MAPPING = {
+        # verb-like -> action
+        "v": "action",        # verb
+
+        # noun-like -> narration
+        "n": "narration",     # noun
+        "np": "narration",    # person name
+        "ns": "narration",    # place name
+        "ni": "narration",    # organization name
+        "nz": "narration",    # other proper noun
+
+        # pronouns -> dialogue (first/second person leans towards speech)
+        "r": "dialogue",      # pronoun
+
+        # adverbs/adjectives -> narration
+        "d": "narration",     # adverb
+        "a": "narration",     # adjective
+
+        # numerals/measure words -> narration
+        "m": "narration",     # numeral
+        "q": "narration",     # measure word
+        "mq": "narration",    # quantifier
+
+        # time/locative/place -> narration
+        "t": "narration",     # time word
+        "f": "narration",     # locative
+        "s": "narration",     # place word
+
+        # modal particles/interjections -> dialogue
+        "y": "dialogue",      # modal particle
+        "e": "dialogue",      # interjection
+    }
+
+    # Action-verb keywords (take priority over POS statistics)
+    ACTION_VERBS = {
+        "走", "跑", "看", "听", "摸", "拿", "放", "打开", "关闭",
+        "推", "拉", "举", "扔", "跳", "爬", "坐", "站", "躺",
+        "进入", "离开", "接近", "远离", "转身", "回头", "低头", "抬头",
+        "微笑", "大笑", "哭", "喊", "叫", "说", "讲", "念", "读",
+        "投掷", "检定", "攻击", "防御", "躲避"
+    }
+
+    # Dialogue indicator keywords (pronouns and sentence-final particles)
+    DIALOGUE_INDICATORS = {
+        "我", "你", "他", "她", "我们", "你们", "他们",
+        "吗", "呢", "啊", "哦", "嗯", "哼", "咦", "哎",
+    }
+
+    def __init__(self, seg_only: bool = False, user_dict: Optional[str] = None):
+        """
+        Initialize the THULAC parser.
+
+        Args:
+            seg_only: Only segment words (skip POS tagging)
+            user_dict: Path to a user-defined dictionary
+        """
+        if not THULAC_AVAILABLE:
+            raise ImportError(
+                "THULAC is not installed. Please install with:\n"
+                "pip install thulac\n"
+                "Note: First installation may take a few minutes to download models."
+            )
+
+        self.seg_only = seg_only
+        self.thulac = thulac.thulac(
+            seg_only=seg_only,
+            user_dict=user_dict if user_dict else None
+        )
+
+        self.delimiters = self.DEFAULT_DELIMITERS.copy()
+        self.custom_words = {}
+        self.statistics = {
+            "total_parsed": 0,
+            "dialogue_count": 0,
+            "action_count": 0,
+            "thought_count": 0,
+            "narration_count": 0,
+            "ooc_count": 0,
+        }
+
+        logger.info(f"THULACParser initialized with seg_only={seg_only}")
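+    # A minimal usage sketch (illustrative, not part of the original file;
+    # the log line and speaker name are invented):
+    #
+    #     parser = THULACParser()
+    #     result = parser.parse_line('[19:02] <小明> "我们先休息一下吧。"')
+    #     # result["metadata"] should be {"timestamp": "19:02", "speaker": "小明"}
+    #     # result["content"] should contain one "dialogue" segment with
+    #     # confidence 1.0, since the double quotes are a dialogue delimiter.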
+    def load_rules(self, rules_path: Optional[str] = None, rules_dict: Optional[Dict] = None):
+        """
+        Load a simplified rule configuration.
+
+        Args:
+            rules_path: Path to a rules file (JSON5 format)
+            rules_dict: Rules passed in directly as a dict
+
+        Example rule format:
+        {
+            "delimiters": {
+                "dialogue": [["\"", "\""], ["“", "”"]],
+                "action": [["(", ")"], ["*", "*"]],
+                "thought": [["【", "】"]]
+            },
+            "custom_words": {
+                "骰子": "n",
+                "检定": "v",
+                "守秘人": "np"
+            }
+        }
+        """
+        import json5
+        from pathlib import Path
+
+        if rules_path:
+            if not Path(rules_path).exists():
+                raise FileNotFoundError(f"Rules file not found: {rules_path}")
+            with open(rules_path, "r", encoding="utf-8") as f:
+                rules_dict = json5.load(f)
+
+        if not rules_dict:
+            logger.info("No rules provided, using default delimiters")
+            return
+
+        # Load the delimiter configuration
+        if "delimiters" in rules_dict:
+            for content_type, delimiter_pairs in rules_dict["delimiters"].items():
+                self.delimiters[content_type] = [tuple(pair) for pair in delimiter_pairs]
+
+        # Load the custom vocabulary (word -> POS tag)
+        if "custom_words" in rules_dict:
+            self.custom_words = rules_dict["custom_words"]
+            logger.info(f"Loaded {len(self.custom_words)} custom words")
+
+        logger.info("Rules loaded successfully")
+
+    def _extract_delimited_content(self, text: str) -> List[Dict]:
+        """
+        Extract content marked by delimiters.
+
+        Returns:
+            List of {type, content, start, end, delimiter}
+        """
+        results = []
+
+        for content_type, delimiter_pairs in self.delimiters.items():
+            for start_delim, end_delim in delimiter_pairs:
+                # Escape regex special characters
+                start_pattern = re.escape(start_delim)
+                end_pattern = re.escape(end_delim)
+
+                # Handle delimiters that run to the end of the line
+                if end_delim == "\n":
+                    pattern = f"{start_pattern}(.+?)(?:\n|$)"
+                else:
+                    pattern = f"{start_pattern}(.+?){end_pattern}"
+
+                for match in re.finditer(pattern, text):
+                    results.append({
+                        "type": content_type,
+                        "content": match.group(1),
+                        "start": match.start(),
+                        "end": match.end(),
+                        "delimiter": (start_delim, end_delim),
+                        "confidence": 1.0  # delimiter matches are fully trusted
+                    })
+
+        # Sort by position, preferring longer matches at the same position,
+        # then drop overlaps so that e.g. "**text**" is not also reported as
+        # a single-asterisk match
+        results.sort(key=lambda x: (x["start"], -(x["end"] - x["start"])))
+        filtered = []
+        last_end = -1
+        for item in results:
+            if item["start"] >= last_end:
+                filtered.append(item)
+                last_end = item["end"]
+        return filtered
+
+    def _analyze_with_thulac(self, text: str) -> List[Dict]:
+        """
+        Analyze text with THULAC.
+
+        Returns:
+            List of {type, content, words, tags, confidence}
+        """
+        # Run THULAC segmentation and POS tagging
+        result = self.thulac.cut(text, text=False)  # returns [(word, pos), ...]
+
+        if not result:
+            return [{
+                "type": "narration",
+                "content": text,
+                "words": [],
+                "tags": [],
+                "confidence": 0.5,
+                "method": "thulac"
+            }]
+
+        # Separate words and POS tags
+        words = [item[0] for item in result]
+        tags = [item[1] for item in result]
+
+        # Apply custom POS tags, if any
+        for i, word in enumerate(words):
+            if word in self.custom_words:
+                tags[i] = self.custom_words[word]
+
+        # Infer the content type from POS tags and vocabulary
+        content_type = self._infer_content_type(words, tags)
+
+        # Compute the confidence
+        confidence = self._calculate_confidence(words, tags, content_type)
+
+        return [{
+            "type": content_type,
+            "content": text,
+            "words": words,
+            "tags": tags,
+            "confidence": confidence,
+            "method": "thulac"
+        }]
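+    # A configuration sketch for load_rules (hedged: the delimiter pair and
+    # the custom word below are invented for illustration):
+    #
+    #     parser.load_rules(rules_dict={
+    #         "delimiters": {"ooc": [["((", "))"]]},
+    #         "custom_words": {"豁免": "v"},
+    #     })
+    #     # A delimiter entry replaces the default list for that content
+    #     # type, so this override drops the "//" and ">>" OOC markers.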
+    def _infer_content_type(self, words: List[str], tags: List[str]) -> str:
+        """
+        Infer the content type from POS tags and vocabulary.
+
+        Strategy:
+        1. Contains an action verb -> action
+        2. Contains dialogue indicators -> dialogue
+        3. Otherwise, map the dominant POS tag via POS_TYPE_MAPPING
+        """
+        # Strategy 1: check for action verbs
+        for word in words:
+            if word in self.ACTION_VERBS:
+                return "action"
+
+        # Strategy 2: check for dialogue indicators
+        dialogue_indicators = sum(1 for w in words if w in self.DIALOGUE_INDICATORS)
+        if dialogue_indicators >= 2:  # at least two dialogue indicators
+            return "dialogue"
+
+        # Strategy 3: tally POS tags
+        pos_count = {}
+        for tag in tags:
+            if tag == "w":  # ignore punctuation
+                continue
+            pos_count[tag] = pos_count.get(tag, 0) + 1
+
+        if not pos_count:
+            return "narration"
+
+        # Find the most frequent POS tag
+        dominant_pos = max(pos_count.items(), key=lambda x: x[1])[0]
+
+        # Special rule: lean towards "action" when verbs are frequent
+        if "v" in pos_count and pos_count["v"] >= len(words) * 0.3:
+            return "action"
+
+        # Map the dominant POS tag
+        return self.POS_TYPE_MAPPING.get(dominant_pos, "narration")
+
+    def _calculate_confidence(self, words: List[str], tags: List[str],
+                              content_type: str) -> float:
+        """
+        Compute the confidence of the analysis.
+
+        Based on:
+        1. Consistency of the POS tagging
+        2. Keyword matches
+        3. Text length (reserved; not currently used)
+        """
+        if not words or not tags:
+            return 0.5
+
+        base_confidence = 0.5
+
+        # Factor 1: keyword matches
+        if content_type == "action":
+            action_word_count = sum(1 for w in words if w in self.ACTION_VERBS)
+            if action_word_count > 0:
+                base_confidence += 0.3
+        elif content_type == "dialogue":
+            dialogue_word_count = sum(1 for w in words if w in self.DIALOGUE_INDICATORS)
+            if dialogue_word_count >= 2:
+                base_confidence += 0.3
+
+        # Factor 2: POS consistency
+        unique_pos = len(set(tag for tag in tags if tag != "w"))
+        if unique_pos == 1:
+            base_confidence += 0.2
+        elif unique_pos <= 3:
+            base_confidence += 0.1
+
+        return min(1.0, base_confidence)
+
+    def _merge_results(self, delimited: List[Dict], thulac_results: List[Dict],
+                       text: str) -> List[Dict]:
+        """
+        Merge delimiter extraction and THULAC analysis results.
+
+        Priority: delimiter markup > THULAC analysis
+        """
+        if not delimited:
+            return thulac_results
+
+        results = []
+        covered_ranges = set()
+
+        # First, keep everything that was marked with delimiters
+        for item in delimited:
+            results.append(item)
+            for i in range(item["start"], item["end"]):
+                covered_ranges.add(i)
+
+        # Then collect the text segments not covered by any delimiter
+        uncovered_segments = []
+        start = 0
+        for i in range(len(text)):
+            if i in covered_ranges:
+                if start < i:
+                    uncovered_segments.append((start, i))
+                start = i + 1
+        if start < len(text):
+            uncovered_segments.append((start, len(text)))
+
+        # Analyze the uncovered segments with THULAC
+        for start, end in uncovered_segments:
+            segment = text[start:end].strip()
+            if segment:
+                thulac_result = self._analyze_with_thulac(segment)
+                for item in thulac_result:
+                    item["start"] = start
+                    item["end"] = end
+                    results.append(item)
+
+        # Re-sort by position
+        results.sort(key=lambda x: x.get("start", 0))
+        return results
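+    # An illustrative trace of the delimiter/THULAC merge (the input line is
+    # invented, and the THULAC segmentation is assumed, not verified):
+    #
+    #     text = "(推开门)你们都准备好了吗?"
+    #     # _extract_delimited_content marks "推开门" as an action span with
+    #     # confidence 1.0; _merge_results then feeds the uncovered remainder
+    #     # "你们都准备好了吗?" to THULAC, where the two indicators "你们"
+    #     # and "吗" should classify it as dialogue.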
+    def parse_line(self, line: str) -> Dict:
+        """
+        Parse a single log line.
+
+        Args:
+            line: The log line
+
+        Returns:
+            {
+                "metadata": {...},
+                "content": [...]
+            }
+        """
+        if not line or not line.strip():
+            return {"metadata": {}, "content": []}
+
+        # Extract metadata (timestamp, speaker, ...)
+        metadata = self._extract_metadata(line)
+
+        # Content with the metadata removed
+        content_text = self._remove_metadata(line, metadata)
+
+        # 1. Extract delimiter-marked content
+        delimited = self._extract_delimited_content(content_text)
+
+        # 2. Analyze unmarked content with THULAC
+        thulac_results = []
+        if not delimited:
+            thulac_results = self._analyze_with_thulac(content_text)
+
+        # 3. Merge the results
+        content = self._merge_results(delimited, thulac_results, content_text)
+
+        # Update the statistics
+        self.statistics["total_parsed"] += 1
+        for item in content:
+            type_key = f"{item['type']}_count"
+            if type_key in self.statistics:
+                self.statistics[type_key] += 1
+
+        return {
+            "metadata": metadata,
+            "content": content
+        }
+
+    def _extract_metadata(self, line: str) -> Dict:
+        """Extract metadata (timestamp, speaker)."""
+        metadata = {}
+
+        # Common metadata formats; each pattern names the fields its two
+        # capture groups correspond to
+        patterns = [
+            (r"^\[(.+?)\]\s*<(.+?)>", ("timestamp", "speaker")),   # [time] <speaker>
+            (r"^(.+?)\s*\|\s*(.+?)\s*:", ("timestamp", "speaker")),  # time | speaker:
+            (r"^<(.+?)>\s*@\s*(.+?)$", ("speaker", "timestamp")),  # <speaker> @ time
+        ]
+
+        for pattern, (first, second) in patterns:
+            match = re.search(pattern, line)
+            if match:
+                metadata[first] = match.group(1)
+                metadata[second] = match.group(2)
+                break
+
+        return metadata
+
+    def _remove_metadata(self, line: str, metadata: Dict) -> str:
+        """Strip the metadata and return the bare content."""
+        if not metadata:
+            return line
+
+        # Remove the matched metadata portion
+        patterns = [
+            r"^\[.+?\]\s*<.+?>\s*",
+            r"^.+?\s*\|\s*.+?\s*:\s*",
+            r"^<.+?>\s*@\s*.+?\s*",
+        ]
+
+        for pattern in patterns:
+            line = re.sub(pattern, "", line, count=1)
+
+        return line.strip()
+
+    def parse_log(self, log_path: str) -> List[Dict]:
+        """
+        Parse a complete TRPG log file.
+
+        Args:
+            log_path: Path to the log file
+
+        Returns:
+            List of parse results
+        """
+        from pathlib import Path
+
+        if not Path(log_path).exists():
+            raise FileNotFoundError(f"Log file not found: {log_path}")
+
+        with open(log_path, "r", encoding="utf-8") as f:
+            lines = f.readlines()
+
+        results = []
+        for i, line in enumerate(lines):
+            line = line.strip()
+            if not line:
+                continue
+
+            try:
+                result = self.parse_line(line)
+                result["line_number"] = i + 1
+                results.append(result)
+            except Exception as e:
+                logger.error(f"Error parsing line {i+1}: {e}")
+                results.append({
+                    "line_number": i + 1,
+                    "error": str(e),
+                    "raw_line": line
+                })
+
+        logger.info(f"Parsed {len(results)} lines from {log_path}")
+        return results
+
+    def get_statistics(self) -> Dict:
+        """Return a copy of the parsing statistics."""
+        return self.statistics.copy()
+
+    def reset_statistics(self):
+        """Reset the parsing statistics."""
+        for key in self.statistics:
+            self.statistics[key] = 0
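+
+
+# A minimal end-to-end sketch (illustrative; "session.log" is a placeholder
+# path that must exist, and THULAC downloads its models on first use):
+if __name__ == "__main__":
+    parser = THULACParser()
+    for entry in parser.parse_log("session.log"):
+        if "error" not in entry:
+            for seg in entry["content"]:
+                print(entry["line_number"], seg["type"], seg["content"])
+    print(parser.get_statistics())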
