Sanitized mirror from private repository - 2026-04-16 09:26:39 UTC
This commit is contained in:
202
scripts/backup-validator.py
Normal file
202
scripts/backup-validator.py
Normal file
@@ -0,0 +1,202 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Validate daily Gmail backup logs and send report via email.
|
||||
|
||||
Reads /tmp/gmail-backup-daily.log, checks for today's entries,
|
||||
validates no errors and positive backup counts. Optionally uses
|
||||
Ollama LLM to summarize the log.
|
||||
|
||||
Cron: 0 4 * * * /usr/bin/python3 /home/homelab/organized/repos/homelab/scripts/backup-validator.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
|
||||
from lib.notify import send_email, send_ntfy
|
||||
|
||||
# ── config ───────────────────────────────────────────────────────────────────
|
||||
|
||||
LOG_FILE = "/tmp/gmail-backup-daily.log"
|
||||
TIMEZONE = ZoneInfo("America/Los_Angeles")
|
||||
NTFY_TOPIC = "REDACTED_NTFY_TOPIC"
|
||||
LLM_TAIL_LINES = 50
|
||||
|
||||
log = logging.getLogger("backup-validator")
|
||||
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def read_log(path: str) -> list[str]:
|
||||
"""Read all lines from the backup log file."""
|
||||
p = Path(path)
|
||||
if not p.exists():
|
||||
raise FileNotFoundError(f"Log file not found: {path}")
|
||||
return p.read_text().splitlines()
|
||||
|
||||
|
||||
def filter_today_entries(lines: list[str], today_str: str) -> list[str]:
|
||||
"""Filter log lines that contain today's date string."""
|
||||
return [line for line in lines if today_str in line]
|
||||
|
||||
|
||||
def check_errors(lines: list[str]) -> list[str]:
|
||||
"""Return lines containing ERROR (case-insensitive)."""
|
||||
return [line for line in lines if re.search(r"\bERROR\b", line, re.IGNORECASE)]
|
||||
|
||||
|
||||
def extract_backup_counts(lines: list[str]) -> list[int]:
|
||||
"""Extract numeric counts from lines like 'X new emails backed up' or 'backed up X emails'."""
|
||||
counts = []
|
||||
for line in lines:
|
||||
m = re.search(r"(\d+)\s+(?:new\s+)?emails?\s+backed\s+up", line, re.IGNORECASE)
|
||||
if m:
|
||||
counts.append(int(m.group(1)))
|
||||
m = re.search(r"backed\s+up\s+(\d+)", line, re.IGNORECASE)
|
||||
if m:
|
||||
counts.append(int(m.group(1)))
|
||||
return counts
|
||||
|
||||
|
||||
def validate(log_path: str) -> dict:
|
||||
"""Run all validation checks. Returns a result dict."""
|
||||
result = {
|
||||
"ok": True,
|
||||
"issues": [],
|
||||
"today_lines": 0,
|
||||
"error_lines": [],
|
||||
"backup_counts": [],
|
||||
"tail_lines": [],
|
||||
}
|
||||
|
||||
try:
|
||||
all_lines = read_log(log_path)
|
||||
except FileNotFoundError as e:
|
||||
result["ok"] = False
|
||||
result["issues"].append(str(e))
|
||||
return result
|
||||
|
||||
today_str = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
|
||||
today_lines = filter_today_entries(all_lines, today_str)
|
||||
result["today_lines"] = len(today_lines)
|
||||
result["tail_lines"] = all_lines[-LLM_TAIL_LINES:]
|
||||
|
||||
if not today_lines:
|
||||
result["ok"] = False
|
||||
result["issues"].append(f"No entries found for today ({today_str})")
|
||||
return result
|
||||
|
||||
errors = check_errors(today_lines)
|
||||
if errors:
|
||||
result["ok"] = False
|
||||
result["error_lines"] = errors
|
||||
result["issues"].append(f"{len(errors)} ERROR line(s) found in today's entries")
|
||||
|
||||
counts = extract_backup_counts(today_lines)
|
||||
result["backup_counts"] = counts
|
||||
if not counts:
|
||||
result["ok"] = False
|
||||
result["issues"].append("No backup count lines found in today's entries")
|
||||
elif all(c == 0 for c in counts):
|
||||
result["ok"] = False
|
||||
result["issues"].append("All backup counts are 0")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def build_report(result: dict, llm_summary: str | None = None) -> str:
|
||||
"""Build a plain-text report from validation results."""
|
||||
lines = []
|
||||
status = "OK" if result["ok"] else "ISSUES FOUND"
|
||||
lines.append(f"Backup Validation: {status}")
|
||||
lines.append("=" * 50)
|
||||
lines.append(f"Today's log entries: {result['today_lines']}")
|
||||
|
||||
if result["backup_counts"]:
|
||||
lines.append(f"Backup counts: {result['backup_counts']}")
|
||||
|
||||
if result["issues"]:
|
||||
lines.append("")
|
||||
lines.append("Issues:")
|
||||
for issue in result["issues"]:
|
||||
lines.append(f" - {issue}")
|
||||
|
||||
if result["error_lines"]:
|
||||
lines.append("")
|
||||
lines.append("Error lines:")
|
||||
for err in result["error_lines"][:10]:
|
||||
lines.append(f" {err}")
|
||||
|
||||
if llm_summary:
|
||||
lines.append("")
|
||||
lines.append("LLM Analysis:")
|
||||
lines.append("-" * 40)
|
||||
lines.append(llm_summary)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Validate daily Gmail backup logs")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Print report without sending notifications")
|
||||
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format="%(asctime)s %(name)s %(levelname)s %(message)s",
|
||||
)
|
||||
|
||||
log.info("Starting backup validation for %s", LOG_FILE)
|
||||
result = validate(LOG_FILE)
|
||||
status_label = "OK" if result["ok"] else "ISSUES FOUND"
|
||||
subject = f"Backup Report: {status_label}"
|
||||
|
||||
# LLM summary
|
||||
llm_summary = None
|
||||
if result["tail_lines"]:
|
||||
if ollama_available():
|
||||
log.info("Ollama available, requesting log summary")
|
||||
tail_text = "\n".join(result["tail_lines"])
|
||||
prompt = (
|
||||
"Summarize this email backup log in 3-5 bullet points. "
|
||||
"Report ONLY: total emails backed up, any ERROR or FAIL lines, "
|
||||
"and whether the backup completed. Do NOT flag normal things "
|
||||
"(trash count, folder sizes, starred messages) as concerns — "
|
||||
"those are expected. If everything looks fine, just say 'Backup OK' "
|
||||
"with the email count.\n\n"
|
||||
f"```\n{tail_text}\n```"
|
||||
)
|
||||
try:
|
||||
llm_summary = ollama_generate(prompt)
|
||||
log.info("LLM summary received (%d chars)", len(llm_summary))
|
||||
except OllamaUnavailableError as e:
|
||||
log.warning("Ollama became unavailable: %s", e)
|
||||
else:
|
||||
log.info("Ollama not available, sending raw validation results")
|
||||
|
||||
report = build_report(result, llm_summary)
|
||||
log.info("Report:\n%s", report)
|
||||
|
||||
if args.dry_run:
|
||||
print(report)
|
||||
return
|
||||
|
||||
try:
|
||||
send_email(subject=subject, text_body=report)
|
||||
log.info("Email sent: %s", subject)
|
||||
except Exception as e:
|
||||
log.error("Failed to send email: %s", e)
|
||||
|
||||
# Issues already reported via email above
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user