Files
homelab-optimized/dashboard/api/log_parser.py
Gitea Mirror Bot 61e87cd8d4
Some checks failed
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Documentation / Build Docusaurus (push) Has been cancelled
Sanitized mirror from private repository - 2026-04-16 09:20:47 UTC
2026-04-16 09:20:47 +00:00

195 lines
7.8 KiB
Python

"""Parse automation log files into structured events for the dashboard."""
import os
import re
from datetime import datetime, date
from pathlib import Path
# Patterns: (compiled_regex, event_type, optional extractor returning extra fields)
# Extractor receives the match object and returns a dict of extra fields.
# Order matters — first match wins.
PATTERNS = [
# --- Email classification ---
(re.compile(r"\[(\d+)/(\d+)\] Classifying: (.+?) \(from:"), "email_classifying",
lambda m: {"progress": f"{m.group(1)}/{m.group(2)}", "subject": m.group(3)}),
(re.compile(r"Cached: (.+?) -> (\w+)"), "email_cached",
lambda m: {"subject": m.group(1), "category": m.group(2)}),
(re.compile(r"→ (receipts|newsletters|work|personal|accounts)(?:\s*\((.+?)\))?"), "email_classified",
lambda m: {"category": m.group(1), "label": m.group(2) or ""}),
# --- Receipt extraction ---
(re.compile(r"Would write:.*'vendor': '([^']+)'.*'amount': '([^']+)'"), "receipt_extracted",
lambda m: {"vendor": m.group(1), "amount": m.group(2)}),
(re.compile(r"Appended to CSV:.*vendor=([^,]+).*amount=([^,]+)"), "receipt_extracted",
lambda m: {"vendor": m.group(1).strip(), "amount": m.group(2).strip()}),
# --- Cron / automation completions ---
(re.compile(r"Done! Stats: \{"), "cron_complete", lambda m: {}),
# --- Container health / stack restarts ---
(re.compile(r"Container (\S+) on (\S+) restarted"), "container_restarted",
lambda m: {"container": m.group(1), "endpoint": m.group(2)}),
(re.compile(r"LLM says (SAFE|UNSAFE) for (\S+)"), "restart_analysis",
lambda m: {"decision": m.group(1), "container": m.group(2)}),
(re.compile(r"[Uu]nhealthy.*?(\S+)\s+on\s+(\S+)"), "container_unhealthy",
lambda m: {"container": m.group(1), "endpoint": m.group(2)}),
(re.compile(r"[Uu]nhealthy"), "container_unhealthy", lambda m: {}),
(re.compile(r"Stack-restart check complete"), "stack_healthy", lambda m: {}),
# --- Backups ---
(re.compile(r"Backup Validation: (OK|FAIL)"), "backup_result",
lambda m: {"status": m.group(1)}),
(re.compile(r"Backup Report"), "backup_result", lambda m: {"status": "report"}),
# --- Config drift ---
(re.compile(r"Detected (\d+) drifts? across (\d+) services?"), "drift_found",
lambda m: {"drifts": m.group(1), "services": m.group(2)}),
(re.compile(r"No drifts found"), "drift_clean", lambda m: {}),
# --- Disk predictor ---
(re.compile(r"WARNING.*volume.* (\d+) days"), "disk_warning",
lambda m: {"days": m.group(1)}),
(re.compile(r"Total filesystems: (\d+)"), "disk_scan_complete",
lambda m: {"count": m.group(1)}),
# --- Changelog / PR review ---
(re.compile(r"Generated changelog with (\d+) commits"), "changelog_generated",
lambda m: {"commits": m.group(1)}),
(re.compile(r"(\d+) new commits since"), "changelog_commits",
lambda m: {"count": m.group(1)}),
(re.compile(r"Posted review comment on PR #(\d+)"), "pr_reviewed",
lambda m: {"pr": m.group(1)}),
# --- Catch-all patterns (lower priority) ---
(re.compile(r"ERROR|CRITICAL"), "error", lambda m: {}),
(re.compile(r"Starting .+ check|Starting .+ organizer"), "start", lambda m: {}),
(re.compile(r"emails? downloaded|backup: \d+ total"), "backup_progress", lambda m: {}),
]
# Timestamp pattern at the start of log lines
TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}[\sT_]\d{2}:\d{2}:\d{2})")
def parse_timestamp(line: str) -> datetime | None:
"""Extract timestamp from a log line."""
m = TS_PATTERN.match(line)
if m:
ts_str = m.group(1).replace("_", " ").replace("T", " ")
try:
return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
pass
return None
def classify_line(line: str) -> tuple[str, dict] | None:
"""Return (event_type, extra_fields) if line matches a known pattern, else None."""
for pattern, event_type, extractor in PATTERNS:
m = pattern.search(line)
if m:
try:
extra = extractor(m)
except Exception:
extra = {}
return event_type, extra
return None
def get_recent_events(log_dir: str | Path, max_events: int = 50) -> list[dict]:
"""Parse today's events from all log files in log_dir."""
log_dir = Path(log_dir)
today = date.today().isoformat()
events = []
for log_file in log_dir.glob("*.log"):
source = log_file.stem
try:
with open(log_file, "r", errors="replace") as f:
for line in f:
line = line.strip()
if not line or today not in line:
continue
ts = parse_timestamp(line)
if ts is None or ts.date().isoformat() != today:
continue
result = classify_line(line)
if result:
event_type, extra = result
raw_msg = line[len(ts.isoformat().split("T")[0]) + 1:].strip().lstrip(",").strip()
event = {
"time": ts.strftime("%H:%M:%S"),
"timestamp": ts.isoformat(),
"type": event_type,
"source": source,
"raw": raw_msg,
**extra,
}
events.append(event)
except (OSError, PermissionError):
continue
events.sort(key=lambda e: e["timestamp"], reverse=True)
return events[:max_events]
def tail_logs(log_dir: str | Path) -> dict[str, int]:
"""Return current file positions (sizes) for SSE polling."""
log_dir = Path(log_dir)
positions = {}
for log_file in log_dir.glob("*.log"):
try:
positions[str(log_file)] = log_file.stat().st_size
except OSError:
positions[str(log_file)] = 0
return positions
def get_new_lines(log_dir: str | Path, positions: dict[str, int]) -> tuple[list[dict], dict[str, int]]:
"""Read new lines since last positions. Returns (new_events, updated_positions)."""
log_dir = Path(log_dir)
today = date.today().isoformat()
new_events = []
new_positions = dict(positions)
for log_file in log_dir.glob("*.log"):
path_str = str(log_file)
old_pos = positions.get(path_str, 0)
try:
current_size = log_file.stat().st_size
except OSError:
continue
if current_size <= old_pos:
new_positions[path_str] = current_size
continue
source = log_file.stem
try:
with open(log_file, "r", errors="replace") as f:
f.seek(old_pos)
for line in f:
line = line.strip()
if not line or today not in line:
continue
ts = parse_timestamp(line)
if ts is None:
continue
result = classify_line(line)
if result:
event_type, extra = result
raw_msg = line[len(ts.isoformat().split("T")[0]) + 1:].strip().lstrip(",").strip()
new_events.append({
"time": ts.strftime("%H:%M:%S"),
"timestamp": ts.isoformat(),
"type": event_type,
"source": source,
"raw": raw_msg,
**extra,
})
new_positions[path_str] = current_size
except (OSError, PermissionError):
continue
new_events.sort(key=lambda e: e["timestamp"], reverse=True)
return new_events, new_positions