Sanitized mirror from private repository - 2026-04-05 09:50:13 UTC
This commit is contained in:
194
dashboard/api/log_parser.py
Normal file
194
dashboard/api/log_parser.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""Parse automation log files into structured events for the dashboard."""
|
||||
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime, date
|
||||
from pathlib import Path
|
||||
|
||||
# Patterns: (compiled_regex, event_type, optional extractor returning extra fields)
# Extractor receives the match object and returns a dict of extra fields.
# Order matters — first match wins.
PATTERNS = [
    # --- Email classification ---
    # Per-email progress line, e.g. "[3/12] Classifying: Subject (from: ...".
    (re.compile(r"\[(\d+)/(\d+)\] Classifying: (.+?) \(from:"), "email_classifying",
     lambda m: {"progress": f"{m.group(1)}/{m.group(2)}", "subject": m.group(3)}),
    # Classification served from cache: "Cached: <subject> -> <category>".
    (re.compile(r"Cached: (.+?) -> (\w+)"), "email_cached",
     lambda m: {"subject": m.group(1), "category": m.group(2)}),
    # Fresh classification result; optional "(label)" suffix after the category.
    (re.compile(r"→ (receipts|newsletters|work|personal|accounts)(?:\s*\((.+?)\))?"), "email_classified",
     lambda m: {"category": m.group(1), "label": m.group(2) or ""}),

    # --- Receipt extraction ---
    # Dry-run form: a dict repr containing 'vendor' and 'amount' keys.
    (re.compile(r"Would write:.*'vendor': '([^']+)'.*'amount': '([^']+)'"), "receipt_extracted",
     lambda m: {"vendor": m.group(1), "amount": m.group(2)}),
    # Committed form: "Appended to CSV: ... vendor=..., amount=...".
    (re.compile(r"Appended to CSV:.*vendor=([^,]+).*amount=([^,]+)"), "receipt_extracted",
     lambda m: {"vendor": m.group(1).strip(), "amount": m.group(2).strip()}),

    # --- Cron / automation completions ---
    (re.compile(r"Done! Stats: \{"), "cron_complete", lambda m: {}),

    # --- Container health / stack restarts ---
    (re.compile(r"Container (\S+) on (\S+) restarted"), "container_restarted",
     lambda m: {"container": m.group(1), "endpoint": m.group(2)}),
    (re.compile(r"LLM says (SAFE|UNSAFE) for (\S+)"), "restart_analysis",
     lambda m: {"decision": m.group(1), "container": m.group(2)}),
    # Specific "unhealthy <container> on <endpoint>" form first; the bare
    # keyword entry below is the no-detail fallback (first match wins).
    (re.compile(r"[Uu]nhealthy.*?(\S+)\s+on\s+(\S+)"), "container_unhealthy",
     lambda m: {"container": m.group(1), "endpoint": m.group(2)}),
    (re.compile(r"[Uu]nhealthy"), "container_unhealthy", lambda m: {}),
    (re.compile(r"Stack-restart check complete"), "stack_healthy", lambda m: {}),

    # --- Backups ---
    (re.compile(r"Backup Validation: (OK|FAIL)"), "backup_result",
     lambda m: {"status": m.group(1)}),
    # Generic report header when no explicit OK/FAIL verdict is present.
    (re.compile(r"Backup Report"), "backup_result", lambda m: {"status": "report"}),

    # --- Config drift ---
    (re.compile(r"Detected (\d+) drifts? across (\d+) services?"), "drift_found",
     lambda m: {"drifts": m.group(1), "services": m.group(2)}),
    (re.compile(r"No drifts found"), "drift_clean", lambda m: {}),

    # --- Disk predictor ---
    (re.compile(r"WARNING.*volume.* (\d+) days"), "disk_warning",
     lambda m: {"days": m.group(1)}),
    (re.compile(r"Total filesystems: (\d+)"), "disk_scan_complete",
     lambda m: {"count": m.group(1)}),

    # --- Changelog / PR review ---
    (re.compile(r"Generated changelog with (\d+) commits"), "changelog_generated",
     lambda m: {"commits": m.group(1)}),
    (re.compile(r"(\d+) new commits since"), "changelog_commits",
     lambda m: {"count": m.group(1)}),
    (re.compile(r"Posted review comment on PR #(\d+)"), "pr_reviewed",
     lambda m: {"pr": m.group(1)}),

    # --- Catch-all patterns (lower priority) ---
    # These must stay last: they would shadow the specific patterns above.
    (re.compile(r"ERROR|CRITICAL"), "error", lambda m: {}),
    (re.compile(r"Starting .+ check|Starting .+ organizer"), "start", lambda m: {}),
    (re.compile(r"emails? downloaded|backup: \d+ total"), "backup_progress", lambda m: {}),
]
|
||||
|
||||
# Timestamp pattern at the start of log lines
|
||||
TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}[\sT_]\d{2}:\d{2}:\d{2})")
|
||||
|
||||
|
||||
def parse_timestamp(line: str) -> datetime | None:
|
||||
"""Extract timestamp from a log line."""
|
||||
m = TS_PATTERN.match(line)
|
||||
if m:
|
||||
ts_str = m.group(1).replace("_", " ").replace("T", " ")
|
||||
try:
|
||||
return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def classify_line(line: str) -> tuple[str, dict] | None:
    """Return ``(event_type, extra_fields)`` for a recognized line, else ``None``.

    Patterns are tried in PATTERNS order; the first regex that matches wins.
    """
    for regex, event_type, extract in PATTERNS:
        hit = regex.search(line)
        if hit is None:
            continue
        try:
            extra = extract(hit)
        except Exception:
            # A malformed line can break an extractor; degrade to no extras
            # rather than dropping the event.
            extra = {}
        return event_type, extra
    return None
|
||||
|
||||
|
||||
def get_recent_events(log_dir: str | Path, max_events: int = 50) -> list[dict]:
    """Parse today's events from all ``*.log`` files in *log_dir*.

    Scans every ``*.log`` file, keeps lines whose leading timestamp is from
    today, and classifies them via :func:`classify_line`.

    Args:
        log_dir: Directory containing the log files.
        max_events: Maximum number of events to return.

    Returns:
        At most *max_events* event dicts, newest first.  Each dict has
        ``time`` (HH:MM:SS), ``timestamp`` (ISO), ``type``, ``source``
        (log file stem), ``raw`` (line with the date prefix stripped),
        plus any pattern-specific extra fields.  Unreadable files are
        skipped silently.
    """
    log_dir = Path(log_dir)
    today = date.today().isoformat()
    events: list[dict] = []

    for log_file in log_dir.glob("*.log"):
        source = log_file.stem
        try:
            with open(log_file, "r", errors="replace") as f:
                for line in f:
                    line = line.strip()
                    # Cheap substring pre-filter before any regex work.
                    if not line or today not in line:
                        continue
                    ts = parse_timestamp(line)
                    # Today's date string may appear anywhere in the line;
                    # require the *timestamp itself* to be from today.
                    if ts is None or ts.date().isoformat() != today:
                        continue
                    result = classify_line(line)
                    if result:
                        event_type, extra = result
                        # ISO date prefix is always 10 chars; drop it plus the
                        # separator, then trim any stray comma left behind.
                        raw_msg = line[11:].strip().lstrip(",").strip()
                        events.append({
                            "time": ts.strftime("%H:%M:%S"),
                            "timestamp": ts.isoformat(),
                            "type": event_type,
                            "source": source,
                            "raw": raw_msg,
                            **extra,
                        })
        except OSError:
            # PermissionError is a subclass of OSError — one catch suffices.
            continue

    events.sort(key=lambda e: e["timestamp"], reverse=True)
    return events[:max_events]
|
||||
|
||||
|
||||
def tail_logs(log_dir: str | Path) -> dict[str, int]:
|
||||
"""Return current file positions (sizes) for SSE polling."""
|
||||
log_dir = Path(log_dir)
|
||||
positions = {}
|
||||
for log_file in log_dir.glob("*.log"):
|
||||
try:
|
||||
positions[str(log_file)] = log_file.stat().st_size
|
||||
except OSError:
|
||||
positions[str(log_file)] = 0
|
||||
return positions
|
||||
|
||||
|
||||
def get_new_lines(log_dir: str | Path, positions: dict[str, int]) -> tuple[list[dict], dict[str, int]]:
    """Read lines appended since *positions* and parse them into events.

    Args:
        log_dir: Directory containing the log files.
        positions: Mapping of file path -> byte offset already consumed,
            as produced by :func:`tail_logs` or a previous call.

    Returns:
        ``(new_events, updated_positions)``.  Events are newest-first dicts
        in the same shape as :func:`get_recent_events`.  Files that shrank
        (rotation/truncation) are not re-read; their stored offset is reset
        to the current size.  Unreadable files are skipped silently.
    """
    log_dir = Path(log_dir)
    today = date.today().isoformat()
    new_events: list[dict] = []
    new_positions = dict(positions)

    for log_file in log_dir.glob("*.log"):
        path_str = str(log_file)
        old_pos = positions.get(path_str, 0)
        try:
            current_size = log_file.stat().st_size
        except OSError:
            continue

        if current_size <= old_pos:
            # Nothing new; this also resets the offset after truncation.
            new_positions[path_str] = current_size
            continue

        source = log_file.stem
        try:
            with open(log_file, "r", errors="replace") as f:
                f.seek(old_pos)
                for line in f:
                    line = line.strip()
                    # Cheap substring pre-filter before any regex work.
                    if not line or today not in line:
                        continue
                    ts = parse_timestamp(line)
                    # Match get_recent_events: require the timestamp itself to
                    # be today's, not just today's date string somewhere.
                    if ts is None or ts.date().isoformat() != today:
                        continue
                    result = classify_line(line)
                    if result:
                        event_type, extra = result
                        # ISO date prefix is always 10 chars; drop it plus the
                        # separator, then trim any stray comma left behind.
                        raw_msg = line[11:].strip().lstrip(",").strip()
                        new_events.append({
                            "time": ts.strftime("%H:%M:%S"),
                            "timestamp": ts.isoformat(),
                            "type": event_type,
                            "source": source,
                            "raw": raw_msg,
                            **extra,
                        })
            # Only advance the offset once the whole chunk was read cleanly.
            new_positions[path_str] = current_size
        except OSError:
            # PermissionError is a subclass of OSError — one catch suffices.
            continue

    new_events.sort(key=lambda e: e["timestamp"], reverse=True)
    return new_events, new_positions
|
||||
Reference in New Issue
Block a user