aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/conventionalrp/core/parser.py
diff options
context:
space:
mode:
author简律纯 <i@jyunko.cn>2025-04-27 12:29:42 +0800
committerGitHub <noreply@github.com>2025-04-27 12:29:42 +0800
commit013fc7460e77799825b8ae5aff9a4e59feec6453 (patch)
tree04e4bb4c368e6266fec7c010362aa77d2a657b80 /src/conventionalrp/core/parser.py
parent5319feea52f7266029b9a3a609a3f1ae494c6a60 (diff)
parent9f7e4b5d0a4aa6d9536a6eb1471a110d716e2566 (diff)
downloadconventional_role_play-013fc7460e77799825b8ae5aff9a4e59feec6453.tar.gz
conventional_role_play-013fc7460e77799825b8ae5aff9a4e59feec6453.zip
Merge pull request #8 from pineoncellar/main
feat: 🎨 refactor log parsing logic with simplified rules and priority-based matching
Diffstat (limited to 'src/conventionalrp/core/parser.py')
-rw-r--r--src/conventionalrp/core/parser.py195
1 file changed, 121 insertions, 74 deletions
diff --git a/src/conventionalrp/core/parser.py b/src/conventionalrp/core/parser.py
index f451178..a3dd947 100644
--- a/src/conventionalrp/core/parser.py
+++ b/src/conventionalrp/core/parser.py
@@ -1,7 +1,7 @@
import json5
import re
from pathlib import Path
-
+from typing import List, Dict, Optional
class Parser:
def __init__(self):
@@ -23,99 +23,146 @@ class Parser:
# to be continue...
self.rules = rules
+ print(f"Rules loaded: {rules}\n")
- def parse_log(self, log_path: str):
+ def parse_log(self, log_path: str) -> List[Dict]:
"""Parse the TRPG log based on loaded rules."""
- parsed_data = []
-
if not Path(log_path).exists():
- raise FileNotFoundError(f"No such file or directory: {log_path} ")
+ raise FileNotFoundError(f"No such file or directory: {log_path}")
with open(log_path, "r", encoding="utf-8") as f:
log_content = f.read().splitlines()
+ parsed_data = []
current_metadata = None
current_content = []
# Iterate each line of the log
for line in log_content:
+ line = line.strip()
# pass blank line
- if not line.strip():
+ if not line:
continue
- # metadata detect
- is_metadata = False
- for rule in self.rules:
- if rule.get("type") == "metadata":
- patterns = rule.get("patterns", [])
- for pattern in patterns:
- match = re.search(pattern, line)
- if match:
- # If it's metadata, save the previous content
- if current_metadata:
- parsed_data.append({
- **current_metadata,
- "content": current_content
- })
- current_content = []
-
- # Parsing new metadata
- current_metadata = {}
- groups = rule.get("groups", [])
- for i, key in enumerate(groups):
- if i + 1 <= len(match.groups()): # Ensure effective
- current_metadata[key] = match.group(i + 1).strip()
- is_metadata = True
- break
- if is_metadata:
- break
-
- if is_metadata:
- continue # The metadata line has been processed, skip subsequent content matching
-
- # content detect
- remaining_line = line
- while remaining_line:
- matched = False
- for rule in self.rules:
- # pass metadata rule
- if rule["type"] == "metadata":
- continue
-
- for pattern in rule["patterns"]:
- match = re.match(pattern, remaining_line)
- if match:
- # If the matching content is not the beginning, it means that there is unknown content in front of it
- if match.start() > 0:
- current_content.append({
- "type": "unknown",
- "content": remaining_line[:match.start()]
- })
-
- # Extract matched content
- entry = {"type": rule["type"], "content": match.group(0)}
- for i, group in enumerate(rule["groups"]):
- entry[group] = match.group(i+1).strip() if match.group(i+1) else ""
-
- current_content.append(entry)
- remaining_line = remaining_line[match.end():].lstrip()
- matched = True
- break
- if matched:
- break
-
- if not matched:
- current_content.append({
- "type": "unknown",
- "content": remaining_line
+ # Check for metadata
+ metadata_match = self._match_metadata(line)
+ if metadata_match:
+ if current_metadata:
+ parsed_data.append({
+ **current_metadata,
+ "content": current_content
})
- remaining_line = ""
+ current_content = []
+ current_metadata = metadata_match
+ continue
+
+ # Parse content
+ if current_metadata:
+ parsed_segments = self._parse_line_content(line)
+ current_content.extend(parsed_segments)
- # Process the last line
+ # Add the last entry
if current_metadata:
parsed_data.append({
**current_metadata,
"content": current_content
})
- return parsed_data \ No newline at end of file
+ return parsed_data
+
+ def _match_metadata(self, line: str) -> Optional[Dict]:
+ """Match metadata line."""
+ metadata_rule = self.rules.get("metadata")
+ if not metadata_rule:
+ return None
+
+ for pattern in metadata_rule.get("patterns", []):
+ match = re.match(pattern, line)
+ if match:
+ metadata = {"type": "metadata"}
+ for i, key in enumerate(metadata_rule.get("groups", [])):
+ if i + 1 <= len(match.groups()):
+ metadata[key] = match.group(i + 1).strip()
+ return metadata
+ return None
+
+ def _parse_line_content(self, line: str) -> List[Dict]:
+ """Parse a single line of content recursively."""
+ if not line:
+ return []
+
+ # Sort rules by priority (highest first)
+ content_rules = sorted(
+ self.rules.get("content", []),
+ key=lambda x: x.get("priority", 0),
+ reverse=True
+ )
+
+ for rule in content_rules:
+ for pattern in rule["patterns"]:
+ match = re.search(pattern, line)
+ if match:
+ # Handle different match types
+ if rule["match_type"] == "enclosed":
+ return self._handle_enclosed_match(line, match, rule)
+ elif rule["match_type"] == "prefix":
+ return self._handle_prefix_match(line, match, rule)
+ elif rule["match_type"] == "suffix":
+ return self._handle_suffix_match(line, match, rule)
+
+ # If no matches found, return as unknown
+ return [{"type": "unknown", "content": line}]
+
+ def _handle_enclosed_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
+ """Handle enclosed matches (highest priority)."""
+ before = line[:match.start()].strip()
+ matched = match.group(0).strip()
+ after = line[match.end():].strip()
+
+ result = []
+ if before:
+ result.extend(self._parse_line_content(before))
+
+ entry = {"type": rule["type"], "content": matched}
+ for i, group in enumerate(rule.get("groups", [])):
+ if i + 1 <= len(match.groups()):
+ entry[group] = match.group(i + 1).strip() if match.group(i + 1) else ""
+ result.append(entry)
+
+ if after:
+ result.extend(self._parse_line_content(after))
+
+ return result
+
+ def _handle_prefix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
+ """Handle prefix matches."""
+ matched = line[match.start():].strip()
+ before = line[:match.start()].strip()
+
+ result = []
+ if before:
+ result.extend(self._parse_line_content(before))
+
+ entry = {"type": rule["type"], "content": matched}
+ for i, group in enumerate(rule.get("groups", [])):
+ if i + 1 <= len(match.groups()):
+ entry[group] = match.group(i + 1).strip() if match.group(i + 1) else ""
+ result.append(entry)
+
+ return result
+
+ def _handle_suffix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
+ """Handle suffix matches."""
+ matched = line[:match.end()].strip()
+ after = line[match.end():].strip()
+
+ entry = {"type": rule["type"], "content": matched}
+ for i, group in enumerate(rule.get("groups", [])):
+ if i + 1 <= len(match.groups()):
+ entry[group] = match.group(i + 1).strip() if match.group(i + 1) else ""
+
+ result = [entry]
+ if after:
+ result.extend(self._parse_line_content(after))
+
+ return result \ No newline at end of file