author     pine <dijsds@163.com>  2025-04-14 20:51:18 +0800
committer  pine <dijsds@163.com>  2025-04-14 20:51:18 +0800
commit     9f7e4b5d0a4aa6d9536a6eb1471a110d716e2566 (patch)
tree       04e4bb4c368e6266fec7c010362aa77d2a657b80
parent     4f56bec03e050678b706ff34a9c8db186b63a4e1 (diff)
feat: :art: refactor log parsing logic with simplified rules and priority-based matching (pineoncellar/main)
-rw-r--r--  src/conventionalrp/core/parser.py  193
-rw-r--r--  test/example_rule.json              34
2 files changed, 142 insertions(+), 85 deletions(-)
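
The refactor replaces the old single-pass matching loop with three match types (enclosed, prefix, suffix) selected per rule and ordered by a priority field. For reference, one content rule in the new schema, written here as a Python literal mirroring the JSON5 shape, with values copied from the speech rule in test/example_rule.json below:

    # One content rule in the new schema; values are copied from the speech
    # rule in test/example_rule.json further down in this diff.
    speech_rule = {
        "type": "speech",                   # tag attached to each parsed segment
        "match_type": "enclosed",           # one of: enclosed, prefix, suffix
        "priority": 101,                    # controls rule evaluation order
        "patterns": ["[“”\"](.+?)[“”\"]"],  # all quote styles
        "groups": ["speech_content"],       # names for the regex capture groups
    }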
diff --git a/src/conventionalrp/core/parser.py b/src/conventionalrp/core/parser.py
index f214c4c..a3dd947 100644
--- a/src/conventionalrp/core/parser.py
+++ b/src/conventionalrp/core/parser.py
@@ -1,7 +1,7 @@
 import json5
 import re
 from pathlib import Path
-
+from typing import List, Dict, Optional
 
 class Parser:
     def __init__(self):
@@ -25,97 +25,144 @@ class Parser:
         self.rules = rules
         print(f"Rules loaded: {rules}\n")
 
-    def parse_log(self, log_path: str):
+    def parse_log(self, log_path: str) -> List[Dict]:
         """Parse the TRPG log based on loaded rules."""
-        parsed_data = []
-
         if not Path(log_path).exists():
-            raise FileNotFoundError(f"No such file or directory: {log_path} ")
+            raise FileNotFoundError(f"No such file or directory: {log_path}")
 
         with open(log_path, "r", encoding="utf-8") as f:
             log_content = f.read().splitlines()
 
+        parsed_data = []
         current_metadata = None
         current_content = []
 
         # Iterate over each line of the log
         for line in log_content:
+            line = line.strip()
             # skip blank lines
-            if not line.strip():
+            if not line:
                 continue
 
-            # metadata detect
-            is_metadata = False
-            metadata_content = self.rules.get("metadata")
-            patterns = metadata_content.get("patterns", [])
-            for pattern in patterns:
-                match = re.search(pattern, line)
-                if match:
-                    # If it's metadata, save the previous content
-                    if current_metadata:
-                        parsed_data.append({
-                            **current_metadata,
-                            "content": current_content
-                        })
-                        current_content = []
-
-                    # Parsing new metadata
-                    current_metadata = {}
-                    groups = metadata_content.get("groups", [])
-                    for i, key in enumerate(groups):
-                        if i + 1 <= len(match.groups()):  # ensure the capture group exists
-                            current_metadata[key] = match.group(i + 1).strip()
-                    is_metadata = True
-                    break
-
-            if is_metadata:
-                continue  # The metadata line has been processed; skip content matching
-
-            # content detect
-            remaining_line = line
-            rules = self.rules.get("content")
-            while remaining_line:
-                matched = False
-
-                for rule in rules:
-                    # skip the metadata rule
-                    if rule["type"] == "metadata":
-                        continue
-
-                    for pattern in rule["patterns"]:
-                        match = re.match(pattern, remaining_line)
-                        if match:
-                            # A match that does not start at position 0 means unknown content precedes it
-                            if match.start() > 0:
-                                current_content.append({
-                                    "type": "unknown",
-                                    "content": remaining_line[:match.start()]
-                                })
-
-                            # Extract matched content
-                            entry = {"type": rule["type"], "content": match.group(0)}
-                            for i, group in enumerate(rule["groups"]):
-                                entry[group] = match.group(i+1).strip() if match.group(i+1) else ""
-
-                            current_content.append(entry)
-                            remaining_line = remaining_line[match.end():].lstrip()
-                            matched = True
-                            break
-                    if matched:
-                        break
-
-                if not matched:
-                    current_content.append({
-                        "type": "unknown",
-                        "content": remaining_line
+            # Check for metadata
+            metadata_match = self._match_metadata(line)
+            if metadata_match:
+                if current_metadata:
+                    parsed_data.append({
+                        **current_metadata,
+                        "content": current_content
                     })
-                    remaining_line = ""
+                    current_content = []
+                current_metadata = metadata_match
+                continue
+
+            # Parse content
+            if current_metadata:
+                parsed_segments = self._parse_line_content(line)
+                current_content.extend(parsed_segments)
 
-        # Process the last line
+        # Add the last entry
        if current_metadata:
             parsed_data.append({
                 **current_metadata,
                 "content": current_content
             })
 
-        return parsed_data
\ No newline at end of file
+        return parsed_data
+
+    def _match_metadata(self, line: str) -> Optional[Dict]:
+        """Match metadata line."""
+        metadata_rule = self.rules.get("metadata")
+        if not metadata_rule:
+            return None
+
+        for pattern in metadata_rule.get("patterns", []):
+            match = re.match(pattern, line)
+            if match:
+                metadata = {"type": "metadata"}
+                for i, key in enumerate(metadata_rule.get("groups", [])):
+                    if i + 1 <= len(match.groups()):
+                        metadata[key] = match.group(i + 1).strip()
+                return metadata
+        return None
+
+    def _parse_line_content(self, line: str) -> List[Dict]:
+        """Parse a single line of content recursively."""
+        if not line:
+            return []
+
+        # Sort rules by priority (highest first)
+        content_rules = sorted(
+            self.rules.get("content", []),
+            key=lambda x: x.get("priority", 0),
+            reverse=True
+        )
+
+        for rule in content_rules:
+            for pattern in rule["patterns"]:
+                match = re.search(pattern, line)
+                if match:
+                    # Handle different match types
+                    if rule["match_type"] == "enclosed":
+                        return self._handle_enclosed_match(line, match, rule)
+                    elif rule["match_type"] == "prefix":
+                        return self._handle_prefix_match(line, match, rule)
+                    elif rule["match_type"] == "suffix":
+                        return self._handle_suffix_match(line, match, rule)
+
+        # If no matches found, return as unknown
+        return [{"type": "unknown", "content": line}]
+
+    def _handle_enclosed_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
+        """Handle enclosed matches (highest priority)."""
+        before = line[:match.start()].strip()
+        matched = match.group(0).strip()
+        after = line[match.end():].strip()
+
+        result = []
+        if before:
+            result.extend(self._parse_line_content(before))
+
+        entry = {"type": rule["type"], "content": matched}
+        for i, group in enumerate(rule.get("groups", [])):
+            if i + 1 <= len(match.groups()):
+                entry[group] = match.group(i + 1).strip() if match.group(i + 1) else ""
+        result.append(entry)
+
+        if after:
+            result.extend(self._parse_line_content(after))
+
+        return result
+
+    def _handle_prefix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
+        """Handle prefix matches."""
+        matched = line[match.start():].strip()
+        before = line[:match.start()].strip()
+
+        result = []
+        if before:
+            result.extend(self._parse_line_content(before))
+
+        entry = {"type": rule["type"], "content": matched}
+        for i, group in enumerate(rule.get("groups", [])):
+            if i + 1 <= len(match.groups()):
+                entry[group] = match.group(i + 1).strip() if match.group(i + 1) else ""
+        result.append(entry)
+
+        return result
+
+    def _handle_suffix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
+        """Handle suffix matches."""
+        matched = line[:match.end()].strip()
+        after = line[match.end():].strip()
+
+        entry = {"type": rule["type"], "content": matched}
+        for i, group in enumerate(rule.get("groups", [])):
+            if i + 1 <= len(match.groups()):
+                entry[group] = match.group(i + 1).strip() if match.group(i + 1) else ""
+
+        result = [entry]
+        if after:
+            result.extend(self._parse_line_content(after))
+
+        return result
\ No newline at end of file
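
To make the recursion in _handle_enclosed_match concrete, here is a minimal standalone sketch (not the committed code) that applies a single enclosed speech rule the same way: the text before and after the match is re-parsed, and anything that never matches falls through as unknown. The sample line is illustrative:

    import re
    from typing import Dict, List

    SPEECH = {"type": "speech", "patterns": ['[“”"](.+?)[“”"]'], "groups": ["speech_content"]}

    def parse(line: str) -> List[Dict]:
        # Mirrors _parse_line_content + _handle_enclosed_match for one rule.
        line = line.strip()
        if not line:
            return []
        for pattern in SPEECH["patterns"]:
            m = re.search(pattern, line)
            if m:
                entry = {"type": SPEECH["type"], "content": m.group(0).strip(),
                         SPEECH["groups"][0]: m.group(1).strip()}
                # Recurse on the segments before and after the enclosed match.
                return parse(line[:m.start()]) + [entry] + parse(line[m.end():])
        return [{"type": "unknown", "content": line}]

    print(parse('waves a hand “hello there” and sits down'))
    # [{'type': 'unknown', 'content': 'waves a hand'},
    #  {'type': 'speech', 'content': '“hello there”', 'speech_content': 'hello there'},
    #  {'type': 'unknown', 'content': 'and sits down'}]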
diff --git a/test/example_rule.json b/test/example_rule.json
index 750789a..73abe3f 100644
--- a/test/example_rule.json
+++ b/test/example_rule.json
@@ -1,6 +1,6 @@
 {
+    // Match log metadata and extract the id, QQ account, and time. Example: 墨勒托.DW(1571806261) 2025-01-27 19:58:15
     "metadata": {
-        // Match log metadata and extract the id, QQ account, and time. Example: 墨勒托.DW(1571806261) 2025-01-27 19:58:15
         "type": "metadata",
         "patterns": [
             "^(\\S+)\\((\\d+)\\)\\s+(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})"
@@ -10,37 +10,47 @@
     "content": [
         {
             "type": "action",
-            "patterns": ["^#s*((?:(?![“”\"(【】]).)+)"], // exclude the special symbols that follow
+            "match_type": "prefix",
+            "priority": 1001, // priority: the smaller the number, the higher the priority; valid range 1-5000
+            "patterns": ["^#(.+)"],
             "groups": ["action_content"]
         },
         {
             "type": "speech",
+            "match_type": "enclosed",
+            "priority": 101,
             "patterns": [
-                "[“](.+?)[”]", // Chinese quotation marks
-                "\"(.*?)\"", // English quotation marks
-                "”(.+?)“" // mixed quotation marks
+                "[“”\"](.+?)[“”\"]" // match all kinds of quotation marks
             ],
             "groups": ["speech_content"]
         },
         {
             "type": "ooc_speech",
+            "match_type": "enclosed",
+            "priority": 103,
             "patterns": [
-                // "((.*?))", // English parentheses
-                "((.*?))", // Chinese parentheses
-                // "((.*)", // unclosed English parenthesis
-                "((.*)" // unclosed Chinese parenthesis
+                "[\\((](.+?)[\\))]" // match all kinds of parentheses
             ],
             "groups": ["ooc_content"]
         },
         {
-            // Match dice roll commands starting with . or 。, but not consecutive command prefixes. Example: matches ".ra智力" but not "。。。"
+            "type": "suffix_ooc_speech",
+            "match_type": "suffix",
+            "priority": 2001,
+            "patterns": ["(.+)b$"],
+            "groups": ["suffix_ooc_speech"]
+        },
+        {
             "type": "dice_order",
-            "patterns": ["^(?:[\\.。]([^.。].+))"],
+            "match_type": "prefix",
+            "priority": 1,
+            "patterns": ["^[.。](.+)"],
             "groups": ["dice_command"]
         },
         {
-            // Match a character's inner thoughts. Example: 【这里好可怕】
             "type": "thought",
+            "match_type": "enclosed",
+            "priority": 102,
             "patterns": ["【(.+)】"],
             "groups": ["thought_content"]
         }
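
Putting the two files together, a hedged usage sketch: the rule-loading method name and the file paths below are assumed (the loader is outside this diff), and the metadata group names id / qq / time are likewise guesses, since the metadata "groups" array is not part of either hunk.

    from conventionalrp.core.parser import Parser

    parser = Parser()
    parser.load_rules("test/example_rule.json")         # assumed method name
    entries = parser.parse_log("test/example_log.txt")  # assumed log path
    # For a two-line log such as
    #   墨勒托.DW(1571806261) 2025-01-27 19:58:15
    #   .ra智力
    # the expected shape (metadata keys assumed) is:
    # [{'type': 'metadata', 'id': '墨勒托.DW', 'qq': '1571806261',
    #   'time': '2025-01-27 19:58:15',
    #   'content': [{'type': 'dice_order', 'content': '.ra智力',
    #                'dice_command': 'ra智力'}]}]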