diff options
| author | 2026-01-05 14:02:08 +0800 | |
|---|---|---|
| committer | 2026-01-05 14:02:08 +0800 | |
| commit | 25380fb4de77966a0f3d00681be25857c27b0869 (patch) | |
| tree | faaf43f76629bbeab76448fb248c1ba9d0c4cbaf /utils | |
| parent | f4f9c541e9917fa614e6e1b8e737167f44c89c43 (diff) | |
| download | base-model-25380fb4de77966a0f3d00681be25857c27b0869.tar.gz base-model-25380fb4de77966a0f3d00681be25857c27b0869.zip | |
feat: enhance log annotation process with improved concurrency and detailed prompt structure
Diffstat (limited to 'utils')
| -rw-r--r-- | utils/llm_seri.py | 162 |
1 files changed, 117 insertions, 45 deletions
diff --git a/utils/llm_seri.py b/utils/llm_seri.py index ac8174b..17fee40 100644 --- a/utils/llm_seri.py +++ b/utils/llm_seri.py @@ -7,6 +7,7 @@ import json import os import time +import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from typing import Any, Dict, List @@ -21,19 +22,48 @@ def get_annotation_prompt(text: str) -> str: """ 构造 LLM 标注 prompt(只返回类型和文本,我们自己计算位置) """ - return f"""你是一个专业的文本标注助手。请对以下 TRPG 游戏日志进行标注,标注格式为 JSON。 + return f"""你是一个专业的 TRPG 游戏日志标注助手。请根据**语义**对以下文本进行标注,标注格式为 JSON。 -## 标签类型及规则 +## 标签类型及语义判断规则 -1. **speaker**: 说话人/玩家名字(位于文本开头,后跟空格和时间戳) -2. **timestamp**: 时间戳(格式如:2024-06-08 21:44:59) -3. **dialogue**: 角色对话(用引号包裹的说话内容,如 ""..."") -4. **action**: 动作/指令(以 . 开头的骰子指令,如 .rd10+7、.ww12a9+1) -5. **comment**: 其他描述性内容(角色扮演描述、系统消息、动作描述等) +1. **speaker**:说话人/玩家名字 + - 通常位于文本开头 + - 后面紧跟空格和时间戳 + - 格式特征:`名字 时间戳` + +2. **timestamp**:时间戳 + - 时间格式:`YYYY-MM-DD HH:MM:SS` + - 紧跟在 speaker 之后 + +3. **action**:骰子/游戏指令 + - 以点号 `.` 开头的指令 + - 例如:`.rd10+7`、`.ww12a9+1`、`.rst` + +4. **dialogue**:角色对话/说话内容 + - **判断依据:是否为角色口中说出的话** + - 可能有引号包裹(\"\"\"\"、\"\"\"\"或\"\"\"\") + - **也可能没有引号**,需要根据语义判断 + - 间接引语、心理独白如果明显是"说话"性质也应标注 + - 关键:这段文字是角色"说"出来的,而不是"做"的动作描述 + +5. **comment**:其他所有内容 + - 动作描述(角色做了什么) + - 场景描写 + - 系统消息(如骰子结果) + - 心理活动(非说话形式) + - GM 描述 + - 其他不属于上述类型的内容 + +## 标注原则 + +- **按文本出现顺序标注** +- **根据语义判断类型**,不要仅依赖格式特征 +- **不要遗漏任何内容**,文本的所有部分都必须被标注 +- **保持文本原样**,标注的 text 必须与原文完全一致 ## 标注示例 -### 示例 1: +### 示例 1:纯动作描述 文本:`风雨 2024-06-08 21:44:59\\n剧烈的疼痛从头颅深处一波波地涌出,仿佛每一次脉搏的跳动都在击打你的头骨。` 标注: ```json @@ -46,7 +76,7 @@ def get_annotation_prompt(text: str) -> str: }} ``` -### 示例 2: +### 示例 2:有引号的对话 + 动作 文本:`莎莎 2024-06-08 21:46:26\\n"呜哇..."#下意识去拿法杖,但启动施法起手后大脑里一片空白...` 标注: ```json @@ -60,7 +90,34 @@ def get_annotation_prompt(text: str) -> str: }} ``` -### 示例 3: +### 示例 3:无引号的对话(语义判断) +文本:`风雨 2024-06-08 21:50:15\\n我不行了,��带我离开这里` +标注: +```json +{{ + "annotations": [ + {{"type": "speaker", "text": "风雨"}}, + {{"type": "timestamp", "text": "2024-06-08 21:50:15"}}, + {{"type": "dialogue", "text": "我不行了,快带我离开这里"}} + ] +}} +``` + +### 示例 4:对话 + 动作混合 +文本:`白麗 霊夢 2024-06-08 21:51:00\\n好的,我明白了。他点点头,转身离开了房间。` +标注: +```json +{{ + "annotations": [ + {{"type": "speaker", "text": "白麗 霊夢"}}, + {{"type": "timestamp", "text": "2024-06-08 21:51:00"}}, + {{"type": "dialogue", "text": "好的,我明白了。"}}, + {{"type": "comment", "text": "他点点头,转身离开了房间。"}} + ] +}} +``` + +### 示例 5:纯动作指令 文本:`莎莎 2024-06-08 21:49:51\\n.rd10+7` 标注: ```json @@ -73,7 +130,7 @@ def get_annotation_prompt(text: str) -> str: }} ``` -### 示例 4: +### 示例 6:系统消息 文本:`白麗 霊夢 2024-06-08 21:49:51\\n莎莎 的出目是\\nD10+7=6+7=13` 标注: ```json @@ -86,12 +143,27 @@ def get_annotation_prompt(text: str) -> str: }} ``` -## 注意事项 +### 示例 7:多段对话混合描述 +文本:`白麗 霊夢 2024-06-08 21:52:00\\n等等,这是什么?他指着地上的物品,疑惑地问道。这是...魔法道具吗?` +标注: +```json +{{ + "annotations": [ + {{"type": "speaker", "text": "白麗 霊夢"}}, + {{"type": "timestamp", "text": "2024-06-08 21:52:00"}}, + {{"type": "dialogue", "text": "等等,这是什么?"}}, + {{"type": "comment", "text": "他指着地上的物品,疑惑地问道。"}}, + {{"type": "dialogue", "text": "这是...魔法道具吗?"}} + ] +}} +``` + +## 重要提示 -- 只返回标注的类型(type)和文本内容(text),不需要返回位置信息 -- 确保标注的文本内容与原文本完全一致 +- **dialogue 的判断核心是"这是角色说的话吗"**,而不是"有没有引号" +- 如果文本是角色直接说出的内容,即使没有引号也应标注为 dialogue +- 如果文本是动作、场景、心理描写等非说话内容,应标注为 comment - 只返回 JSON,不要添加任何其他解释性文字 -- 如果文本中不包含某种标签类型,就不要包含该标签 ## 待标注文本 @@ -165,7 +237,9 @@ def call_llm_api(prompt: str, index: int, total: int) -> Dict[str, Any]: return {"annotations": []} -def calculate_annotation_positions(original_text: str, llm_annotations: List[Dict[str, Any]]) -> List[Dict[str, Any]]: +def calculate_annotation_positions( + original_text: str, llm_annotations: List[Dict[str, Any]] +) -> List[Dict[str, Any]]: """ 根据 LLM 返回的标注文本计算在原文本中的位置 @@ -195,12 +269,9 @@ def calculate_annotation_positions(original_text: str, llm_annotations: List[Dic pos = original_text.find(ann_text) if pos != -1: - result_annotations.append({ - "type": ann_type, - "start": pos, - "end": pos + len(ann_text), - "text": ann_text - }) + result_annotations.append( + {"type": ann_type, "start": pos, "end": pos + len(ann_text), "text": ann_text} + ) # 更新当前位置为标注结束位置 current_pos = pos + len(ann_text) @@ -217,7 +288,7 @@ def convert_to_label_studio_format( annotation_id = str(uuid.uuid4()) - # 计算标注���置 + # 计算标注位置 annotations = calculate_annotation_positions(text, llm_annotations) # 构建 result 数组 @@ -294,7 +365,7 @@ def convert_to_label_studio_format( } -def process_logs(input_path: str, output_path: str, concurrency: int = 5, batch_size: int = 50): +def process_logs(input_path: str, output_path: str, concurrency: int = 10, batch_size: int = 50): """ 处理日志文件并进行自动标注(支持高并发) @@ -302,7 +373,7 @@ def process_logs(input_path: str, output_path: str, concurrency: int = 5, batch_ input_path: 输入的 processed_logs.json 文件路径 output_path: 输出的标注结果文件路径 concurrency: 并发线程数 - batch_size: 批处理保存大小 + batch_size: 批处理保存大小(已弃用,保留用于兼容性) """ # 读取输入文件 print(f"读取输入文件: {input_path}") @@ -313,9 +384,17 @@ def process_logs(input_path: str, output_path: str, concurrency: int = 5, batch_ print(f"总共 {total} 条日志需要标注") print(f"并发数: {concurrency}") - results = [] - # 用于保持顺序的字典 + # 用于保持顺序的字典和线程锁 results_dict = {} + results_lock = threading.Lock() + completed_count = [0] # 使用列表以便在闭包中修改 + + def write_results_to_file(): + """将当前已完成的结果按顺序写入文件""" + with results_lock: + sorted_results = [results_dict[i] for i in sorted(results_dict.keys())] + with open(output_path, "w", encoding="utf-8") as f: + json.dump(sorted_results, f, ensure_ascii=False, indent=2) def process_single_log(index: int, log_entry: Dict[str, Any]): text = log_entry.get("text", "") @@ -350,26 +429,19 @@ def process_logs(input_path: str, output_path: str, concurrency: int = 5, batch_ try: result = future.result() if result is not None: - results_dict[index] = result + with results_lock: + results_dict[index] = result + completed_count[0] += 1 + current_completed = completed_count[0] + + # 立即写入文件 + write_results_to_file() + print(f"[{index}/{total}] 已保存,完成进度: {current_completed}/{total}") except Exception as e: print(f"[{index}/{total}] 处理失败: {e}") - # 按顺序整理结果 - for index in sorted(results_dict.keys()): - results.append(results_dict[index]) - - # 每处理 batch_size 条保存一次 - if index % batch_size == 0: - with open(output_path, "w", encoding="utf-8") as f: - json.dump(results, f, ensure_ascii=False, indent=2) - print(f"\n已保存 {index} 条结果到 {output_path}") - - # 保存最终结果 - print(f"\n保存最终结果到: {output_path}") - with open(output_path, "w", encoding="utf-8") as f: - json.dump(results, f, ensure_ascii=False, indent=2) - - print(f"完成!共标注 {len(results)} 条日志") + print(f"\n完成!共标注 {len(results_dict)} 条日志") + print(f"结果已保存到: {output_path}") def main(): @@ -389,7 +461,7 @@ def main(): return # 开始处理 - process_logs(input_path, output_path, concurrency=5, batch_size=50) + process_logs(input_path, output_path, concurrency=10, batch_size=50) if __name__ == "__main__": |
