From 9f7e4b5d0a4aa6d9536a6eb1471a110d716e2566 Mon Sep 17 00:00:00 2001 From: pine Date: Mon, 14 Apr 2025 20:51:18 +0800 Subject: feat: :art: refactor log parsing logic with simplified rules and priority-based matching --- src/conventionalrp/core/parser.py | 193 ++++++++++++++++++++++++-------------- test/example_rule.json | 34 ++++--- 2 files changed, 142 insertions(+), 85 deletions(-) diff --git a/src/conventionalrp/core/parser.py b/src/conventionalrp/core/parser.py index f214c4c..a3dd947 100644 --- a/src/conventionalrp/core/parser.py +++ b/src/conventionalrp/core/parser.py @@ -1,7 +1,7 @@ import json5 import re from pathlib import Path - +from typing import List, Dict, Optional class Parser: def __init__(self): @@ -25,97 +25,144 @@ class Parser: self.rules = rules print(f"Rules loaded: {rules}\n") - def parse_log(self, log_path: str): + def parse_log(self, log_path: str) -> List[Dict]: """Parse the TRPG log based on loaded rules.""" - parsed_data = [] - if not Path(log_path).exists(): - raise FileNotFoundError(f"No such file or directory: {log_path} ") + raise FileNotFoundError(f"No such file or directory: {log_path}") with open(log_path, "r", encoding="utf-8") as f: log_content = f.read().splitlines() + parsed_data = [] current_metadata = None current_content = [] # Iterate each line of the log for line in log_content: + line = line.strip() # pass blank line - if not line.strip(): + if not line: continue - # metadata detect - is_metadata = False - metadata_content = self.rules.get("metadata") - patterns = metadata_content.get("patterns", []) - for pattern in patterns: - match = re.search(pattern, line) - if match: - # If it's metadata, save the previous content - if current_metadata: - parsed_data.append({ - **current_metadata, - "content": current_content - }) - current_content = [] - - # Parsing new metadata - current_metadata = {} - groups = metadata_content.get("groups", []) - for i, key in enumerate(groups): - if i + 1 <= len(match.groups()): # Ensure effective - current_metadata[key] = match.group(i + 1).strip() - is_metadata = True - break - - if is_metadata: - continue # The metadata line has been processed, skip subsequent content matching - - # content detect - remaining_line = line - rules = self.rules.get("content") - while remaining_line: - matched = False - - for rule in rules: - # pass metadata rule - if rule["type"] == "metadata": - continue - - for pattern in rule["patterns"]: - match = re.match(pattern, remaining_line) - if match: - # If the matching content is not the beginning, it means that there is unknown content in front of it - if match.start() > 0: - current_content.append({ - "type": "unknown", - "content": remaining_line[:match.start()] - }) - - # Extract matched content - entry = {"type": rule["type"], "content": match.group(0)} - for i, group in enumerate(rule["groups"]): - entry[group] = match.group(i+1).strip() if match.group(i+1) else "" - - current_content.append(entry) - remaining_line = remaining_line[match.end():].lstrip() - matched = True - break - if matched: - break - - if not matched: - current_content.append({ - "type": "unknown", - "content": remaining_line + # Check for metadata + metadata_match = self._match_metadata(line) + if metadata_match: + if current_metadata: + parsed_data.append({ + **current_metadata, + "content": current_content }) - remaining_line = "" + current_content = [] + current_metadata = metadata_match + continue + + # Parse content + if current_metadata: + parsed_segments = self._parse_line_content(line) + current_content.extend(parsed_segments) - # Process the last line + # Add the last entry if current_metadata: parsed_data.append({ **current_metadata, "content": current_content }) - return parsed_data \ No newline at end of file + return parsed_data + + def _match_metadata(self, line: str) -> Optional[Dict]: + """Match metadata line.""" + metadata_rule = self.rules.get("metadata") + if not metadata_rule: + return None + + for pattern in metadata_rule.get("patterns", []): + match = re.match(pattern, line) + if match: + metadata = {"type": "metadata"} + for i, key in enumerate(metadata_rule.get("groups", [])): + if i + 1 <= len(match.groups()): + metadata[key] = match.group(i + 1).strip() + return metadata + return None + + def _parse_line_content(self, line: str) -> List[Dict]: + """Parse a single line of content recursively.""" + if not line: + return [] + + # Sort rules by priority (highest first) + content_rules = sorted( + self.rules.get("content", []), + key=lambda x: x.get("priority", 0), + reverse=True + ) + + for rule in content_rules: + for pattern in rule["patterns"]: + match = re.search(pattern, line) + if match: + # Handle different match types + if rule["match_type"] == "enclosed": + return self._handle_enclosed_match(line, match, rule) + elif rule["match_type"] == "prefix": + return self._handle_prefix_match(line, match, rule) + elif rule["match_type"] == "suffix": + return self._handle_suffix_match(line, match, rule) + + # If no matches found, return as unknown + return [{"type": "unknown", "content": line}] + + def _handle_enclosed_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]: + """Handle enclosed matches (highest priority).""" + before = line[:match.start()].strip() + matched = match.group(0).strip() + after = line[match.end():].strip() + + result = [] + if before: + result.extend(self._parse_line_content(before)) + + entry = {"type": rule["type"], "content": matched} + for i, group in enumerate(rule.get("groups", [])): + if i + 1 <= len(match.groups()): + entry[group] = match.group(i + 1).strip() if match.group(i + 1) else "" + result.append(entry) + + if after: + result.extend(self._parse_line_content(after)) + + return result + + def _handle_prefix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]: + """Handle prefix matches.""" + matched = line[match.start():].strip() + before = line[:match.start()].strip() + + result = [] + if before: + result.extend(self._parse_line_content(before)) + + entry = {"type": rule["type"], "content": matched} + for i, group in enumerate(rule.get("groups", [])): + if i + 1 <= len(match.groups()): + entry[group] = match.group(i + 1).strip() if match.group(i + 1) else "" + result.append(entry) + + return result + + def _handle_suffix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]: + """Handle suffix matches.""" + matched = line[:match.end()].strip() + after = line[match.end():].strip() + + entry = {"type": rule["type"], "content": matched} + for i, group in enumerate(rule.get("groups", [])): + if i + 1 <= len(match.groups()): + entry[group] = match.group(i + 1).strip() if match.group(i + 1) else "" + + result = [entry] + if after: + result.extend(self._parse_line_content(after)) + + return result \ No newline at end of file diff --git a/test/example_rule.json b/test/example_rule.json index 750789a..73abe3f 100644 --- a/test/example_rule.json +++ b/test/example_rule.json @@ -1,6 +1,6 @@ { + // 匹配日志元数据,提取 id、QQ 账号和时间。例如:墨勒托.DW(1571806261) 2025-01-27 19:58:15 "metadata": { - // 匹配日志元数据,提取 id、QQ 账号和时间。例如:墨勒托.DW(1571806261) 2025-01-27 19:58:15 "type": "metadata", "patterns": [ "^(\\S+)\\((\\d+)\\)\\s+(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})" @@ -10,37 +10,47 @@ "content": [ { "type": "action", - "patterns": ["^#s*((?:(?![“”\"(【】]).)+)"], // 排除后续特殊符号 + "match_type": "prefix", + "priority": 1001, // 优先级,数字越小优先级越高,范围为1-5000 + "patterns": ["^#(.+)"], "groups": ["action_content"] }, { "type": "speech", + "match_type": "enclosed", + "priority": 101, "patterns": [ - "[“](.+?)[”]", // 中文引号 - "\"(.*?)\"", // 英文引号 - "”(.+?)“" // 混合引号 + "[“”\"](.+?)[“”\"]" // 匹配各种引号 ], "groups": ["speech_content"] }, { "type": "ooc_speech", + "match_type": "enclosed", + "priority": 103, "patterns": [ - // "((.*?))", // 英文括号 - "((.*?))", // 中文括号 - // "((.*)", // 未闭合英文括号 - "((.*)" // 未闭合中文括号 + "[\\((](.+?)[\\))]" // 匹配各种括号 ], "groups": ["ooc_content"] }, { - // 匹配掷骰指令,以 . 或 。开头但是不匹配连续的指令前缀。例如:匹配".ra智力",不匹配"。。。" + "type": "suffix_ooc_speech", + "match_type": "suffix", + "priority": 2001, + "patterns": ["(.+)b$"], + "groups": ["suffix_ooc_speech"] + }, + { "type": "dice_order", - "patterns": ["^(?:[\\.。]([^.。].+))"], + "match_type": "prefix", + "priority": 1, + "patterns": ["^[.。](.+)"], "groups": ["dice_command"] }, { - // 匹配角色心理活动。例如:【这里好可怕】 "type": "thought", + "match_type": "enclosed", + "priority": 102, "patterns": ["【(.+)】"], "groups": ["thought_content"] } -- cgit v1.2.3-70-g09d2