| author | 2025-11-04 21:46:47 +0800 |
| committer | 2025-11-04 21:46:47 +0800 |
| commit | 9b4da79dbcf0f14edb181b0108633a547b3d9fa6 (patch) |
| tree | 658455c9a0405d01168269f6ae2b76edcff0fb2b /src |
| parent | a44b88b70e3624e29e5c8a2c81d11bfcd5daaed0 (diff) |
| download | conventional_role_play-9b4da79dbcf0f14edb181b0108633a547b3d9fa6.tar.gz, conventional_role_play-9b4da79dbcf0f14edb181b0108633a547b3d9fa6.zip |
refactor(auto_parser): Remove the THULAC parser implementation and switch auto_parser to HanLP. The entire thulac_parser.py file, which contained the logic for parsing TRPG logs using THULAC for Chinese lexical analysis, has been deleted along with all related methods, classes, and configurations; auto_parser.py now uses HanLP for tokenization, POS tagging, named entity recognition, and dependency parsing.
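For orientation, a minimal before/after sketch of the lexical-analysis call this commit swaps. THULAC's `cut()` returned a flat list of `(word, pos)` pairs with THULAC tags such as `v`/`r`; the HanLP multi-task pipeline returns a task-keyed document. The model constant and result keys (`tok/fine`, `pos/ctb`) are the ones used in the diff below; other HanLP models expose different keys, and the sample sentence is illustrative only.

```python
import hanlp

# Load the same small Chinese multi-task model the diff selects for model='SMALL'.
pipeline = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_SMALL_ZH)

text = "我推开门走了进去"

# Old (removed by this commit): thulac.thulac().cut(text, text=False)
# returned [(word, pos), ...] with THULAC tags such as 'r' and 'v'.

# New: the pipeline returns a dict-like Document keyed by task name.
doc = pipeline(text)
words = doc["tok/fine"]    # fine-grained tokenization
pos_tags = doc["pos/ctb"]  # CTB tags such as 'PN', 'VV', 'NN', 'AS'
print(list(zip(words, pos_tags)))
```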
Diffstat (limited to 'src')
| -rw-r--r-- | src/conventionalrp/core/auto_parser.py | 616 |
| -rw-r--r-- | src/conventionalrp/core/thulac_parser.py | 502 |
2 files changed, 524 insertions, 594 deletions
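The new classifier in auto_parser.py (see the diff below) scores each line by accumulating per-POS weights from a CTB-tag table plus keyword bonuses, skipping punctuation, then normalizing the winning score into a confidence. The following is a standalone sketch of that scoring idea using a small excerpt of the weight table; the helper function and its tiny keyword set are illustrative, not the project's actual `_classify_by_hanlp` code.

```python
from collections import defaultdict

POS_WEIGHTS = {  # excerpt of the table added in auto_parser.py
    "VV": {"action": 0.8, "narration": 0.2},
    "PN": {"dialogue": 0.75, "narration": 0.25},
    "NN": {"narration": 0.7, "dialogue": 0.3},
    "SP": {"dialogue": 0.85, "narration": 0.15},
}

def classify(words, pos_tags, action_verbs=frozenset({"推开", "走"})):
    scores = defaultdict(float)
    for word, pos in zip(words, pos_tags):
        if pos == "PU":            # punctuation is ignored, as in the parser
            continue
        if word in action_verbs:   # built-in keyword hit: weight 1.5
            scores["action"] += 1.5
        for ctype, weight in POS_WEIGHTS.get(pos, {}).items():
            scores[ctype] += weight
    total = sum(scores.values()) or 1.0
    best = max(scores, key=scores.get)
    return best, scores[best] / total  # confidence = winner's normalized share

print(classify(["我", "推开", "门"], ["PN", "VV", "NN"]))  # -> ('action', ~0.51)
```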
diff --git a/src/conventionalrp/core/auto_parser.py b/src/conventionalrp/core/auto_parser.py index 0cb07e3..4f395ea 100644 --- a/src/conventionalrp/core/auto_parser.py +++ b/src/conventionalrp/core/auto_parser.py @@ -1,5 +1,8 @@ """ 自动分类解析器 (Auto Parser) + +使用 HanLP 进行智能文本分析和分类 +HanLP 提供了更准确的中文分词、词性标注、命名实体识别和依存句法分析 """ from typing import List, Dict, Optional, Union, Set @@ -10,11 +13,11 @@ logger = logging.getLogger(__name__) try: - import thulac - THULAC_AVAILABLE = True + import hanlp + HANLP_AVAILABLE = True except ImportError: - THULAC_AVAILABLE = False - logger.warning("THULAC not installed. Please install with: pip install thulac") + HANLP_AVAILABLE = False + logger.warning("HanLP not installed. Please install with: pip install hanlp") class AutoParser: @@ -27,71 +30,127 @@ class AutoParser: "ooc", # 场外讨论 ] - # THULAC 词性标注说明 - # n/名词 np/人名 ns/地名 ni/机构名 nz/其它专名 - # m/数词 q/量词 mq/数量词 t/时间词 f/方位词 s/处所词 - # v/动词 a/形容词 d/副词 - # h/前接成分 k/后接成分 - # i/习语 j/简称 r/代词 c/连词 p/介词 - # u/助词 y/语气助词 e/叹词 o/拟声词 - # g/语素 w/标点 x/其它 + # HanLP 词性标注说明(CTB 标注集) + # 名词: NN-普通名词 NR-人名 NS-地名 NT-机构名 NP-专有名词 + # 动词: VV-动词 VA-动作动词 VC-系动词 VE-有 + # 形容词: JJ-形容词 + # 代词: PN-代词 + # 副词: AD-副词 + # 数量: CD-数词 OD-序数词 M-量词 + # 介词/连词: P-介词 CC-并列连词 CS-从属连词 + # 助词: DEC-的 DEG-的 DER-得 DEV-地 AS-了/着/过 SP-句末助词 + # 语气词: IJ-感叹词 + # 标点: PU-标点 - # 词性到内容类型的映射策略 + # 词性到内容类型的映射策略(基于 HanLP CTB 标注) POS_WEIGHTS = { # 动词相关 - 倾向于动作 - 'v': {'action': 0.8, 'narration': 0.2}, + 'VV': {'action': 0.8, 'narration': 0.2}, # 动词 + 'VA': {'action': 0.85, 'narration': 0.15}, # 动作动词(更倾向于动作) + 'VC': {'dialogue': 0.5, 'narration': 0.5}, # 系动词(是/为等) + 'VE': {'narration': 0.6, 'dialogue': 0.4}, # 有 # 名词相关 - 倾向于旁白 - 'n': {'narration': 0.7, 'dialogue': 0.3}, - 'np': {'narration': 0.6, 'dialogue': 0.4}, # 人名可能出现在对话中 - 'ns': {'narration': 0.8, 'dialogue': 0.2}, # 地名 - 'ni': {'narration': 0.8, 'dialogue': 0.2}, # 机构名 - 'nz': {'narration': 0.7, 'dialogue': 0.3}, # 其它专名 + 'NN': {'narration': 0.7, 'dialogue': 0.3}, # 普通名词 + 'NR': {'narration': 0.6, 'dialogue': 0.4}, # 人名(可能出现在对话中) + 'NS': {'narration': 0.8, 'dialogue': 0.2}, # 地名 + 'NT': {'narration': 0.8, 'dialogue': 0.2}, # 机构名 + 'NP': {'narration': 0.7, 'dialogue': 0.3}, # 专有名词 # 代词 - 倾向于对话 - 'r': {'dialogue': 0.7, 'narration': 0.3}, + 'PN': {'dialogue': 0.75, 'narration': 0.25}, # 代词(我/你/他等) # 形容词 - 倾向于旁白或对话 - 'a': {'narration': 0.5, 'dialogue': 0.4, 'action': 0.1}, + 'JJ': {'narration': 0.5, 'dialogue': 0.4, 'action': 0.1}, # 副词 - 可以是任何类型 - 'd': {'dialogue': 0.4, 'narration': 0.4, 'action': 0.2}, - - # 助词、语气词 - 倾向于对话 - 'u': {'dialogue': 0.8, 'narration': 0.2}, - 'y': {'dialogue': 0.9, 'narration': 0.1}, # 语气助词 - 'e': {'dialogue': 0.8, 'action': 0.2}, # 叹词 - - # 量词、数词 - 倾向于旁白 - 'm': {'narration': 0.8, 'dialogue': 0.2}, - 'q': {'narration': 0.7, 'dialogue': 0.3}, - 'mq': {'narration': 0.8, 'dialogue': 0.2}, - - # 时间、方位、处所 - 倾向于旁白 - 't': {'narration': 0.8, 'dialogue': 0.2}, - 'f': {'narration': 0.7, 'dialogue': 0.3}, - 's': {'narration': 0.8, 'dialogue': 0.2}, + 'AD': {'dialogue': 0.4, 'narration': 0.4, 'action': 0.2}, + + # 助词 - 倾向于对话 + 'DEC': {'dialogue': 0.7, 'narration': 0.3}, # 的(结构助词) + 'DEG': {'dialogue': 0.7, 'narration': 0.3}, # 的(关联助词) + 'DER': {'dialogue': 0.6, 'action': 0.4}, # 得(动补) + 'DEV': {'action': 0.7, 'narration': 0.3}, # 地(状中) + 'AS': {'dialogue': 0.6, 'narration': 0.4}, # 了/着/过 + 'SP': {'dialogue': 0.85, 'narration': 0.15}, # 句末助词(吗/呢/吧等) + + # 感叹词 - 强烈倾向于对话 + 'IJ': {'dialogue': 0.9, 'action': 0.1}, # 感叹词(啊/哦/唉等) + + # 数量词 - 倾向于旁白 + 'CD': {'narration': 0.8, 'dialogue': 0.2}, # 数词 + 'OD': 
{'narration': 0.8, 'dialogue': 0.2}, # 序数词 + 'M': {'narration': 0.7, 'dialogue': 0.3}, # 量词 + + # 介词/连词 - 中性 + 'P': {'narration': 0.5, 'dialogue': 0.5}, # 介词 + 'CC': {'narration': 0.5, 'dialogue': 0.5}, # 并列连词 + 'CS': {'narration': 0.6, 'dialogue': 0.4}, # 从属连词 } - def __init__(self, seg_only: bool = False, user_dict: str = None): + def __init__(self, model: str = 'SMALL', tasks: Optional[List[str]] = None, + use_gpu: bool = False, devices: Optional[List[int]] = None): """ 初始化自动解析器 Args: - seg_only: 是否只进行分词(不标注词性) - user_dict: 用户自定义词典路径 + model: HanLP 模型规模 + - 'SMALL': 小型模型(快速,适合日常使用) + - 'LARGE': 大型模型(更准确,需要更多资源) + - 'MULTI_TASK': 多任务模型 + - 或自定义模型路径 + tasks: 要加载的任务列表,如 ['tok', 'pos', 'ner', 'dep'] + - tok: 分词 + - pos: 词性标注 + - ner: 命名实体识别 + - dep: 依存句法分析 + - None: 使用默认任务(分词+词性标注) + use_gpu: 是否使用 GPU 加速 + devices: GPU 设备 ID 列表 """ - if not THULAC_AVAILABLE: + if not HANLP_AVAILABLE: raise ImportError( - "THULAC is not installed. Please install with:\n" - "pip install thulac\n" + "HanLP is not installed. Please install with:\n" + "pip install hanlp\n" "Note: First installation may take a few minutes to download models." ) - self.thulac = thulac.thulac( - seg_only=seg_only, - user_dict=user_dict if user_dict else None - ) + logger.info(f"Initializing HanLP AutoParser with model={model}, tasks={tasks}") + + # 初始化 HanLP 流水线 + try: + # 根据任务需求选择合适的预训练模型 + if tasks is None: + # 默认使用分词和词性标注 + tasks = ['tok/fine', 'pos/ctb'] + + # 加载 HanLP 多任务模型 + if model == 'SMALL': + # 使用小型预训练模型 + self.hanlp = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_SMALL_ZH) + elif model == 'LARGE': + # 使用大型预训练模型 + self.hanlp = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_BASE_ZH) + elif model == 'MULTI_TASK': + # 使用完整多任务模型 + self.hanlp = hanlp.load(hanlp.pretrained.mtl.UD_ONTONOTES_TOK_POS_LEM_FEA_NER_SRL_DEP_SDP_CON_XLMR_BASE) + else: + # 使用自定义模型路径或名称 + self.hanlp = hanlp.load(model) + + # 配置设备 + if use_gpu and devices: + self.hanlp.to(f'cuda:{devices[0]}') + elif use_gpu: + self.hanlp.to('cuda') + + logger.info("HanLP model loaded successfully") + + except Exception as e: + logger.error(f"Failed to load HanLP model: {e}") + # 降级到基础分词模型 + logger.warning("Falling back to basic tokenizer") + self.hanlp = hanlp.load(hanlp.pretrained.tok.COARSE_ELECTRA_SMALL_ZH) # 可选的自定义关键词列表 self.custom_action_words: Set[str] = set() @@ -99,6 +158,23 @@ class AutoParser: self.custom_narration_words: Set[str] = set() self.custom_ooc_words: Set[str] = set() + # 动作动词关键词库(用于增强识别) + self.action_verbs = { + '走', '跑', '看', '听', '摸', '拿', '放', '打开', '关闭', + '推', '拉', '举', '扔', '跳', '爬', '坐', '站', '躺', + '进入', '离开', '接近', '远离', '转身', '回头', '低头', '抬头', + '微笑', '大笑', '哭', '喊', '叫', '念', '读', + '投掷', '检定', '攻击', '防御', '躲避', '施法', '释放', + '握', '抓', '松开', '敲', '踢', '打', '砍', '刺', + } + + # 对话标志词 + self.dialogue_markers = { + '说', '讲', '道', '问', '答', '回答', '询问', '告诉', + '我', '你', '他', '她', '我们', '你们', '他们', + '吗', '呢', '吧', '啊', '哦', '嗯', '唉', '哎', + } + # 统计信息 self.statistics = { "total_lines": 0, @@ -110,7 +186,7 @@ class AutoParser: for content_type in self.CONTENT_TYPES: self.statistics[f"{content_type}_count"] = 0 - logger.info(f"AutoParser initialized with THULAC seg_only={seg_only}") + logger.info(f"AutoParser initialized successfully") def load_custom_keywords(self, action_words: Optional[List[str]] = None, @@ -187,8 +263,10 @@ class AutoParser: "content": "内容文本", "content_type": "dialogue", "words": ["我", "喜欢", "你"], - "pos_tags": ["r", "v", "r"], + "pos_tags": ["PN", "VV", "PN"], 
"confidence": 0.85, + "entities": [...], # 命名实体 + "dependencies": [...], # 依存关系(如果可用) "analysis": {} # 词法分析详情 } """ @@ -202,6 +280,8 @@ class AutoParser: "content_type": "unknown", "words": [], "pos_tags": [], + "entities": [], + "dependencies": [], "confidence": 0.0, "analysis": {} } @@ -214,20 +294,49 @@ class AutoParser: text = line.strip() result["content"] = text - # 使用 THULAC 进行词法分析 + # 使用 HanLP 进行词法分析 try: - # THULAC 返回 [(word, pos), (word, pos), ...] - lac_result = self.thulac.cut(text, text=False) + # HanLP 返回字典,包含多个任务的结果 + hanlp_result = self.hanlp(text) - # 分离词和词性 - words = [item[0] for item in lac_result] - pos_tags = [item[1] for item in lac_result] + # 提取分词结果 + if 'tok/fine' in hanlp_result or 'tok' in hanlp_result: + words = hanlp_result.get('tok/fine') or hanlp_result.get('tok', []) + else: + # 如果没有分词结果,尝试使用第一个可用的分词任务 + words = [] + for key in hanlp_result.keys(): + if 'tok' in key.lower(): + words = hanlp_result[key] + break + + # 提取词性标注结果 + if 'pos/ctb' in hanlp_result or 'pos' in hanlp_result: + pos_tags = hanlp_result.get('pos/ctb') or hanlp_result.get('pos', []) + else: + # 如果没有词性结果,尝试使用第一个可用的词性任务 + pos_tags = [] + for key in hanlp_result.keys(): + if 'pos' in key.lower(): + pos_tags = hanlp_result[key] + break + + # 提取命名实体识别结果(如果可用) + if 'ner' in hanlp_result or 'ner/ontonotes' in hanlp_result: + entities = hanlp_result.get('ner/ontonotes') or hanlp_result.get('ner', []) + result["entities"] = entities + + # 提取依存句法分析结果(如果可用) + if 'dep' in hanlp_result: + result["dependencies"] = hanlp_result.get('dep', []) result["words"] = words result["pos_tags"] = pos_tags # 基于词法分析结果分类 - content_type, confidence, analysis = self._classify_by_thulac(words, pos_tags) + content_type, confidence, analysis = self._classify_by_hanlp( + words, pos_tags, result.get("entities", []), text + ) result["content_type"] = content_type result["confidence"] = confidence @@ -244,13 +353,16 @@ class AutoParser: return result - def _classify_by_thulac(self, words: List[str], pos_tags: List[str]) -> tuple: + def _classify_by_hanlp(self, words: List[str], pos_tags: List[str], + entities: List, text: str) -> tuple: """ - 基于 THULAC 词法分析结果进行分类 + 基于 HanLP 词法分析结果进行分类 Args: words: 分词结果 pos_tags: 词性标注结果 + entities: 命名实体识别结果 + text: 原始文本 Returns: (content_type, confidence, analysis_dict) @@ -265,39 +377,93 @@ class AutoParser: analysis = { "word_count": len(words), "pos_distribution": {}, - "custom_matches": [] + "custom_matches": [], + "key_features": [], + "entity_count": len(entities) if entities else 0, } # 统计词性分布 for pos in pos_tags: - if pos != 'w': # 忽略标点 + if pos != 'PU': # 忽略标点 analysis["pos_distribution"][pos] = analysis["pos_distribution"].get(pos, 0) + 1 # 基于词性加权计算类型分数 for i, (word, pos) in enumerate(zip(words, pos_tags)): # 跳过标点 - if pos == 'w': + if pos == 'PU': continue - # 检查自定义关键词(优先级最高) + # 检查自定义关键词(优先级最高,权重 2.0) if word in self.custom_action_words: - type_scores['action'] += 1.0 - analysis["custom_matches"].append({"word": word, "type": "action"}) + type_scores['action'] += 2.0 + analysis["custom_matches"].append({"word": word, "type": "action", "weight": 2.0}) elif word in self.custom_dialogue_words: - type_scores['dialogue'] += 1.0 - analysis["custom_matches"].append({"word": word, "type": "dialogue"}) + type_scores['dialogue'] += 2.0 + analysis["custom_matches"].append({"word": word, "type": "dialogue", "weight": 2.0}) elif word in self.custom_narration_words: - type_scores['narration'] += 1.0 - analysis["custom_matches"].append({"word": word, "type": "narration"}) + 
type_scores['narration'] += 2.0 + analysis["custom_matches"].append({"word": word, "type": "narration", "weight": 2.0}) elif word in self.custom_ooc_words: - type_scores['ooc'] += 1.0 - analysis["custom_matches"].append({"word": word, "type": "ooc"}) + type_scores['ooc'] += 2.0 + analysis["custom_matches"].append({"word": word, "type": "ooc", "weight": 2.0}) + + # 检查内置关键词库(权重 1.5) + if word in self.action_verbs: + type_scores['action'] += 1.5 + analysis["key_features"].append({"word": word, "type": "action_verb"}) + elif word in self.dialogue_markers: + type_scores['dialogue'] += 1.5 + analysis["key_features"].append({"word": word, "type": "dialogue_marker"}) # 应用词性权重 if pos in self.POS_WEIGHTS: weights = self.POS_WEIGHTS[pos] for content_type, weight in weights.items(): type_scores[content_type] += weight + else: + # 未知词性,根据前缀做简单判断 + if pos.startswith('V'): # 动词类 + type_scores['action'] += 0.5 + elif pos.startswith('N'): # 名词类 + type_scores['narration'] += 0.5 + + # 句末助词检测(强对话信号) + if pos_tags and pos_tags[-1] == 'SP': + type_scores['dialogue'] += 1.0 + analysis["key_features"].append({"feature": "sentence_particle", "position": "end"}) + + # 感叹词检测(强对话信号) + if 'IJ' in pos_tags: + type_scores['dialogue'] += 1.2 + analysis["key_features"].append({"feature": "interjection"}) + + # 人称代词检测(对话信号) + pronoun_count = sum(1 for pos in pos_tags if pos == 'PN') + if pronoun_count >= 2: + type_scores['dialogue'] += 0.8 + analysis["key_features"].append({"feature": "multiple_pronouns", "count": pronoun_count}) + + # 命名实体检测(旁白信号) + if entities and len(entities) > 0: + type_scores['narration'] += 0.5 * len(entities) + analysis["key_features"].append({"feature": "named_entities", "count": len(entities)}) + + # 动词占比检测(动作信号) + verb_count = sum(1 for pos in pos_tags if pos.startswith('V')) + if len(pos_tags) > 0: + verb_ratio = verb_count / len(pos_tags) + if verb_ratio > 0.3: + type_scores['action'] += verb_ratio + analysis["key_features"].append({"feature": "high_verb_ratio", "ratio": verb_ratio}) + + # 文本长度特征 + if len(text) < 10: + # 短文本更可能是对话或动作 + type_scores['dialogue'] += 0.3 + type_scores['action'] += 0.2 + elif len(text) > 50: + # 长文本更可能是旁白 + type_scores['narration'] += 0.3 # 归一化分数 total_score = sum(type_scores.values()) @@ -318,12 +484,13 @@ class AutoParser: return "unknown", 0.0, analysis - def parse_log_file(self, file_path: Union[str, Path]) -> List[Dict]: + def parse_log_file(self, file_path: Union[str, Path], batch_size: int = 32) -> List[Dict]: """ 批处理方法:按行解析日志文件 Args: file_path: 日志文件路径 + batch_size: 批处理大小,HanLP 支持批量处理以提高效率 Returns: 包含所有解析结果的列表,每个元素都是一个 dict @@ -341,31 +508,58 @@ class AutoParser: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() - # 批量处理以提高效率 - texts = [line.strip() for line in lines] + # 过滤空行并保留行号 + non_empty_lines = [(i+1, line.strip()) for i, line in enumerate(lines) if line.strip()] - for line_num, (line, text) in enumerate(zip(lines, texts), start=1): - if not text: - # 跳过空行 - continue + # 批量处理以提高效率 + for i in range(0, len(non_empty_lines), batch_size): + batch = non_empty_lines[i:i+batch_size] try: - result = self.parse_line(text, line_number=line_num) - results.append(result) + # 提取文本 + texts = [text for _, text in batch] + + # HanLP 批量处理 + hanlp_results = self.hanlp(texts) + + # 处理每个结果 + for j, (line_num, text) in enumerate(batch): + try: + # 提取当前文本的分析结果 + result = self._process_hanlp_batch_result( + text, hanlp_results, j, line_num + ) + results.append(result) + + # 更新统计 + self.statistics["parsed_lines"] += 1 + 
self.statistics[f"{result['content_type']}_count"] += 1 + + except Exception as e: + logger.error(f"Error processing line {line_num}: {e}") + self.statistics["error_lines"] += 1 + results.append({ + "line_number": line_num, + "raw_text": text, + "content": text, + "content_type": "unknown", + "words": [], + "pos_tags": [], + "entities": [], + "confidence": 0.0, + "analysis": {"error": str(e)} + }) + except Exception as e: - logger.error(f"Error parsing line {line_num}: {e}") - self.statistics["error_lines"] += 1 - # 添加错误记录 - results.append({ - "line_number": line_num, - "raw_text": line.strip(), - "content": text, - "content_type": "unknown", - "words": [], - "pos_tags": [], - "confidence": 0.0, - "analysis": {"error": str(e)} - }) + logger.error(f"Error in batch processing: {e}") + # 回退到逐行处理 + for line_num, text in batch: + try: + result = self.parse_line(text, line_number=line_num) + results.append(result) + except Exception as e2: + logger.error(f"Error parsing line {line_num}: {e2}") + self.statistics["error_lines"] += 1 except Exception as e: logger.error(f"Error reading file {file_path}: {e}") @@ -374,6 +568,82 @@ class AutoParser: logger.info(f"Successfully parsed {len(results)} lines from {file_path}") return results + def _process_hanlp_batch_result(self, text: str, hanlp_results: Dict, + index: int, line_num: int) -> Dict: + """ + 处理 HanLP 批量分析的单个结果 + + Args: + text: 原始文本 + hanlp_results: HanLP 批量处理返回的结果字典 + index: 当前文本在批次中的索引 + line_num: 行号 + + Returns: + 解析结果字典 + """ + self.statistics["total_lines"] += 1 + + result = { + "line_number": line_num, + "raw_text": text, + "content": text, + "content_type": "unknown", + "words": [], + "pos_tags": [], + "entities": [], + "dependencies": [], + "confidence": 0.0, + "analysis": {} + } + + try: + # 提取分词结果 + if 'tok/fine' in hanlp_results: + words = hanlp_results['tok/fine'][index] + elif 'tok' in hanlp_results: + words = hanlp_results['tok'][index] + else: + words = [] + + # 提取词性标注结果 + if 'pos/ctb' in hanlp_results: + pos_tags = hanlp_results['pos/ctb'][index] + elif 'pos' in hanlp_results: + pos_tags = hanlp_results['pos'][index] + else: + pos_tags = [] + + # 提取命名实体 + entities = [] + if 'ner/ontonotes' in hanlp_results: + entities = hanlp_results['ner/ontonotes'][index] + elif 'ner' in hanlp_results: + entities = hanlp_results['ner'][index] + + # 提取依存句法 + if 'dep' in hanlp_results: + result["dependencies"] = hanlp_results['dep'][index] + + result["words"] = words + result["pos_tags"] = pos_tags + result["entities"] = entities + + # 分类 + content_type, confidence, analysis = self._classify_by_hanlp( + words, pos_tags, entities, text + ) + + result["content_type"] = content_type + result["confidence"] = confidence + result["analysis"] = analysis + + except Exception as e: + logger.error(f"Error processing result for line {line_num}: {e}") + result["analysis"]["error"] = str(e) + + return result + def parse_text_batch(self, lines: List[str]) -> List[Dict]: """ 批处理方法:解析文本行列表 @@ -465,3 +735,165 @@ class AutoParser: grouped[content_type].append(item) return grouped + + def extract_entities(self, parsed_data: List[Dict]) -> Dict[str, List]: + """ + 提取所有命名实体 + + Args: + parsed_data: 解析结果列表 + + Returns: + 按实体类型分组的实体列表 + """ + entities_by_type = {} + + for item in parsed_data: + entities = item.get("entities", []) + if entities: + for entity in entities: + if isinstance(entity, tuple) and len(entity) >= 2: + entity_text, entity_type = entity[0], entity[1] + if entity_type not in entities_by_type: + entities_by_type[entity_type] = [] + 
entities_by_type[entity_type].append({ + "text": entity_text, + "line_number": item.get("line_number"), + "context": item.get("content") + }) + + return entities_by_type + + def get_word_frequency(self, parsed_data: List[Dict], + min_length: int = 2, + exclude_pos: Optional[List[str]] = None) -> Dict[str, int]: + """ + 统计词频 + + Args: + parsed_data: 解析结果列表 + min_length: 最小词长度 + exclude_pos: 要排除的词性列表(如 ['PU'] 排除标点) + + Returns: + 词频字典 + """ + if exclude_pos is None: + exclude_pos = ['PU'] # 默认排除标点 + + word_freq = {} + + for item in parsed_data: + words = item.get("words", []) + pos_tags = item.get("pos_tags", []) + + for word, pos in zip(words, pos_tags): + if len(word) >= min_length and pos not in exclude_pos: + word_freq[word] = word_freq.get(word, 0) + 1 + + # 按频率排序 + return dict(sorted(word_freq.items(), key=lambda x: x[1], reverse=True)) + + def analyze_dialogue_patterns(self, parsed_data: List[Dict]) -> Dict: + """ + 分析对话模式 + + Args: + parsed_data: 解析结果列表 + + Returns: + 对话分析统计 + """ + dialogue_items = self.filter_by_type(parsed_data, "dialogue") + + analysis = { + "total_dialogues": len(dialogue_items), + "avg_length": 0.0, + "common_patterns": {}, + "pronoun_usage": {}, + "sentence_particles": {}, + } + + if not dialogue_items: + return analysis + + total_length = 0 + + for item in dialogue_items: + words = item.get("words", []) + pos_tags = item.get("pos_tags", []) + + total_length += len(item.get("content", "")) + + # 统计代词使用 + for word, pos in zip(words, pos_tags): + if pos == 'PN': + analysis["pronoun_usage"][word] = analysis["pronoun_usage"].get(word, 0) + 1 + elif pos == 'SP': + analysis["sentence_particles"][word] = analysis["sentence_particles"].get(word, 0) + 1 + + analysis["avg_length"] = total_length / len(dialogue_items) + + return analysis + + def export_to_json(self, parsed_data: List[Dict], + output_path: Union[str, Path]) -> None: + """ + 导出解析结果为 JSON 文件 + + Args: + parsed_data: 解析结果列表 + output_path: 输出文件路径 + """ + import json + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(parsed_data, f, ensure_ascii=False, indent=2) + + logger.info(f"Exported {len(parsed_data)} items to {output_path}") + + def get_summary(self, parsed_data: List[Dict]) -> Dict: + """ + 获取解析结果摘要 + + Args: + parsed_data: 解析结果列表 + + Returns: + 摘要统计信息 + """ + type_counts = {content_type: 0 for content_type in self.CONTENT_TYPES} + confidence_sum = {content_type: 0.0 for content_type in self.CONTENT_TYPES} + + total_words = 0 + total_entities = 0 + + for item in parsed_data: + content_type = item.get("content_type", "unknown") + confidence = item.get("confidence", 0.0) + + type_counts[content_type] += 1 + confidence_sum[content_type] += confidence + + total_words += len(item.get("words", [])) + total_entities += len(item.get("entities", [])) + + # 计算平均置信度 + avg_confidence = {} + for content_type in self.CONTENT_TYPES: + if type_counts[content_type] > 0: + avg_confidence[content_type] = confidence_sum[content_type] / type_counts[content_type] + else: + avg_confidence[content_type] = 0.0 + + return { + "total_items": len(parsed_data), + "type_distribution": type_counts, + "avg_confidence": avg_confidence, + "total_words": total_words, + "total_entities": total_entities, + "avg_words_per_item": total_words / len(parsed_data) if parsed_data else 0.0, + } diff --git a/src/conventionalrp/core/thulac_parser.py b/src/conventionalrp/core/thulac_parser.py deleted file mode 100644 index d4f7706..0000000 
--- a/src/conventionalrp/core/thulac_parser.py +++ /dev/null @@ -1,502 +0,0 @@ -""" -基于清华 THULAC 的智能解析器 - -使用 THULAC (THU Lexical Analyzer for Chinese) 进行中文词法分析, -自动识别 TRPG 日志中的对话、动作、旁白等内容类型, -大幅简化规则配置。 - -THULAC 是清华大学自然语言处理与社会人文计算实验室研制推出的 -一套中文词法分析工具包,具有中文分词和词性标注功能。 - -词性标注说明: -- n/名词 np/人名 ns/地名 ni/机构名 nz/其它专名 -- m/数词 q/量词 mq/数量词 t/时间词 f/方位词 s/处所词 -- v/动词 a/形容词 d/副词 -- h/前接成分 k/后接成分 -- i/习语 j/简称 r/代词 c/连词 p/介词 -- u/助词 y/语气助词 e/叹词 o/拟声词 -- g/语素 w/标点 x/其它 -""" - -import re -from typing import List, Dict, Optional, Tuple -import logging - -logger = logging.getLogger(__name__) - -try: - import thulac - THULAC_AVAILABLE = True -except ImportError: - THULAC_AVAILABLE = False - logger.warning("THULAC not installed. Please install with: pip install thulac") - - - -class THULACParser: - # 默认分隔符配置(可通过 load_rules 覆盖) - DEFAULT_DELIMITERS = { - "dialogue": [ - ('"', '"'), # 英文双引号 - ('"', '"'), # 中文双引号 - ("'", "'"), # 单引号 - ], - "thought": [ - ("【", "】"), # 中文方括号 - ("[", "]"), # 英文方括号 - ], - "action": [ - ("(", ")"), # 中文括号 - ("(", ")"), # 英文括号 - ("*", "*"), # 星号 - ("**", "**"), # 双星号 - ], - "ooc": [ - ("//", "\n"), # 双斜杠到行尾 - (">>", "\n"), # 双右尖括号到行尾 - ] - } - - POS_TYPE_MAPPING = { - # 动词相关 -> 动作 - "v": "action", # 动词 - - # 名词相关 -> 旁白 - "n": "narration", # 名词 - "np": "narration", # 人名 - "ns": "narration", # 地名 - "ni": "narration", # 机构名 - "nz": "narration", # 其它专名 - - # 代词 -> 对话(第一人称/第二人称倾向于对话) - "r": "dialogue", # 代词 - - # 副词/形容词 -> 旁白 - "d": "narration", # 副词 - "a": "narration", # 形容词 - - # 量词/数词 -> 旁白 - "m": "narration", # 数词 - "q": "narration", # 量词 - "mq": "narration", # 数量词 - - # 时间/方位/处所 -> 旁白 - "t": "narration", # 时间词 - "f": "narration", # 方位词 - "s": "narration", # 处所词 - - # 语气词/叹词 -> 对话 - "y": "dialogue", # 语气助词 - "e": "dialogue", # 叹词 - } - - # 动作动词关键词(优先级更高) - ACTION_VERBS = { - "走", "跑", "看", "听", "摸", "拿", "放", "打开", "关闭", - "推", "拉", "举", "扔", "跳", "爬", "坐", "站", "躺", - "进入", "离开", "接近", "远离", "转身", "回头", "低头", "抬头", - "微笑", "大笑", "哭", "喊", "叫", "说", "讲", "念", "读", - "投掷", "检定", "攻击", "防御", "躲避" - } - - # 对话相关关键词 - DIALOGUE_INDICATORS = { - "我", "你", "他", "她", "我们", "你们", "他们", - "吗", "呢", "啊", "哦", "嗯", "哼", "咦", "哎", - } - - def __init__(self, seg_only: bool = False, user_dict: str = None): - """ - 初始化 THULAC 解析器 - - Args: - seg_only: 是否只进行分词(不标注词性) - user_dict: 用户自定义词典路径 - """ - if not THULAC_AVAILABLE: - raise ImportError( - "THULAC is not installed. Please install with:\n" - "pip install thulac\n" - "Note: First installation may take a few minutes to download models." 
- ) - - self.seg_only = seg_only - self.thulac = thulac.thulac( - seg_only=seg_only, - user_dict=user_dict if user_dict else None - ) - - self.delimiters = self.DEFAULT_DELIMITERS.copy() - self.custom_words = {} - self.statistics = { - "total_parsed": 0, - "dialogue_count": 0, - "action_count": 0, - "thought_count": 0, - "narration_count": 0, - "ooc_count": 0, - } - - logger.info(f"THULACParser initialized with seg_only={seg_only}") - - def load_rules(self, rules_path: str = None, rules_dict: Dict = None): - """ - 加载简化的规则配置 - - Args: - rules_path: 规则文件路径(JSON5 格式) - rules_dict: 直接传入规则字典 - - 规则格式示例: - ```json - { - "delimiters": { - "dialogue": [["\"", "\""], [""", """]], - "action": [["(", ")"], ["*", "*"]], - "thought": [["【", "】"]] - }, - "custom_words": { - "骰子": "n", - "检定": "v", - "守秘人": "np" - } - } - ``` - """ - import json5 - from pathlib import Path - - if rules_path: - if not Path(rules_path).exists(): - raise FileNotFoundError(f"Rules file not found: {rules_path}") - with open(rules_path, "r", encoding="utf-8") as f: - rules_dict = json5.load(f) - - if not rules_dict: - logger.info("No rules provided, using default delimiters") - return - - # 加载分隔符配置 - if "delimiters" in rules_dict: - for content_type, delimiter_pairs in rules_dict["delimiters"].items(): - self.delimiters[content_type] = [tuple(pair) for pair in delimiter_pairs] - - # 加载自定义词汇(词 -> 词性) - if "custom_words" in rules_dict: - self.custom_words = rules_dict["custom_words"] - logger.info(f"Loaded {len(self.custom_words)} custom words") - - logger.info("Rules loaded successfully") - - def _extract_delimited_content(self, text: str) -> List[Dict]: - """ - 提取分隔符标记的内容 - - Returns: - List of {type, content, start, end, delimiter} - """ - results = [] - - for content_type, delimiter_pairs in self.delimiters.items(): - for start_delim, end_delim in delimiter_pairs: - # 转义正则特殊字符 - start_pattern = re.escape(start_delim) - end_pattern = re.escape(end_delim) - - # 处理到行尾的情况 - if end_delim == "\n": - pattern = f"{start_pattern}(.+?)(?:\n|$)" - else: - pattern = f"{start_pattern}(.+?){end_pattern}" - - for match in re.finditer(pattern, text): - results.append({ - "type": content_type, - "content": match.group(1), - "start": match.start(), - "end": match.end(), - "delimiter": (start_delim, end_delim), - "confidence": 1.0 # 分隔符匹配的置信度为 100% - }) - - results.sort(key=lambda x: x["start"]) - return results - - def _analyze_with_thulac(self, text: str) -> List[Dict]: - """ - 使用 THULAC 分析文本 - - Returns: - List of {type, content, words, tags, confidence} - """ - result = self.thulac.cut(text, text=False) # 返回 [(word, pos), ...] - - if not result: - return [{ - "type": "narration", - "content": text, - "words": [], - "tags": [], - "confidence": 0.5, - "method": "thulac" - }] - - # 分离词和词性 - words = [item[0] for item in result] - tags = [item[1] for item in result] - - # 应用自定义词性(如果有) - for i, word in enumerate(words): - if word in self.custom_words: - tags[i] = self.custom_words[word] - - # 基于词性和内容推断类型 - content_type = self._infer_content_type(words, tags) - confidence = self._calculate_confidence(words, tags, content_type) - - return [{ - "type": content_type, - "content": text, - "words": words, - "tags": tags, - "confidence": confidence, - "method": "thulac" - }] - - def _infer_content_type(self, words: List[str], tags: List[str]) -> str: - """ - 基于词性和内容推断内容类型 - - 策略: - 1. 检查是否包含动作动词 -> action - 2. 检查是否包含对话指示词 -> dialogue - 3. 
统计主导词性 -> 按映射表判断 - """ - for word in words: - if word in self.ACTION_VERBS: - return "action" - - dialogue_indicators = sum(1 for w in words if w in self.DIALOGUE_INDICATORS) - if dialogue_indicators >= 2: # 至少2个对话指示词 - return "dialogue" - - pos_count = {} - for tag in tags: - if tag == "w": # 忽略标点 - continue - pos_count[tag] = pos_count.get(tag, 0) + 1 - - if not pos_count: - return "narration" - - # 找出最常见的词性 - dominant_pos = max(pos_count.items(), key=lambda x: x[1])[0] - - # 特殊规则:如果有动词,倾向于判断为动作 - if "v" in pos_count and pos_count["v"] >= len(words) * 0.3: - return "action" - - # 根据主导词性映射 - return self.POS_TYPE_MAPPING.get(dominant_pos, "narration") - - def _calculate_confidence(self, words: List[str], tags: List[str], - content_type: str) -> float: - """ - 计算分析置信度 - - 基于以下因素: - 1. 词性标注的一致性 - 2. 关键词匹配度 - 3. 文本长度 - """ - if not words or not tags: - return 0.5 - - base_confidence = 0.5 - - if content_type == "action": - action_word_count = sum(1 for w in words if w in self.ACTION_VERBS) - if action_word_count > 0: - base_confidence += 0.3 - elif content_type == "dialogue": - dialogue_word_count = sum(1 for w in words if w in self.DIALOGUE_INDICATORS) - if dialogue_word_count >= 2: - base_confidence += 0.3 - - unique_pos = len(set(tag for tag in tags if tag != "w")) - if unique_pos == 1: - base_confidence += 0.2 - elif unique_pos <= 3: - base_confidence += 0.1 - - return min(1.0, base_confidence) - - def _merge_results(self, delimited: List[Dict], thulac_results: List[Dict], - text: str) -> List[Dict]: - """ - 合并分隔符提取和 THULAC 分析结果 - - 优先级:分隔符标记 > THULAC 分析 - """ - if not delimited: - return thulac_results - - results = [] - covered_ranges = set() - - for item in delimited: - results.append(item) - for i in range(item["start"], item["end"]): - covered_ranges.add(i) - - uncovered_segments = [] - start = 0 - for i in range(len(text)): - if i in covered_ranges: - if start < i: - uncovered_segments.append((start, i)) - start = i + 1 - if start < len(text): - uncovered_segments.append((start, len(text))) - - for start, end in uncovered_segments: - segment = text[start:end].strip() - if segment: - thulac_result = self._analyze_with_thulac(segment) - for item in thulac_result: - item["start"] = start - item["end"] = end - results.append(item) - - results.sort(key=lambda x: x.get("start", 0)) - return results - - def parse_line(self, line: str) -> Dict: - """ - 解析单行日志 - - Args: - line: 日志行 - - Returns: - { - "metadata": {...}, - "content": [...] - } - """ - if not line or not line.strip(): - return {"metadata": {}, "content": []} - - # 提取元数据(时间戳、发言人等) - metadata = self._extract_metadata(line) - - # 移除元数据后的内容 - content_text = self._remove_metadata(line, metadata) - - # 1. 提取分隔符标记的内容 - delimited = self._extract_delimited_content(content_text) - - # 2. 使用 THULAC 分析未标记的内容 - thulac_results = [] - if not delimited or len(delimited) == 0: - thulac_results = self._analyze_with_thulac(content_text) - - # 3. 
合并结果 - content = self._merge_results(delimited, thulac_results, content_text) - - # 更新统计 - self.statistics["total_parsed"] += 1 - for item in content: - type_key = f"{item['type']}_count" - if type_key in self.statistics: - self.statistics[type_key] += 1 - - return { - "metadata": metadata, - "content": content - } - - def _extract_metadata(self, line: str) -> Dict: - """提取元数据(时间戳、发言人)""" - metadata = {} - - # 常见的元数据格式 - patterns = [ - r"^\[(.+?)\]\s*<(.+?)>", # [时间] <发言人> - r"^(.+?)\s*\|\s*(.+?)\s*:", # 时间 | 发言人: - r"^<(.+?)>\s*@\s*(.+?)$", # <发言人> @ 时间 - ] - - for pattern in patterns: - match = re.search(pattern, line) - if match: - metadata["timestamp"] = match.group(1) - metadata["speaker"] = match.group(2) - break - - return metadata - - def _remove_metadata(self, line: str, metadata: Dict) -> str: - """移除元数据,返回纯内容""" - if not metadata: - return line - - # 移除匹配到的元数据部分 - patterns = [ - r"^\[.+?\]\s*<.+?>\s*", - r"^.+?\s*\|\s*.+?\s*:\s*", - r"^<.+?>\s*@\s*.+?\s*", - ] - - for pattern in patterns: - line = re.sub(pattern, "", line, count=1) - - return line.strip() - - def parse_log(self, log_path: str) -> List[Dict]: - """ - 解析完整的 TRPG 日志文件 - - Args: - log_path: 日志文件路径 - - Returns: - 解析结果列表 - """ - from pathlib import Path - - if not Path(log_path).exists(): - raise FileNotFoundError(f"Log file not found: {log_path}") - - with open(log_path, "r", encoding="utf-8") as f: - lines = f.readlines() - - results = [] - for i, line in enumerate(lines): - line = line.strip() - if not line: - continue - - try: - result = self.parse_line(line) - result["line_number"] = i + 1 - results.append(result) - except Exception as e: - logger.error(f"Error parsing line {i+1}: {e}") - results.append({ - "line_number": i + 1, - "error": str(e), - "raw_line": line - }) - - logger.info(f"Parsed {len(results)} lines from {log_path}") - return results - - def get_statistics(self) -> Dict: - """获取解析统计信息""" - return self.statistics.copy() - - def reset_statistics(self): - """重置统计信息""" - for key in self.statistics: - self.statistics[key] = 0 |
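Taken together, the refactored AutoParser keeps a line-oriented API around the new HanLP backend. Below is a hedged usage sketch: the method names and parameters are read off the diff above, while the import path, keyword list, and file paths are assumptions for illustration.

```python
from conventionalrp.core.auto_parser import AutoParser

# model='SMALL' loads HanLP's small Chinese multi-task model (see __init__ in the diff).
parser = AutoParser(model="SMALL")
parser.load_custom_keywords(action_words=["检定", "投掷"])

# Batched parsing of a log file; batch_size maps to HanLP's batched inference.
results = parser.parse_log_file("logs/session1.log", batch_size=32)

dialogues = parser.filter_by_type(results, "dialogue")
entities = parser.extract_entities(results)   # NER hits grouped by entity type
summary = parser.get_summary(results)         # type distribution, avg confidence, word counts

parser.export_to_json(results, "output/session1_parsed.json")
print(summary["type_distribution"], len(dialogues), list(entities))
```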
