Files
homelab-optimized/dashboard/api/log_parser.py
Gitea Mirror Bot b25f28559d
Some checks failed
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Documentation / Build Docusaurus (push) Has been cancelled
Sanitized mirror from private repository - 2026-04-05 05:32:08 UTC
2026-04-05 05:32:08 +00:00

136 lines
4.9 KiB
Python

"""Parse automation log files into structured events for the dashboard."""
import os
import re
from datetime import datetime, date
from pathlib import Path
# Table of (compiled regex, event type) pairs used to recognise interesting
# log lines.  Order is significant: classify_line() returns the event type of
# the first entry whose regex is found in the line.
PATTERNS = [
    (re.compile(expression), event_type)
    for expression, event_type in (
        (r"→ receipts|→ newsletters|→ work|→ personal|→ accounts", "email"),
        (r"Stack-restart check complete", "restart_check"),
        (r"Backup Validation: OK|Backup Report", "backup"),
        (r"Cached:\s*\d+", "cache"),
        (r"[Uu]nhealthy", "unhealthy"),
        (r"Restarting container|restart_container", "container_restart"),
        (r"ERROR|CRITICAL", "error"),
        (r"Starting .+ check|Starting .+ organizer", "start"),
        (r"drifts? found|No drifts found", "drift"),
        (r"emails? downloaded|backup: \d+ total", "backup_progress"),
    )
]
# Timestamp pattern at the start of log lines
TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2}[\sT_]\d{2}:\d{2}:\d{2})")
def parse_timestamp(line: str) -> datetime | None:
"""Extract timestamp from a log line."""
m = TS_PATTERN.match(line)
if m:
ts_str = m.group(1).replace("_", " ").replace("T", " ")
try:
return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
pass
return None
def classify_line(line: str) -> str | None:
    """Return the event type of the first PATTERNS entry found in *line*.

    Entries are tried in list order and the regex may match anywhere in the
    line.  None is returned when no entry matches.
    """
    matching_types = (kind for regex, kind in PATTERNS if regex.search(line))
    return next(matching_types, None)
def get_recent_events(log_dir: str | Path, max_events: int = 50) -> list[dict]:
    """Collect today's classified events from every ``*.log`` file in *log_dir*.

    Args:
        log_dir: Directory to scan (non-recursive).
        max_events: Upper bound on the number of events returned.

    Returns:
        Event dicts with the keys ``time``, ``timestamp``, ``type``,
        ``source`` (the log file's stem) and ``message``, sorted newest
        first and cut to *max_events*.  Files that cannot be opened or read
        are skipped.
    """
    today_text = date.today().isoformat()
    collected = []

    for path in Path(log_dir).glob("*.log"):
        origin = path.stem
        try:
            with open(path, "r", errors="replace") as handle:
                for raw_line in handle:
                    text = raw_line.strip()
                    # Cheap substring test first; it also rejects blank lines.
                    if today_text not in text:
                        continue
                    stamp = parse_timestamp(text)
                    if stamp is None:
                        continue
                    # The date may also appear inside the message of an older
                    # entry, so the parsed timestamp itself must be today.
                    if stamp.date().isoformat() != today_text:
                        continue
                    kind = classify_line(text)
                    if not kind:
                        continue
                    iso_stamp = stamp.isoformat()
                    date_part = iso_stamp.split("T")[0]
                    # NOTE(review): only the date and one separator character
                    # are cut off, so the message still begins with the
                    # time-of-day.  The lstrip(",") hints that removing the
                    # whole timestamp may have been intended -- confirm.
                    remainder = text[len(date_part) + 1:]
                    collected.append({
                        "time": stamp.strftime("%H:%M:%S"),
                        "timestamp": iso_stamp,
                        "type": kind,
                        "source": origin,
                        "message": remainder.strip().lstrip(",").strip(),
                    })
        except OSError:  # PermissionError is a subclass of OSError
            continue

    collected.sort(key=lambda event: event["timestamp"], reverse=True)
    return collected[:max_events]
def tail_logs(log_dir: str | Path) -> dict[str, int]:
"""Return current file positions (sizes) for SSE polling."""
log_dir = Path(log_dir)
positions = {}
for log_file in log_dir.glob("*.log"):
try:
positions[str(log_file)] = log_file.stat().st_size
except OSError:
positions[str(log_file)] = 0
return positions
def get_new_lines(log_dir: str | Path, positions: dict[str, int]) -> tuple[list[dict], dict[str, int]]:
    """Read log lines appended since *positions* and turn them into events.

    Args:
        log_dir: Directory whose ``*.log`` files are scanned (non-recursive).
        positions: Byte offset already consumed per file, keyed by
            ``str(path)`` (the shape ``tail_logs`` returns).  Files missing
            from the mapping are read from the beginning.

    Returns:
        ``(new_events, updated_positions)``.  ``new_events`` holds today's
        classified events, newest first, with the keys ``time``,
        ``timestamp``, ``type``, ``source`` and ``message``.
        ``updated_positions`` is a copy of *positions* in which each scanned
        file's offset has been advanced past exactly the bytes that were
        processed; *positions* itself is not modified.

    Files are read in binary mode so that offsets are true byte offsets, and
    the offset only ever advances over complete (newline-terminated) lines
    that were actually processed.  This keeps events and offsets in step even
    if the file grows while it is being read, a line is still being written,
    or a read error interrupts the scan.
    """
    log_dir = Path(log_dir)
    today = date.today().isoformat()
    new_events = []
    new_positions = dict(positions)

    for log_file in log_dir.glob("*.log"):
        path_str = str(log_file)
        old_pos = positions.get(path_str, 0)
        try:
            current_size = log_file.stat().st_size
        except OSError:
            continue

        if current_size < old_pos:
            # The file shrank, so it was truncated or replaced (e.g. by log
            # rotation).  Everything it contains now is unseen: start over
            # instead of skipping to the new end.
            old_pos = 0
        if current_size == old_pos:
            new_positions[path_str] = current_size
            continue

        source = log_file.stem
        consumed = old_pos
        try:
            with open(log_file, "rb") as f:
                f.seek(old_pos)
                for raw in f:
                    if not raw.endswith(b"\n"):
                        # Trailing line is still being written; leave it for
                        # the next poll so it is parsed as a whole.
                        break
                    consumed += len(raw)
                    line = raw.decode("utf-8", errors="replace").strip()
                    # Cheap substring test first; it also rejects blank lines.
                    if today not in line:
                        continue
                    ts = parse_timestamp(line)
                    # Same rule as get_recent_events: the parsed timestamp
                    # itself must be from today, not just some text in the line.
                    if ts is None or ts.date().isoformat() != today:
                        continue
                    event_type = classify_line(line)
                    if not event_type:
                        continue
                    # NOTE(review): only the date and one separator character
                    # are cut off (same as get_recent_events), so the message
                    # still begins with the time-of-day.  The lstrip(",")
                    # hints that removing the whole timestamp may have been
                    # intended -- confirm before changing either function.
                    date_len = len(ts.date().isoformat())
                    new_events.append({
                        "time": ts.strftime("%H:%M:%S"),
                        "timestamp": ts.isoformat(),
                        "type": event_type,
                        "source": source,
                        "message": line[date_len + 1:].strip().lstrip(",").strip(),
                    })
        except OSError:
            # Best effort: keep the events gathered so far.  The offset below
            # covers exactly those lines, so nothing is replayed or lost.
            pass
        new_positions[path_str] = consumed

    new_events.sort(key=lambda e: e["timestamp"], reverse=True)
    return new_events, new_positions