"""Parse automation log files into structured events for the dashboard.""" import os import re from datetime import datetime, date from pathlib import Path # Patterns to match interesting log lines PATTERNS = [ (re.compile(r"→ receipts|→ newsletters|→ work|→ personal|→ accounts"), "email"), (re.compile(r"Stack-restart check complete"), "restart_check"), (re.compile(r"Backup Validation: OK|Backup Report"), "backup"), (re.compile(r"Cached:\s*\d+"), "cache"), (re.compile(r"[Uu]nhealthy"), "unhealthy"), (re.compile(r"Restarting container|restart_container"), "container_restart"), (re.compile(r"ERROR|CRITICAL"), "error"), (re.compile(r"Starting .+ check|Starting .+ organizer"), "start"), (re.compile(r"drifts? found|No drifts found"), "drift"), (re.compile(r"emails? downloaded|backup: \d+ total"), "backup_progress"), ] # Timestamp pattern at the start of log lines TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}[\sT_]\d{2}:\d{2}:\d{2})") def parse_timestamp(line: str) -> datetime | None: """Extract timestamp from a log line.""" m = TS_PATTERN.match(line) if m: ts_str = m.group(1).replace("_", " ").replace("T", " ") try: return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") except ValueError: pass return None def classify_line(line: str) -> str | None: """Return event type if line matches a known pattern, else None.""" for pattern, event_type in PATTERNS: if pattern.search(line): return event_type return None def get_recent_events(log_dir: str | Path, max_events: int = 50) -> list[dict]: """Parse today's events from all log files in log_dir.""" log_dir = Path(log_dir) today = date.today().isoformat() events = [] for log_file in log_dir.glob("*.log"): source = log_file.stem try: with open(log_file, "r", errors="replace") as f: for line in f: line = line.strip() if not line or today not in line: continue ts = parse_timestamp(line) if ts is None or ts.date().isoformat() != today: continue event_type = classify_line(line) if event_type: events.append({ "time": ts.strftime("%H:%M:%S"), "timestamp": ts.isoformat(), "type": event_type, "source": source, "message": line[len(ts.isoformat().split("T")[0]) + 1:].strip().lstrip(",").strip(), }) except (OSError, PermissionError): continue events.sort(key=lambda e: e["timestamp"], reverse=True) return events[:max_events] def tail_logs(log_dir: str | Path) -> dict[str, int]: """Return current file positions (sizes) for SSE polling.""" log_dir = Path(log_dir) positions = {} for log_file in log_dir.glob("*.log"): try: positions[str(log_file)] = log_file.stat().st_size except OSError: positions[str(log_file)] = 0 return positions def get_new_lines(log_dir: str | Path, positions: dict[str, int]) -> tuple[list[dict], dict[str, int]]: """Read new lines since last positions. Returns (new_events, updated_positions).""" log_dir = Path(log_dir) today = date.today().isoformat() new_events = [] new_positions = dict(positions) for log_file in log_dir.glob("*.log"): path_str = str(log_file) old_pos = positions.get(path_str, 0) try: current_size = log_file.stat().st_size except OSError: continue if current_size <= old_pos: new_positions[path_str] = current_size continue source = log_file.stem try: with open(log_file, "r", errors="replace") as f: f.seek(old_pos) for line in f: line = line.strip() if not line or today not in line: continue ts = parse_timestamp(line) if ts is None: continue event_type = classify_line(line) if event_type: new_events.append({ "time": ts.strftime("%H:%M:%S"), "timestamp": ts.isoformat(), "type": event_type, "source": source, "message": line[len(ts.isoformat().split("T")[0]) + 1:].strip().lstrip(",").strip(), }) new_positions[path_str] = current_size except (OSError, PermissionError): continue new_events.sort(key=lambda e: e["timestamp"], reverse=True) return new_events, new_positions