"""Parse automation log files into structured events for the dashboard.""" import os import re from datetime import datetime, date from pathlib import Path # Patterns: (compiled_regex, event_type, optional extractor returning extra fields) # Extractor receives the match object and returns a dict of extra fields. # Order matters — first match wins. PATTERNS = [ # --- Email classification --- (re.compile(r"\[(\d+)/(\d+)\] Classifying: (.+?) \(from:"), "email_classifying", lambda m: {"progress": f"{m.group(1)}/{m.group(2)}", "subject": m.group(3)}), (re.compile(r"Cached: (.+?) -> (\w+)"), "email_cached", lambda m: {"subject": m.group(1), "category": m.group(2)}), (re.compile(r"→ (receipts|newsletters|work|personal|accounts)(?:\s*\((.+?)\))?"), "email_classified", lambda m: {"category": m.group(1), "label": m.group(2) or ""}), # --- Receipt extraction --- (re.compile(r"Would write:.*'vendor': '([^']+)'.*'amount': '([^']+)'"), "receipt_extracted", lambda m: {"vendor": m.group(1), "amount": m.group(2)}), (re.compile(r"Appended to CSV:.*vendor=([^,]+).*amount=([^,]+)"), "receipt_extracted", lambda m: {"vendor": m.group(1).strip(), "amount": m.group(2).strip()}), # --- Cron / automation completions --- (re.compile(r"Done! Stats: \{"), "cron_complete", lambda m: {}), # --- Container health / stack restarts --- (re.compile(r"Container (\S+) on (\S+) restarted"), "container_restarted", lambda m: {"container": m.group(1), "endpoint": m.group(2)}), (re.compile(r"LLM says (SAFE|UNSAFE) for (\S+)"), "restart_analysis", lambda m: {"decision": m.group(1), "container": m.group(2)}), (re.compile(r"[Uu]nhealthy.*?(\S+)\s+on\s+(\S+)"), "container_unhealthy", lambda m: {"container": m.group(1), "endpoint": m.group(2)}), (re.compile(r"[Uu]nhealthy"), "container_unhealthy", lambda m: {}), (re.compile(r"Stack-restart check complete"), "stack_healthy", lambda m: {}), # --- Backups --- (re.compile(r"Backup Validation: (OK|FAIL)"), "backup_result", lambda m: {"status": m.group(1)}), (re.compile(r"Backup Report"), "backup_result", lambda m: {"status": "report"}), # --- Config drift --- (re.compile(r"Detected (\d+) drifts? across (\d+) services?"), "drift_found", lambda m: {"drifts": m.group(1), "services": m.group(2)}), (re.compile(r"No drifts found"), "drift_clean", lambda m: {}), # --- Disk predictor --- (re.compile(r"WARNING.*volume.* (\d+) days"), "disk_warning", lambda m: {"days": m.group(1)}), (re.compile(r"Total filesystems: (\d+)"), "disk_scan_complete", lambda m: {"count": m.group(1)}), # --- Changelog / PR review --- (re.compile(r"Generated changelog with (\d+) commits"), "changelog_generated", lambda m: {"commits": m.group(1)}), (re.compile(r"(\d+) new commits since"), "changelog_commits", lambda m: {"count": m.group(1)}), (re.compile(r"Posted review comment on PR #(\d+)"), "pr_reviewed", lambda m: {"pr": m.group(1)}), # --- Catch-all patterns (lower priority) --- (re.compile(r"ERROR|CRITICAL"), "error", lambda m: {}), (re.compile(r"Starting .+ check|Starting .+ organizer"), "start", lambda m: {}), (re.compile(r"emails? downloaded|backup: \d+ total"), "backup_progress", lambda m: {}), ] # Timestamp pattern at the start of log lines TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}[\sT_]\d{2}:\d{2}:\d{2})") def parse_timestamp(line: str) -> datetime | None: """Extract timestamp from a log line.""" m = TS_PATTERN.match(line) if m: ts_str = m.group(1).replace("_", " ").replace("T", " ") try: return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") except ValueError: pass return None def classify_line(line: str) -> tuple[str, dict] | None: """Return (event_type, extra_fields) if line matches a known pattern, else None.""" for pattern, event_type, extractor in PATTERNS: m = pattern.search(line) if m: try: extra = extractor(m) except Exception: extra = {} return event_type, extra return None def get_recent_events(log_dir: str | Path, max_events: int = 50) -> list[dict]: """Parse today's events from all log files in log_dir.""" log_dir = Path(log_dir) today = date.today().isoformat() events = [] for log_file in log_dir.glob("*.log"): source = log_file.stem try: with open(log_file, "r", errors="replace") as f: for line in f: line = line.strip() if not line or today not in line: continue ts = parse_timestamp(line) if ts is None or ts.date().isoformat() != today: continue result = classify_line(line) if result: event_type, extra = result raw_msg = line[len(ts.isoformat().split("T")[0]) + 1:].strip().lstrip(",").strip() event = { "time": ts.strftime("%H:%M:%S"), "timestamp": ts.isoformat(), "type": event_type, "source": source, "raw": raw_msg, **extra, } events.append(event) except (OSError, PermissionError): continue events.sort(key=lambda e: e["timestamp"], reverse=True) return events[:max_events] def tail_logs(log_dir: str | Path) -> dict[str, int]: """Return current file positions (sizes) for SSE polling.""" log_dir = Path(log_dir) positions = {} for log_file in log_dir.glob("*.log"): try: positions[str(log_file)] = log_file.stat().st_size except OSError: positions[str(log_file)] = 0 return positions def get_new_lines(log_dir: str | Path, positions: dict[str, int]) -> tuple[list[dict], dict[str, int]]: """Read new lines since last positions. Returns (new_events, updated_positions).""" log_dir = Path(log_dir) today = date.today().isoformat() new_events = [] new_positions = dict(positions) for log_file in log_dir.glob("*.log"): path_str = str(log_file) old_pos = positions.get(path_str, 0) try: current_size = log_file.stat().st_size except OSError: continue if current_size <= old_pos: new_positions[path_str] = current_size continue source = log_file.stem try: with open(log_file, "r", errors="replace") as f: f.seek(old_pos) for line in f: line = line.strip() if not line or today not in line: continue ts = parse_timestamp(line) if ts is None: continue result = classify_line(line) if result: event_type, extra = result raw_msg = line[len(ts.isoformat().split("T")[0]) + 1:].strip().lstrip(",").strip() new_events.append({ "time": ts.strftime("%H:%M:%S"), "timestamp": ts.isoformat(), "type": event_type, "source": source, "raw": raw_msg, **extra, }) new_positions[path_str] = current_size except (OSError, PermissionError): continue new_events.sort(key=lambda e: e["timestamp"], reverse=True) return new_events, new_positions