1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
import json5
import re
from pathlib import Path
from typing import List, Dict, Optional
class Parser:
    """Rule-driven TRPG log parser.

    Rules (loaded from a JSON5 file) describe two things:
      * ``metadata`` — patterns that recognize a line starting a new log
        entry (e.g. a timestamp/speaker header) and the names of its
        capture groups;
      * ``content`` — prioritized patterns that split a content line into
        typed segments (``enclosed``, ``prefix`` or ``suffix`` matches).
    """

    def __init__(self):
        # Rule-name -> rule definition(s), populated by load_rules().
        self.rules: Dict[str, List[Dict]] = {}

    def load_rules(self, rules_path: str) -> None:
        """Load parsing rules from a JSON5 file into ``self.rules``.

        Args:
            rules_path: Path to the rule file.

        Raises:
            FileNotFoundError: If ``rules_path`` does not exist.
            ValueError: If the file parses to ``None`` (empty rule file).
        """
        path = Path(rules_path)
        if not path.exists():
            raise FileNotFoundError(f"No such file or directory: {rules_path}")
        rules = json5.loads(path.read_text(encoding="utf-8"))
        if rules is None:
            raise ValueError("Rule file cannot be empty.")
        self.rules = rules

    def parse_log(self, log_path: str) -> List[Dict]:
        """Parse a TRPG log file using the loaded rules.

        Lines are grouped into entries: each metadata line starts a new
        entry, and subsequent content lines are parsed into segments and
        appended to that entry's ``"content"`` list. Content lines seen
        before the first metadata line are discarded.

        Args:
            log_path: Path to the log file.

        Returns:
            A list of dicts, each holding the metadata fields plus a
            ``"content"`` list of parsed segments.

        Raises:
            FileNotFoundError: If ``log_path`` does not exist.
        """
        if not Path(log_path).exists():
            raise FileNotFoundError(f"No such file or directory: {log_path}")
        with open(log_path, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()

        parsed_data: List[Dict] = []
        current_metadata: Optional[Dict] = None
        current_content: List[Dict] = []

        for line in lines:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            metadata_match = self._match_metadata(line)
            if metadata_match:
                # A new metadata line closes the previous entry (if any).
                if current_metadata:
                    parsed_data.append({
                        **current_metadata,
                        "content": current_content,
                    })
                current_content = []
                current_metadata = metadata_match
                continue
            # Content is only collected once an entry has been opened.
            if current_metadata:
                current_content.extend(self._parse_line_content(line))

        # Flush the final entry.
        if current_metadata:
            parsed_data.append({
                **current_metadata,
                "content": current_content,
            })
        return parsed_data

    def _match_metadata(self, line: str) -> Optional[Dict]:
        """Try to interpret *line* as a metadata (entry-header) line.

        Returns:
            A dict with ``"type": "metadata"`` plus one key per name in
            the rule's ``groups`` list, or ``None`` if no pattern matches.
        """
        metadata_rule = self.rules.get("metadata")
        # Tolerate the rule being wrapped in a one-element list.
        if isinstance(metadata_rule, list) and metadata_rule:
            metadata_rule = metadata_rule[0]
        if not metadata_rule:
            return None
        for pattern in metadata_rule.get("patterns", []):
            match = re.match(pattern, line)
            if match:
                metadata = {"type": "metadata"}
                self._extract_groups(metadata, match, metadata_rule.get("groups", []))
                return metadata
        return None

    @staticmethod
    def _extract_groups(entry: Dict, match: re.Match, groups: List[str]) -> None:
        """Copy *match*'s positional capture groups into *entry* by name.

        The i-th name in *groups* receives the stripped text of capture
        group i+1. Groups that did not participate in the match become
        ``""`` rather than raising AttributeError on ``None``.
        """
        captured = match.groups()
        for i, key in enumerate(groups):
            if i < len(captured):
                entry[key] = captured[i].strip() if captured[i] else ""

    def _parse_line_content(self, line: str) -> List[Dict]:
        """Split one content line into typed segments, recursively.

        Content rules are tried highest ``priority`` first; the first
        pattern that matches wins, and the text surrounding the match is
        re-parsed by the relevant handler.

        Returns:
            A list of segment dicts; a line no rule matches becomes a
            single ``{"type": "unknown", ...}`` segment.
        """
        if not line:
            return []
        # Sort rules by priority (highest first).
        content_rules = sorted(
            self.rules.get("content", []),
            key=lambda rule: rule.get("priority", 0),
            reverse=True,
        )
        handlers = {
            "enclosed": self._handle_enclosed_match,
            "prefix": self._handle_prefix_match,
            "suffix": self._handle_suffix_match,
        }
        for rule in content_rules:
            handler = handlers.get(rule.get("match_type"))
            if handler is None:
                continue  # unknown match_type: skip rule, as before
            for pattern in rule.get("patterns", []):
                match = re.search(pattern, line)
                if match:
                    return handler(line, match, rule)
        # No rule matched: keep the text, flagged as unrecognized.
        return [{"type": "unknown", "content": line}]

    def _handle_enclosed_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
        """Handle an ``enclosed`` match: text before and after the matched
        span is parsed recursively; the span itself becomes one segment."""
        before = line[:match.start()].strip()
        matched = match.group(0).strip()
        after = line[match.end():].strip()
        result: List[Dict] = []
        if before:
            result.extend(self._parse_line_content(before))
        entry = {"type": rule["type"], "content": matched}
        self._extract_groups(entry, match, rule.get("groups", []))
        result.append(entry)
        if after:
            result.extend(self._parse_line_content(after))
        return result

    def _handle_prefix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
        """Handle a ``prefix`` match: everything from the match start to
        end-of-line becomes one segment; text before it is re-parsed."""
        matched = line[match.start():].strip()
        before = line[:match.start()].strip()
        result: List[Dict] = []
        if before:
            result.extend(self._parse_line_content(before))
        entry = {"type": rule["type"], "content": matched}
        self._extract_groups(entry, match, rule.get("groups", []))
        result.append(entry)
        return result

    def _handle_suffix_match(self, line: str, match: re.Match, rule: Dict) -> List[Dict]:
        """Handle a ``suffix`` match: everything up to the match end
        becomes one segment; text after it is re-parsed."""
        matched = line[:match.end()].strip()
        after = line[match.end():].strip()
        entry = {"type": rule["type"], "content": matched}
        self._extract_groups(entry, match, rule.get("groups", []))
        result = [entry]
        if after:
            result.extend(self._parse_line_content(after))
        return result
|