author     简律纯 <i@jyunko.cn>  2025-11-04 21:46:47 +0800
committer  简律纯 <i@jyunko.cn>  2025-11-04 21:46:47 +0800
commit     9b4da79dbcf0f14edb181b0108633a547b3d9fa6 (patch)
tree       658455c9a0405d01168269f6ae2b76edcff0fb2b /src/conventionalrp
parent     a44b88b70e3624e29e5c8a2c81d11bfcd5daaed0 (diff)
refactor(auto_parser): Remove THULAC parser implementation from the project. This includes the entire thulac_parser.py file, which contained the logic for parsing TRPG logs using THULAC for Chinese lexical analysis. All related methods, classes, and configurations have been deleted.
Diffstat (limited to 'src/conventionalrp')
-rw-r--r--  src/conventionalrp/core/auto_parser.py   | 616
-rw-r--r--  src/conventionalrp/core/thulac_parser.py | 502
2 files changed, 524 insertions(+), 594 deletions(-)
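For orientation, a minimal usage sketch of the refactored, HanLP-backed AutoParser follows. The import path (src layout) and the sample inputs are assumptions; the constructor arguments and the parse_line / parse_log_file / get_summary calls mirror the new code in the diff below.

```python
# Minimal usage sketch of the refactored AutoParser (assumes the package is
# importable as `conventionalrp` and that HanLP and its models are available).
from conventionalrp.core.auto_parser import AutoParser

# 'SMALL' loads HanLP's CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_SMALL_ZH model;
# tasks=None defaults to fine-grained tokenization plus CTB POS tagging.
parser = AutoParser(model="SMALL", tasks=None, use_gpu=False)

# Classify one TRPG log line: the result carries tokens, CTB POS tags, optional
# entities/dependencies, the inferred content type and a confidence score.
line = "“你们先走,我断后。”她握紧了剑。"
result = parser.parse_line(line, line_number=1)
print(result["content_type"], result["confidence"], result["pos_tags"])

# Batch-parse a whole log file; batch_size controls how many lines HanLP
# processes per call. "logs/session01.txt" is a placeholder path.
items = parser.parse_log_file("logs/session01.txt", batch_size=32)
print(parser.get_summary(items))
```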
diff --git a/src/conventionalrp/core/auto_parser.py b/src/conventionalrp/core/auto_parser.py
index 0cb07e3..4f395ea 100644
--- a/src/conventionalrp/core/auto_parser.py
+++ b/src/conventionalrp/core/auto_parser.py
@@ -1,5 +1,8 @@
"""
自动分类解析器 (Auto Parser)
+
+使用 HanLP 进行智能文本分析和分类
+HanLP 提供了更准确的中文分词、词性标注、命名实体识别和依存句法分析
"""
from typing import List, Dict, Optional, Union, Set
@@ -10,11 +13,11 @@ logger = logging.getLogger(__name__)
try:
- import thulac
- THULAC_AVAILABLE = True
+ import hanlp
+ HANLP_AVAILABLE = True
except ImportError:
- THULAC_AVAILABLE = False
- logger.warning("THULAC not installed. Please install with: pip install thulac")
+ HANLP_AVAILABLE = False
+ logger.warning("HanLP not installed. Please install with: pip install hanlp")
class AutoParser:
@@ -27,71 +30,127 @@ class AutoParser:
"ooc", # 场外讨论
]
- # THULAC 词性标注说明
- # n/名词 np/人名 ns/地名 ni/机构名 nz/其它专名
- # m/数词 q/量词 mq/数量词 t/时间词 f/方位词 s/处所词
- # v/动词 a/形容词 d/副词
- # h/前接成分 k/后接成分
- # i/习语 j/简称 r/代词 c/连词 p/介词
- # u/助词 y/语气助词 e/叹词 o/拟声词
- # g/语素 w/标点 x/其它
+ # HanLP 词性标注说明(CTB 标注集)
+ # 名词: NN-普通名词 NR-人名 NS-地名 NT-机构名 NP-专有名词
+ # 动词: VV-动词 VA-动作动词 VC-系动词 VE-有
+ # 形容词: JJ-形容词
+ # 代词: PN-代词
+ # 副词: AD-副词
+ # 数量: CD-数词 OD-序数词 M-量词
+ # 介词/连词: P-介词 CC-并列连词 CS-从属连词
+ # 助词: DEC-的 DEG-的 DER-得 DEV-地 AS-了/着/过 SP-句末助词
+ # 语气词: IJ-感叹词
+ # 标点: PU-标点
- # 词性到内容类型的映射策略
+ # 词性到内容类型的映射策略(基于 HanLP CTB 标注)
POS_WEIGHTS = {
# 动词相关 - 倾向于动作
- 'v': {'action': 0.8, 'narration': 0.2},
+ 'VV': {'action': 0.8, 'narration': 0.2}, # 动词
+ 'VA': {'action': 0.85, 'narration': 0.15}, # 动作动词(更倾向于动作)
+ 'VC': {'dialogue': 0.5, 'narration': 0.5}, # 系动词(是/为等)
+ 'VE': {'narration': 0.6, 'dialogue': 0.4}, # 有
# 名词相关 - 倾向于旁白
- 'n': {'narration': 0.7, 'dialogue': 0.3},
- 'np': {'narration': 0.6, 'dialogue': 0.4}, # 人名可能出现在对话中
- 'ns': {'narration': 0.8, 'dialogue': 0.2}, # 地名
- 'ni': {'narration': 0.8, 'dialogue': 0.2}, # 机构名
- 'nz': {'narration': 0.7, 'dialogue': 0.3}, # 其它专名
+ 'NN': {'narration': 0.7, 'dialogue': 0.3}, # 普通名词
+ 'NR': {'narration': 0.6, 'dialogue': 0.4}, # 人名(可能出现在对话中)
+ 'NS': {'narration': 0.8, 'dialogue': 0.2}, # 地名
+ 'NT': {'narration': 0.8, 'dialogue': 0.2}, # 机构名
+ 'NP': {'narration': 0.7, 'dialogue': 0.3}, # 专有名词
# 代词 - 倾向于对话
- 'r': {'dialogue': 0.7, 'narration': 0.3},
+ 'PN': {'dialogue': 0.75, 'narration': 0.25}, # 代词(我/你/他等)
# 形容词 - 倾向于旁白或对话
- 'a': {'narration': 0.5, 'dialogue': 0.4, 'action': 0.1},
+ 'JJ': {'narration': 0.5, 'dialogue': 0.4, 'action': 0.1},
# 副词 - 可以是任何类型
- 'd': {'dialogue': 0.4, 'narration': 0.4, 'action': 0.2},
-
- # 助词、语气词 - 倾向于对话
- 'u': {'dialogue': 0.8, 'narration': 0.2},
- 'y': {'dialogue': 0.9, 'narration': 0.1}, # 语气助词
- 'e': {'dialogue': 0.8, 'action': 0.2}, # 叹词
-
- # 量词、数词 - 倾向于旁白
- 'm': {'narration': 0.8, 'dialogue': 0.2},
- 'q': {'narration': 0.7, 'dialogue': 0.3},
- 'mq': {'narration': 0.8, 'dialogue': 0.2},
-
- # 时间、方位、处所 - 倾向于旁白
- 't': {'narration': 0.8, 'dialogue': 0.2},
- 'f': {'narration': 0.7, 'dialogue': 0.3},
- 's': {'narration': 0.8, 'dialogue': 0.2},
+ 'AD': {'dialogue': 0.4, 'narration': 0.4, 'action': 0.2},
+
+ # 助词 - 倾向于对话
+ 'DEC': {'dialogue': 0.7, 'narration': 0.3}, # 的(结构助词)
+ 'DEG': {'dialogue': 0.7, 'narration': 0.3}, # 的(关联助词)
+ 'DER': {'dialogue': 0.6, 'action': 0.4}, # 得(动补)
+ 'DEV': {'action': 0.7, 'narration': 0.3}, # 地(状中)
+ 'AS': {'dialogue': 0.6, 'narration': 0.4}, # 了/着/过
+ 'SP': {'dialogue': 0.85, 'narration': 0.15}, # 句末助词(吗/呢/吧等)
+
+ # 感叹词 - 强烈倾向于对话
+ 'IJ': {'dialogue': 0.9, 'action': 0.1}, # 感叹词(啊/哦/唉等)
+
+ # 数量词 - 倾向于旁白
+ 'CD': {'narration': 0.8, 'dialogue': 0.2}, # 数词
+ 'OD': {'narration': 0.8, 'dialogue': 0.2}, # 序数词
+ 'M': {'narration': 0.7, 'dialogue': 0.3}, # 量词
+
+ # 介词/连词 - 中性
+ 'P': {'narration': 0.5, 'dialogue': 0.5}, # 介词
+ 'CC': {'narration': 0.5, 'dialogue': 0.5}, # 并列连词
+ 'CS': {'narration': 0.6, 'dialogue': 0.4}, # 从属连词
}
- def __init__(self, seg_only: bool = False, user_dict: str = None):
+ def __init__(self, model: str = 'SMALL', tasks: Optional[List[str]] = None,
+ use_gpu: bool = False, devices: Optional[List[int]] = None):
"""
初始化自动解析器
Args:
- seg_only: 是否只进行分词(不标注词性)
- user_dict: 用户自定义词典路径
+ model: HanLP 模型规模
+ - 'SMALL': 小型模型(快速,适合日常使用)
+ - 'LARGE': 大型模型(更准确,需要更多资源)
+ - 'MULTI_TASK': 多任务模型
+ - 或自定义模型路径
+ tasks: 要加载的任务列表,如 ['tok', 'pos', 'ner', 'dep']
+ - tok: 分词
+ - pos: 词性标注
+ - ner: 命名实体识别
+ - dep: 依存句法分析
+ - None: 使用默认任务(分词+词性标注)
+ use_gpu: 是否使用 GPU 加速
+ devices: GPU 设备 ID 列表
"""
- if not THULAC_AVAILABLE:
+ if not HANLP_AVAILABLE:
raise ImportError(
- "THULAC is not installed. Please install with:\n"
- "pip install thulac\n"
+ "HanLP is not installed. Please install with:\n"
+ "pip install hanlp\n"
"Note: First installation may take a few minutes to download models."
)
- self.thulac = thulac.thulac(
- seg_only=seg_only,
- user_dict=user_dict if user_dict else None
- )
+ logger.info(f"Initializing HanLP AutoParser with model={model}, tasks={tasks}")
+
+ # 初始化 HanLP 流水线
+ try:
+ # 根据任务需求选择合适的预训练模型
+ if tasks is None:
+ # 默认使用分词和词性标注
+ tasks = ['tok/fine', 'pos/ctb']
+
+ # 加载 HanLP 多任务模型
+ if model == 'SMALL':
+ # 使用小型预训练模型
+ self.hanlp = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_SMALL_ZH)
+ elif model == 'LARGE':
+ # 使用大型预训练模型
+ self.hanlp = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_BASE_ZH)
+ elif model == 'MULTI_TASK':
+ # 使用完整多任务模型
+ self.hanlp = hanlp.load(hanlp.pretrained.mtl.UD_ONTONOTES_TOK_POS_LEM_FEA_NER_SRL_DEP_SDP_CON_XLMR_BASE)
+ else:
+ # 使用自定义模型路径或名称
+ self.hanlp = hanlp.load(model)
+
+ # 配置设备
+ if use_gpu and devices:
+ self.hanlp.to(f'cuda:{devices[0]}')
+ elif use_gpu:
+ self.hanlp.to('cuda')
+
+ logger.info("HanLP model loaded successfully")
+
+ except Exception as e:
+ logger.error(f"Failed to load HanLP model: {e}")
+ # 降级到基础分词模型
+ logger.warning("Falling back to basic tokenizer")
+ self.hanlp = hanlp.load(hanlp.pretrained.tok.COARSE_ELECTRA_SMALL_ZH)
# 可选的自定义关键词列表
self.custom_action_words: Set[str] = set()
@@ -99,6 +158,23 @@ class AutoParser:
self.custom_narration_words: Set[str] = set()
self.custom_ooc_words: Set[str] = set()
+ # 动作动词关键词库(用于增强识别)
+ self.action_verbs = {
+ '走', '跑', '看', '听', '摸', '拿', '放', '打开', '关闭',
+ '推', '拉', '举', '扔', '跳', '爬', '坐', '站', '躺',
+ '进入', '离开', '接近', '远离', '转身', '回头', '低头', '抬头',
+ '微笑', '大笑', '哭', '喊', '叫', '念', '读',
+ '投掷', '检定', '攻击', '防御', '躲避', '施法', '释放',
+ '握', '抓', '松开', '敲', '踢', '打', '砍', '刺',
+ }
+
+ # 对话标志词
+ self.dialogue_markers = {
+ '说', '讲', '道', '问', '答', '回答', '询问', '告诉',
+ '我', '你', '他', '她', '我们', '你们', '他们',
+ '吗', '呢', '吧', '啊', '哦', '嗯', '唉', '哎',
+ }
+
# 统计信息
self.statistics = {
"total_lines": 0,
@@ -110,7 +186,7 @@ class AutoParser:
for content_type in self.CONTENT_TYPES:
self.statistics[f"{content_type}_count"] = 0
- logger.info(f"AutoParser initialized with THULAC seg_only={seg_only}")
+ logger.info(f"AutoParser initialized successfully")
def load_custom_keywords(self,
action_words: Optional[List[str]] = None,
@@ -187,8 +263,10 @@ class AutoParser:
"content": "内容文本",
"content_type": "dialogue",
"words": ["我", "喜欢", "你"],
- "pos_tags": ["r", "v", "r"],
+ "pos_tags": ["PN", "VV", "PN"],
"confidence": 0.85,
+ "entities": [...], # 命名实体
+ "dependencies": [...], # 依存关系(如果可用)
"analysis": {} # 词法分析详情
}
"""
@@ -202,6 +280,8 @@ class AutoParser:
"content_type": "unknown",
"words": [],
"pos_tags": [],
+ "entities": [],
+ "dependencies": [],
"confidence": 0.0,
"analysis": {}
}
@@ -214,20 +294,49 @@ class AutoParser:
text = line.strip()
result["content"] = text
- # 使用 THULAC 进行词法分析
+ # 使用 HanLP 进行词法分析
try:
- # THULAC 返回 [(word, pos), (word, pos), ...]
- lac_result = self.thulac.cut(text, text=False)
+ # HanLP 返回字典,包含多个任务的结果
+ hanlp_result = self.hanlp(text)
- # 分离词和词性
- words = [item[0] for item in lac_result]
- pos_tags = [item[1] for item in lac_result]
+ # 提取分词结果
+ if 'tok/fine' in hanlp_result or 'tok' in hanlp_result:
+ words = hanlp_result.get('tok/fine') or hanlp_result.get('tok', [])
+ else:
+ # 如果没有分词结果,尝试使用第一个可用的分词任务
+ words = []
+ for key in hanlp_result.keys():
+ if 'tok' in key.lower():
+ words = hanlp_result[key]
+ break
+
+ # 提取词性标注结果
+ if 'pos/ctb' in hanlp_result or 'pos' in hanlp_result:
+ pos_tags = hanlp_result.get('pos/ctb') or hanlp_result.get('pos', [])
+ else:
+ # 如果没有词性结果,尝试使用第一个可用的词性任务
+ pos_tags = []
+ for key in hanlp_result.keys():
+ if 'pos' in key.lower():
+ pos_tags = hanlp_result[key]
+ break
+
+ # 提取命名实体识别结果(如果可用)
+ if 'ner' in hanlp_result or 'ner/ontonotes' in hanlp_result:
+ entities = hanlp_result.get('ner/ontonotes') or hanlp_result.get('ner', [])
+ result["entities"] = entities
+
+ # 提取依存句法分析结果(如果可用)
+ if 'dep' in hanlp_result:
+ result["dependencies"] = hanlp_result.get('dep', [])
result["words"] = words
result["pos_tags"] = pos_tags
# 基于词法分析结果分类
- content_type, confidence, analysis = self._classify_by_thulac(words, pos_tags)
+ content_type, confidence, analysis = self._classify_by_hanlp(
+ words, pos_tags, result.get("entities", []), text
+ )
result["content_type"] = content_type
result["confidence"] = confidence
@@ -244,13 +353,16 @@ class AutoParser:
return result
- def _classify_by_thulac(self, words: List[str], pos_tags: List[str]) -> tuple:
+ def _classify_by_hanlp(self, words: List[str], pos_tags: List[str],
+ entities: List, text: str) -> tuple:
"""
- 基于 THULAC 词法分析结果进行分类
+ 基于 HanLP 词法分析结果进行分类
Args:
words: 分词结果
pos_tags: 词性标注结果
+ entities: 命名实体识别结果
+ text: 原始文本
Returns:
(content_type, confidence, analysis_dict)
@@ -265,39 +377,93 @@ class AutoParser:
analysis = {
"word_count": len(words),
"pos_distribution": {},
- "custom_matches": []
+ "custom_matches": [],
+ "key_features": [],
+ "entity_count": len(entities) if entities else 0,
}
# 统计词性分布
for pos in pos_tags:
- if pos != 'w': # 忽略标点
+ if pos != 'PU': # 忽略标点
analysis["pos_distribution"][pos] = analysis["pos_distribution"].get(pos, 0) + 1
# 基于词性加权计算类型分数
for i, (word, pos) in enumerate(zip(words, pos_tags)):
# 跳过标点
- if pos == 'w':
+ if pos == 'PU':
continue
- # 检查自定义关键词(优先级最高)
+ # 检查自定义关键词(优先级最高,权重 2.0)
if word in self.custom_action_words:
- type_scores['action'] += 1.0
- analysis["custom_matches"].append({"word": word, "type": "action"})
+ type_scores['action'] += 2.0
+ analysis["custom_matches"].append({"word": word, "type": "action", "weight": 2.0})
elif word in self.custom_dialogue_words:
- type_scores['dialogue'] += 1.0
- analysis["custom_matches"].append({"word": word, "type": "dialogue"})
+ type_scores['dialogue'] += 2.0
+ analysis["custom_matches"].append({"word": word, "type": "dialogue", "weight": 2.0})
elif word in self.custom_narration_words:
- type_scores['narration'] += 1.0
- analysis["custom_matches"].append({"word": word, "type": "narration"})
+ type_scores['narration'] += 2.0
+ analysis["custom_matches"].append({"word": word, "type": "narration", "weight": 2.0})
elif word in self.custom_ooc_words:
- type_scores['ooc'] += 1.0
- analysis["custom_matches"].append({"word": word, "type": "ooc"})
+ type_scores['ooc'] += 2.0
+ analysis["custom_matches"].append({"word": word, "type": "ooc", "weight": 2.0})
+
+ # 检查内置关键词库(权重 1.5)
+ if word in self.action_verbs:
+ type_scores['action'] += 1.5
+ analysis["key_features"].append({"word": word, "type": "action_verb"})
+ elif word in self.dialogue_markers:
+ type_scores['dialogue'] += 1.5
+ analysis["key_features"].append({"word": word, "type": "dialogue_marker"})
# 应用词性权重
if pos in self.POS_WEIGHTS:
weights = self.POS_WEIGHTS[pos]
for content_type, weight in weights.items():
type_scores[content_type] += weight
+ else:
+ # 未知词性,根据前缀做简单判断
+ if pos.startswith('V'): # 动词类
+ type_scores['action'] += 0.5
+ elif pos.startswith('N'): # 名词类
+ type_scores['narration'] += 0.5
+
+ # 句末助词检测(强对话信号)
+ if pos_tags and pos_tags[-1] == 'SP':
+ type_scores['dialogue'] += 1.0
+ analysis["key_features"].append({"feature": "sentence_particle", "position": "end"})
+
+ # 感叹词检测(强对话信号)
+ if 'IJ' in pos_tags:
+ type_scores['dialogue'] += 1.2
+ analysis["key_features"].append({"feature": "interjection"})
+
+ # 人称代词检测(对话信号)
+ pronoun_count = sum(1 for pos in pos_tags if pos == 'PN')
+ if pronoun_count >= 2:
+ type_scores['dialogue'] += 0.8
+ analysis["key_features"].append({"feature": "multiple_pronouns", "count": pronoun_count})
+
+ # 命名实体检测(旁白信号)
+ if entities and len(entities) > 0:
+ type_scores['narration'] += 0.5 * len(entities)
+ analysis["key_features"].append({"feature": "named_entities", "count": len(entities)})
+
+ # 动词占比检测(动作信号)
+ verb_count = sum(1 for pos in pos_tags if pos.startswith('V'))
+ if len(pos_tags) > 0:
+ verb_ratio = verb_count / len(pos_tags)
+ if verb_ratio > 0.3:
+ type_scores['action'] += verb_ratio
+ analysis["key_features"].append({"feature": "high_verb_ratio", "ratio": verb_ratio})
+
+ # 文本长度特征
+ if len(text) < 10:
+ # 短文本更可能是对话或动作
+ type_scores['dialogue'] += 0.3
+ type_scores['action'] += 0.2
+ elif len(text) > 50:
+ # 长文本更可能是旁白
+ type_scores['narration'] += 0.3
# 归一化分数
total_score = sum(type_scores.values())
@@ -318,12 +484,13 @@ class AutoParser:
return "unknown", 0.0, analysis
- def parse_log_file(self, file_path: Union[str, Path]) -> List[Dict]:
+ def parse_log_file(self, file_path: Union[str, Path], batch_size: int = 32) -> List[Dict]:
"""
批处理方法:按行解析日志文件
Args:
file_path: 日志文件路径
+ batch_size: 批处理大小,HanLP 支持批量处理以提高效率
Returns:
包含所有解析结果的列表,每个元素都是一个 dict
@@ -341,31 +508,58 @@ class AutoParser:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
- # 批量处理以提高效率
- texts = [line.strip() for line in lines]
+ # 过滤空行并保留行号
+ non_empty_lines = [(i+1, line.strip()) for i, line in enumerate(lines) if line.strip()]
- for line_num, (line, text) in enumerate(zip(lines, texts), start=1):
- if not text:
- # 跳过空行
- continue
+ # 批量处理以提高效率
+ for i in range(0, len(non_empty_lines), batch_size):
+ batch = non_empty_lines[i:i+batch_size]
try:
- result = self.parse_line(text, line_number=line_num)
- results.append(result)
+ # 提取文本
+ texts = [text for _, text in batch]
+
+ # HanLP 批量处理
+ hanlp_results = self.hanlp(texts)
+
+ # 处理每个结果
+ for j, (line_num, text) in enumerate(batch):
+ try:
+ # 提取当前文本的分析结果
+ result = self._process_hanlp_batch_result(
+ text, hanlp_results, j, line_num
+ )
+ results.append(result)
+
+ # 更新统计
+ self.statistics["parsed_lines"] += 1
+ self.statistics[f"{result['content_type']}_count"] += 1
+
+ except Exception as e:
+ logger.error(f"Error processing line {line_num}: {e}")
+ self.statistics["error_lines"] += 1
+ results.append({
+ "line_number": line_num,
+ "raw_text": text,
+ "content": text,
+ "content_type": "unknown",
+ "words": [],
+ "pos_tags": [],
+ "entities": [],
+ "confidence": 0.0,
+ "analysis": {"error": str(e)}
+ })
+
except Exception as e:
- logger.error(f"Error parsing line {line_num}: {e}")
- self.statistics["error_lines"] += 1
- # 添加错误记录
- results.append({
- "line_number": line_num,
- "raw_text": line.strip(),
- "content": text,
- "content_type": "unknown",
- "words": [],
- "pos_tags": [],
- "confidence": 0.0,
- "analysis": {"error": str(e)}
- })
+ logger.error(f"Error in batch processing: {e}")
+ # 回退到逐行处理
+ for line_num, text in batch:
+ try:
+ result = self.parse_line(text, line_number=line_num)
+ results.append(result)
+ except Exception as e2:
+ logger.error(f"Error parsing line {line_num}: {e2}")
+ self.statistics["error_lines"] += 1
except Exception as e:
logger.error(f"Error reading file {file_path}: {e}")
@@ -374,6 +568,82 @@ class AutoParser:
logger.info(f"Successfully parsed {len(results)} lines from {file_path}")
return results
+ def _process_hanlp_batch_result(self, text: str, hanlp_results: Dict,
+ index: int, line_num: int) -> Dict:
+ """
+ 处理 HanLP 批量分析的单个结果
+
+ Args:
+ text: 原始文本
+ hanlp_results: HanLP 批量处理返回的结果字典
+ index: 当前文本在批次中的索引
+ line_num: 行号
+
+ Returns:
+ 解析结果字典
+ """
+ self.statistics["total_lines"] += 1
+
+ result = {
+ "line_number": line_num,
+ "raw_text": text,
+ "content": text,
+ "content_type": "unknown",
+ "words": [],
+ "pos_tags": [],
+ "entities": [],
+ "dependencies": [],
+ "confidence": 0.0,
+ "analysis": {}
+ }
+
+ try:
+ # 提取分词结果
+ if 'tok/fine' in hanlp_results:
+ words = hanlp_results['tok/fine'][index]
+ elif 'tok' in hanlp_results:
+ words = hanlp_results['tok'][index]
+ else:
+ words = []
+
+ # 提取词性标注结果
+ if 'pos/ctb' in hanlp_results:
+ pos_tags = hanlp_results['pos/ctb'][index]
+ elif 'pos' in hanlp_results:
+ pos_tags = hanlp_results['pos'][index]
+ else:
+ pos_tags = []
+
+ # 提取命名实体
+ entities = []
+ if 'ner/ontonotes' in hanlp_results:
+ entities = hanlp_results['ner/ontonotes'][index]
+ elif 'ner' in hanlp_results:
+ entities = hanlp_results['ner'][index]
+
+ # 提取依存句法
+ if 'dep' in hanlp_results:
+ result["dependencies"] = hanlp_results['dep'][index]
+
+ result["words"] = words
+ result["pos_tags"] = pos_tags
+ result["entities"] = entities
+
+ # 分类
+ content_type, confidence, analysis = self._classify_by_hanlp(
+ words, pos_tags, entities, text
+ )
+
+ result["content_type"] = content_type
+ result["confidence"] = confidence
+ result["analysis"] = analysis
+
+ except Exception as e:
+ logger.error(f"Error processing result for line {line_num}: {e}")
+ result["analysis"]["error"] = str(e)
+
+ return result
+
def parse_text_batch(self, lines: List[str]) -> List[Dict]:
"""
批处理方法:解析文本行列表
@@ -465,3 +735,165 @@ class AutoParser:
grouped[content_type].append(item)
return grouped
+
+ def extract_entities(self, parsed_data: List[Dict]) -> Dict[str, List]:
+ """
+ 提取所有命名实体
+
+ Args:
+ parsed_data: 解析结果列表
+
+ Returns:
+ 按实体类型分组的实体列表
+ """
+ entities_by_type = {}
+
+ for item in parsed_data:
+ entities = item.get("entities", [])
+ if entities:
+ for entity in entities:
+ if isinstance(entity, tuple) and len(entity) >= 2:
+ entity_text, entity_type = entity[0], entity[1]
+ if entity_type not in entities_by_type:
+ entities_by_type[entity_type] = []
+ entities_by_type[entity_type].append({
+ "text": entity_text,
+ "line_number": item.get("line_number"),
+ "context": item.get("content")
+ })
+
+ return entities_by_type
+
+ def get_word_frequency(self, parsed_data: List[Dict],
+ min_length: int = 2,
+ exclude_pos: Optional[List[str]] = None) -> Dict[str, int]:
+ """
+ 统计词频
+
+ Args:
+ parsed_data: 解析结果列表
+ min_length: 最小词长度
+ exclude_pos: 要排除的词性列表(如 ['PU'] 排除标点)
+
+ Returns:
+ 词频字典
+ """
+ if exclude_pos is None:
+ exclude_pos = ['PU'] # 默认排除标点
+
+ word_freq = {}
+
+ for item in parsed_data:
+ words = item.get("words", [])
+ pos_tags = item.get("pos_tags", [])
+
+ for word, pos in zip(words, pos_tags):
+ if len(word) >= min_length and pos not in exclude_pos:
+ word_freq[word] = word_freq.get(word, 0) + 1
+
+ # 按频率排序
+ return dict(sorted(word_freq.items(), key=lambda x: x[1], reverse=True))
+
+ def analyze_dialogue_patterns(self, parsed_data: List[Dict]) -> Dict:
+ """
+ 分析对话模式
+
+ Args:
+ parsed_data: 解析结果列表
+
+ Returns:
+ 对话分析统计
+ """
+ dialogue_items = self.filter_by_type(parsed_data, "dialogue")
+
+ analysis = {
+ "total_dialogues": len(dialogue_items),
+ "avg_length": 0.0,
+ "common_patterns": {},
+ "pronoun_usage": {},
+ "sentence_particles": {},
+ }
+
+ if not dialogue_items:
+ return analysis
+
+ total_length = 0
+
+ for item in dialogue_items:
+ words = item.get("words", [])
+ pos_tags = item.get("pos_tags", [])
+
+ total_length += len(item.get("content", ""))
+
+ # 统计代词使用
+ for word, pos in zip(words, pos_tags):
+ if pos == 'PN':
+ analysis["pronoun_usage"][word] = analysis["pronoun_usage"].get(word, 0) + 1
+ elif pos == 'SP':
+ analysis["sentence_particles"][word] = analysis["sentence_particles"].get(word, 0) + 1
+
+ analysis["avg_length"] = total_length / len(dialogue_items)
+
+ return analysis
+
+ def export_to_json(self, parsed_data: List[Dict],
+ output_path: Union[str, Path]) -> None:
+ """
+ 导出解析结果为 JSON 文件
+
+ Args:
+ parsed_data: 解析结果列表
+ output_path: 输出文件路径
+ """
+ import json
+
+ output_path = Path(output_path)
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+
+ with open(output_path, 'w', encoding='utf-8') as f:
+ json.dump(parsed_data, f, ensure_ascii=False, indent=2)
+
+ logger.info(f"Exported {len(parsed_data)} items to {output_path}")
+
+ def get_summary(self, parsed_data: List[Dict]) -> Dict:
+ """
+ 获取解析结果摘要
+
+ Args:
+ parsed_data: 解析结果列表
+
+ Returns:
+ 摘要统计信息
+ """
+ type_counts = {content_type: 0 for content_type in self.CONTENT_TYPES}
+ confidence_sum = {content_type: 0.0 for content_type in self.CONTENT_TYPES}
+
+ total_words = 0
+ total_entities = 0
+
+ for item in parsed_data:
+ content_type = item.get("content_type", "unknown")
+ confidence = item.get("confidence", 0.0)
+
+ type_counts[content_type] += 1
+ confidence_sum[content_type] += confidence
+
+ total_words += len(item.get("words", []))
+ total_entities += len(item.get("entities", []))
+
+ # 计算平均置信度
+ avg_confidence = {}
+ for content_type in self.CONTENT_TYPES:
+ if type_counts[content_type] > 0:
+ avg_confidence[content_type] = confidence_sum[content_type] / type_counts[content_type]
+ else:
+ avg_confidence[content_type] = 0.0
+
+ return {
+ "total_items": len(parsed_data),
+ "type_distribution": type_counts,
+ "avg_confidence": avg_confidence,
+ "total_words": total_words,
+ "total_entities": total_entities,
+ "avg_words_per_item": total_words / len(parsed_data) if parsed_data else 0.0,
+ }
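The extraction code above probes the HanLP multi-task output for keys such as 'tok/fine', 'pos/ctb', 'ner/*' and 'dep'. A standalone sketch of what that output looks like, assuming the HanLP 2.x API and the pretrained MTL model referenced in __init__:

```python
# Sketch of the raw HanLP output the new parser consumes (assumes
# `pip install hanlp` and network access for the first model download).
import hanlp

mtl = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ELECTRA_SMALL_ZH)

# A single sentence returns a dict-like Document keyed by task name,
# e.g. 'tok/fine', 'pos/ctb', 'ner/*', 'dep' (exact keys depend on the model).
doc = mtl("她推开门,低声说:“我们该走了。”")
print(doc["tok/fine"])  # tokens
print(doc["pos/ctb"])   # CTB tags such as PN/VV/NN/SP/PU

# A list of sentences yields one per-sentence list under each key, which is
# the layout _process_hanlp_batch_result() indexes into.
docs = mtl(["骰子落在桌上。", "你确定要这么做吗?"])
print(docs["pos/ctb"][0])
print(docs["pos/ctb"][1])
```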
diff --git a/src/conventionalrp/core/thulac_parser.py b/src/conventionalrp/core/thulac_parser.py
deleted file mode 100644
index d4f7706..0000000
--- a/src/conventionalrp/core/thulac_parser.py
+++ /dev/null
@@ -1,502 +0,0 @@
-"""
-基于清华 THULAC 的智能解析器
-
-使用 THULAC (THU Lexical Analyzer for Chinese) 进行中文词法分析,
-自动识别 TRPG 日志中的对话、动作、旁白等内容类型,
-大幅简化规则配置。
-
-THULAC 是清华大学自然语言处理与社会人文计算实验室研制推出的
-一套中文词法分析工具包,具有中文分词和词性标注功能。
-
-词性标注说明:
-- n/名词 np/人名 ns/地名 ni/机构名 nz/其它专名
-- m/数词 q/量词 mq/数量词 t/时间词 f/方位词 s/处所词
-- v/动词 a/形容词 d/副词
-- h/前接成分 k/后接成分
-- i/习语 j/简称 r/代词 c/连词 p/介词
-- u/助词 y/语气助词 e/叹词 o/拟声词
-- g/语素 w/标点 x/其它
-"""
-
-import re
-from typing import List, Dict, Optional, Tuple
-import logging
-
-logger = logging.getLogger(__name__)
-
-try:
- import thulac
- THULAC_AVAILABLE = True
-except ImportError:
- THULAC_AVAILABLE = False
- logger.warning("THULAC not installed. Please install with: pip install thulac")
-
-
-
-class THULACParser:
- # 默认分隔符配置(可通过 load_rules 覆盖)
- DEFAULT_DELIMITERS = {
- "dialogue": [
- ('"', '"'), # 英文双引号
- ('"', '"'), # 中文双引号
- ("'", "'"), # 单引号
- ],
- "thought": [
- ("【", "】"), # 中文方括号
- ("[", "]"), # 英文方括号
- ],
- "action": [
- ("(", ")"), # 中文括号
- ("(", ")"), # 英文括号
- ("*", "*"), # 星号
- ("**", "**"), # 双星号
- ],
- "ooc": [
- ("//", "\n"), # 双斜杠到行尾
- (">>", "\n"), # 双右尖括号到行尾
- ]
- }
-
- POS_TYPE_MAPPING = {
- # 动词相关 -> 动作
- "v": "action", # 动词
-
- # 名词相关 -> 旁白
- "n": "narration", # 名词
- "np": "narration", # 人名
- "ns": "narration", # 地名
- "ni": "narration", # 机构名
- "nz": "narration", # 其它专名
-
- # 代词 -> 对话(第一人称/第二人称倾向于对话)
- "r": "dialogue", # 代词
-
- # 副词/形容词 -> 旁白
- "d": "narration", # 副词
- "a": "narration", # 形容词
-
- # 量词/数词 -> 旁白
- "m": "narration", # 数词
- "q": "narration", # 量词
- "mq": "narration", # 数量词
-
- # 时间/方位/处所 -> 旁白
- "t": "narration", # 时间词
- "f": "narration", # 方位词
- "s": "narration", # 处所词
-
- # 语气词/叹词 -> 对话
- "y": "dialogue", # 语气助词
- "e": "dialogue", # 叹词
- }
-
- # 动作动词关键词(优先级更高)
- ACTION_VERBS = {
- "走", "跑", "看", "听", "摸", "拿", "放", "打开", "关闭",
- "推", "拉", "举", "扔", "跳", "爬", "坐", "站", "躺",
- "进入", "离开", "接近", "远离", "转身", "回头", "低头", "抬头",
- "微笑", "大笑", "哭", "喊", "叫", "说", "讲", "念", "读",
- "投掷", "检定", "攻击", "防御", "躲避"
- }
-
- # 对话相关关键词
- DIALOGUE_INDICATORS = {
- "我", "你", "他", "她", "我们", "你们", "他们",
- "吗", "呢", "啊", "哦", "嗯", "哼", "咦", "哎",
- }
-
- def __init__(self, seg_only: bool = False, user_dict: str = None):
- """
- 初始化 THULAC 解析器
-
- Args:
- seg_only: 是否只进行分词(不标注词性)
- user_dict: 用户自定义词典路径
- """
- if not THULAC_AVAILABLE:
- raise ImportError(
- "THULAC is not installed. Please install with:\n"
- "pip install thulac\n"
- "Note: First installation may take a few minutes to download models."
- )
-
- self.seg_only = seg_only
- self.thulac = thulac.thulac(
- seg_only=seg_only,
- user_dict=user_dict if user_dict else None
- )
-
- self.delimiters = self.DEFAULT_DELIMITERS.copy()
- self.custom_words = {}
- self.statistics = {
- "total_parsed": 0,
- "dialogue_count": 0,
- "action_count": 0,
- "thought_count": 0,
- "narration_count": 0,
- "ooc_count": 0,
- }
-
- logger.info(f"THULACParser initialized with seg_only={seg_only}")
-
- def load_rules(self, rules_path: str = None, rules_dict: Dict = None):
- """
- 加载简化的规则配置
-
- Args:
- rules_path: 规则文件路径(JSON5 格式)
- rules_dict: 直接传入规则字典
-
- 规则格式示例:
- ```json
- {
- "delimiters": {
- "dialogue": [["\"", "\""], [""", """]],
- "action": [["(", ")"], ["*", "*"]],
- "thought": [["【", "】"]]
- },
- "custom_words": {
- "骰子": "n",
- "检定": "v",
- "守秘人": "np"
- }
- }
- ```
- """
- import json5
- from pathlib import Path
-
- if rules_path:
- if not Path(rules_path).exists():
- raise FileNotFoundError(f"Rules file not found: {rules_path}")
- with open(rules_path, "r", encoding="utf-8") as f:
- rules_dict = json5.load(f)
-
- if not rules_dict:
- logger.info("No rules provided, using default delimiters")
- return
-
- # 加载分隔符配置
- if "delimiters" in rules_dict:
- for content_type, delimiter_pairs in rules_dict["delimiters"].items():
- self.delimiters[content_type] = [tuple(pair) for pair in delimiter_pairs]
-
- # 加载自定义词汇(词 -> 词性)
- if "custom_words" in rules_dict:
- self.custom_words = rules_dict["custom_words"]
- logger.info(f"Loaded {len(self.custom_words)} custom words")
-
- logger.info("Rules loaded successfully")
-
- def _extract_delimited_content(self, text: str) -> List[Dict]:
- """
- 提取分隔符标记的内容
-
- Returns:
- List of {type, content, start, end, delimiter}
- """
- results = []
-
- for content_type, delimiter_pairs in self.delimiters.items():
- for start_delim, end_delim in delimiter_pairs:
- # 转义正则特殊字符
- start_pattern = re.escape(start_delim)
- end_pattern = re.escape(end_delim)
-
- # 处理到行尾的情况
- if end_delim == "\n":
- pattern = f"{start_pattern}(.+?)(?:\n|$)"
- else:
- pattern = f"{start_pattern}(.+?){end_pattern}"
-
- for match in re.finditer(pattern, text):
- results.append({
- "type": content_type,
- "content": match.group(1),
- "start": match.start(),
- "end": match.end(),
- "delimiter": (start_delim, end_delim),
- "confidence": 1.0 # 分隔符匹配的置信度为 100%
- })
-
- results.sort(key=lambda x: x["start"])
- return results
-
- def _analyze_with_thulac(self, text: str) -> List[Dict]:
- """
- 使用 THULAC 分析文本
-
- Returns:
- List of {type, content, words, tags, confidence}
- """
- result = self.thulac.cut(text, text=False) # 返回 [(word, pos), ...]
-
- if not result:
- return [{
- "type": "narration",
- "content": text,
- "words": [],
- "tags": [],
- "confidence": 0.5,
- "method": "thulac"
- }]
-
- # 分离词和词性
- words = [item[0] for item in result]
- tags = [item[1] for item in result]
-
- # 应用自定义词性(如果有)
- for i, word in enumerate(words):
- if word in self.custom_words:
- tags[i] = self.custom_words[word]
-
- # 基于词性和内容推断类型
- content_type = self._infer_content_type(words, tags)
- confidence = self._calculate_confidence(words, tags, content_type)
-
- return [{
- "type": content_type,
- "content": text,
- "words": words,
- "tags": tags,
- "confidence": confidence,
- "method": "thulac"
- }]
-
- def _infer_content_type(self, words: List[str], tags: List[str]) -> str:
- """
- 基于词性和内容推断内容类型
-
- 策略:
- 1. 检查是否包含动作动词 -> action
- 2. 检查是否包含对话指示词 -> dialogue
- 3. 统计主导词性 -> 按映射表判断
- """
- for word in words:
- if word in self.ACTION_VERBS:
- return "action"
-
- dialogue_indicators = sum(1 for w in words if w in self.DIALOGUE_INDICATORS)
- if dialogue_indicators >= 2: # 至少2个对话指示词
- return "dialogue"
-
- pos_count = {}
- for tag in tags:
- if tag == "w": # 忽略标点
- continue
- pos_count[tag] = pos_count.get(tag, 0) + 1
-
- if not pos_count:
- return "narration"
-
- # 找出最常见的词性
- dominant_pos = max(pos_count.items(), key=lambda x: x[1])[0]
-
- # 特殊规则:如果有动词,倾向于判断为动作
- if "v" in pos_count and pos_count["v"] >= len(words) * 0.3:
- return "action"
-
- # 根据主导词性映射
- return self.POS_TYPE_MAPPING.get(dominant_pos, "narration")
-
- def _calculate_confidence(self, words: List[str], tags: List[str],
- content_type: str) -> float:
- """
- 计算分析置信度
-
- 基于以下因素:
- 1. 词性标注的一致性
- 2. 关键词匹配度
- 3. 文本长度
- """
- if not words or not tags:
- return 0.5
-
- base_confidence = 0.5
-
- if content_type == "action":
- action_word_count = sum(1 for w in words if w in self.ACTION_VERBS)
- if action_word_count > 0:
- base_confidence += 0.3
- elif content_type == "dialogue":
- dialogue_word_count = sum(1 for w in words if w in self.DIALOGUE_INDICATORS)
- if dialogue_word_count >= 2:
- base_confidence += 0.3
-
- unique_pos = len(set(tag for tag in tags if tag != "w"))
- if unique_pos == 1:
- base_confidence += 0.2
- elif unique_pos <= 3:
- base_confidence += 0.1
-
- return min(1.0, base_confidence)
-
- def _merge_results(self, delimited: List[Dict], thulac_results: List[Dict],
- text: str) -> List[Dict]:
- """
- 合并分隔符提取和 THULAC 分析结果
-
- 优先级:分隔符标记 > THULAC 分析
- """
- if not delimited:
- return thulac_results
-
- results = []
- covered_ranges = set()
-
- for item in delimited:
- results.append(item)
- for i in range(item["start"], item["end"]):
- covered_ranges.add(i)
-
- uncovered_segments = []
- start = 0
- for i in range(len(text)):
- if i in covered_ranges:
- if start < i:
- uncovered_segments.append((start, i))
- start = i + 1
- if start < len(text):
- uncovered_segments.append((start, len(text)))
-
- for start, end in uncovered_segments:
- segment = text[start:end].strip()
- if segment:
- thulac_result = self._analyze_with_thulac(segment)
- for item in thulac_result:
- item["start"] = start
- item["end"] = end
- results.append(item)
-
- results.sort(key=lambda x: x.get("start", 0))
- return results
-
- def parse_line(self, line: str) -> Dict:
- """
- 解析单行日志
-
- Args:
- line: 日志行
-
- Returns:
- {
- "metadata": {...},
- "content": [...]
- }
- """
- if not line or not line.strip():
- return {"metadata": {}, "content": []}
-
- # 提取元数据(时间戳、发言人等)
- metadata = self._extract_metadata(line)
-
- # 移除元数据后的内容
- content_text = self._remove_metadata(line, metadata)
-
- # 1. 提取分隔符标记的内容
- delimited = self._extract_delimited_content(content_text)
-
- # 2. 使用 THULAC 分析未标记的内容
- thulac_results = []
- if not delimited or len(delimited) == 0:
- thulac_results = self._analyze_with_thulac(content_text)
-
- # 3. 合并结果
- content = self._merge_results(delimited, thulac_results, content_text)
-
- # 更新统计
- self.statistics["total_parsed"] += 1
- for item in content:
- type_key = f"{item['type']}_count"
- if type_key in self.statistics:
- self.statistics[type_key] += 1
-
- return {
- "metadata": metadata,
- "content": content
- }
-
- def _extract_metadata(self, line: str) -> Dict:
- """提取元数据(时间戳、发言人)"""
- metadata = {}
-
- # 常见的元数据格式
- patterns = [
- r"^\[(.+?)\]\s*<(.+?)>", # [时间] <发言人>
- r"^(.+?)\s*\|\s*(.+?)\s*:", # 时间 | 发言人:
- r"^<(.+?)>\s*@\s*(.+?)$", # <发言人> @ 时间
- ]
-
- for pattern in patterns:
- match = re.search(pattern, line)
- if match:
- metadata["timestamp"] = match.group(1)
- metadata["speaker"] = match.group(2)
- break
-
- return metadata
-
- def _remove_metadata(self, line: str, metadata: Dict) -> str:
- """移除元数据,返回纯内容"""
- if not metadata:
- return line
-
- # 移除匹配到的元数据部分
- patterns = [
- r"^\[.+?\]\s*<.+?>\s*",
- r"^.+?\s*\|\s*.+?\s*:\s*",
- r"^<.+?>\s*@\s*.+?\s*",
- ]
-
- for pattern in patterns:
- line = re.sub(pattern, "", line, count=1)
-
- return line.strip()
-
- def parse_log(self, log_path: str) -> List[Dict]:
- """
- 解析完整的 TRPG 日志文件
-
- Args:
- log_path: 日志文件路径
-
- Returns:
- 解析结果列表
- """
- from pathlib import Path
-
- if not Path(log_path).exists():
- raise FileNotFoundError(f"Log file not found: {log_path}")
-
- with open(log_path, "r", encoding="utf-8") as f:
- lines = f.readlines()
-
- results = []
- for i, line in enumerate(lines):
- line = line.strip()
- if not line:
- continue
-
- try:
- result = self.parse_line(line)
- result["line_number"] = i + 1
- results.append(result)
- except Exception as e:
- logger.error(f"Error parsing line {i+1}: {e}")
- results.append({
- "line_number": i + 1,
- "error": str(e),
- "raw_line": line
- })
-
- logger.info(f"Parsed {len(results)} lines from {log_path}")
- return results
-
- def get_statistics(self) -> Dict:
- """获取解析统计信息"""
- return self.statistics.copy()
-
- def reset_statistics(self):
- """重置统计信息"""
- for key in self.statistics:
- self.statistics[key] = 0
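The delimiter-based extraction that lived in the deleted thulac_parser.py can still be useful on its own. Below is a standalone sketch distilled from the removed _extract_delimited_content(); it is a hypothetical helper, not part of the package after this commit.

```python
# Standalone re-creation of the removed delimiter pass: spans whose type is
# fixed by explicit markers get confidence 1.0, as in the deleted code.
import re
from typing import Dict, List

DELIMITERS = {
    "dialogue": [('"', '"'), ("“", "”")],
    "thought":  [("【", "】"), ("[", "]")],
    "action":   [("(", ")"), ("(", ")"), ("*", "*")],
    "ooc":      [("//", "\n")],
}

def extract_delimited(text: str) -> List[Dict]:
    """Return delimiter-marked spans sorted by their start offset."""
    spans = []
    for content_type, pairs in DELIMITERS.items():
        for start_delim, end_delim in pairs:
            start_pat, end_pat = re.escape(start_delim), re.escape(end_delim)
            # "to end of line" delimiters close at a newline or end of string
            pattern = (f"{start_pat}(.+?)(?:\n|$)" if end_delim == "\n"
                       else f"{start_pat}(.+?){end_pat}")
            for m in re.finditer(pattern, text):
                spans.append({"type": content_type, "content": m.group(1),
                              "start": m.start(), "end": m.end(),
                              "confidence": 1.0})
    return sorted(spans, key=lambda s: s["start"])

print(extract_delimited("(推开门)“我们该走了。” //今晚到此为止"))
```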