Files
homelab-optimized/dashboard/api/routers/automations.py
Gitea Mirror Bot 2be8f1fe17
Some checks failed
Documentation / Build Docusaurus (push) Failing after 5m1s
Documentation / Deploy to GitHub Pages (push) Has been skipped
Sanitized mirror from private repository - 2026-04-05 08:31:50 UTC
2026-04-05 08:31:50 +00:00

147 lines
4.7 KiB
Python

"""Automation status: email organizers, stack restarts, backup, drift."""
import sqlite3
from datetime import date
from pathlib import Path
from fastapi import APIRouter
import sys
# Prepend the directory two levels above this file to sys.path before the
# next import runs; presumably that is where `lib_bridge` lives (TODO confirm).
# The ordering matters: this line must stay above the `lib_bridge` import.
sys.path.insert(0, str(Path(__file__).parent.parent))
from lib_bridge import GMAIL_DB, DVISH_DB, PROTON_DB, RESTART_DB, LOG_DIR
# Router collecting every endpoint defined in this module, tagged "automations".
router = APIRouter(tags=["automations"])
def _query_email_db(db_path: Path, name: str) -> dict:
"""Query a processed.db for today's category counts and sender_cache stats."""
if not db_path.exists():
return {"name": name, "exists": False}
today = date.today().isoformat()
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
# Today's category counts
cur = conn.execute(
"SELECT category, COUNT(*) as cnt FROM processed "
"WHERE processed_at LIKE ? GROUP BY category",
(f"{today}%",),
)
categories = {row["category"]: row["cnt"] for row in cur}
# Total processed today
cur = conn.execute(
"SELECT COUNT(*) FROM processed WHERE processed_at LIKE ?",
(f"{today}%",),
)
total_today = cur.fetchone()[0]
# Sender cache stats
cur = conn.execute("SELECT COUNT(*) FROM sender_cache")
cache_size = cur.fetchone()[0]
cur = conn.execute(
"SELECT category, COUNT(*) as cnt FROM sender_cache GROUP BY category"
)
cache_by_category = {row["category"]: row["cnt"] for row in cur}
conn.close()
return {
"name": name,
"exists": True,
"today_total": total_today,
"today_categories": categories,
"sender_cache_size": cache_size,
"sender_cache_categories": cache_by_category,
}
except Exception as e:
return {"name": name, "exists": True, "error": str(e)}
@router.get("/automations/email")
def email_status():
    """Report the email organizer summary for each of the three accounts.

    Returns:
        ``{"accounts": [...]}`` holding one ``_query_email_db`` result per
        account, in the order gmail, dvish, proton.
    """
    # (database path, label) pairs, queried in declaration order.
    sources = (
        (GMAIL_DB, "gmail"),
        (DVISH_DB, "dvish"),
        (PROTON_DB, "proton"),
    )
    summaries = [_query_email_db(path, label) for path, label in sources]
    return {"accounts": summaries}
@router.get("/automations/restarts")
def restart_status():
    """Return the most recent unhealthy-container tracking entries.

    Reads up to 50 rows from the ``unhealthy_tracking`` table of
    ``RESTART_DB``, newest ``last_checked`` first, opening the database
    read-only.

    Returns:
        ``{"entries": [...], "count": n}`` where each entry is one row as a
        plain dict. When the database file is missing the list is empty; when
        opening or querying fails the list is empty and an ``"error"`` key
        carries the message.
    """
    if not RESTART_DB.exists():
        return {"entries": [], "count": 0}

    conn = None
    try:
        # as_uri() percent-encodes "#", "?" and "%" so an unusual path cannot
        # be misread as URI syntax by SQLite.
        db_uri = f"{Path(RESTART_DB).absolute().as_uri()}?mode=ro"
        conn = sqlite3.connect(db_uri, uri=True)
        conn.row_factory = sqlite3.Row
        cur = conn.execute(
            "SELECT * FROM unhealthy_tracking ORDER BY last_checked DESC LIMIT 50"
        )
        entries = [dict(row) for row in cur]
        return {"entries": entries, "count": len(entries)}
    except Exception as e:
        # Deliberately broad: a status endpoint reports the failure in its
        # payload rather than raising.
        return {"entries": [], "count": 0, "error": str(e)}
    finally:
        # Close on the failure path as well; previously an exception after
        # connect() leaked the connection.
        if conn is not None:
            conn.close()
@router.get("/automations/backup")
def backup_status():
    """Summarise today's lines from the daily Gmail backup log.

    Scans ``LOG_DIR / "gmail-backup-daily.log"`` for lines containing today's
    ISO date and flags an error when any such line contains "ERROR"
    (case-insensitive).

    Returns:
        * ``{"status": "no_log", "entries": []}`` when the log does not exist.
        * ``{"status": "read_error", "entries": []}`` when it cannot be read.
        * Otherwise ``{"status", "entries", "has_errors"}`` where ``status``
          is ``"error"``, ``"ok"`` or ``"no_entries_today"`` and ``entries``
          holds at most the last 20 matching lines, stripped.
    """
    # Local import so this block adds no new module-level dependency.
    from collections import deque

    log_file = LOG_DIR / "gmail-backup-daily.log"
    today = date.today().isoformat()
    # Only the final 20 matches are ever returned, so keep a bounded tail
    # instead of collecting every line of the day.
    recent = deque(maxlen=20)
    has_error = False

    try:
        # Explicit encoding keeps decoding independent of the process locale;
        # errors="replace" tolerates stray bytes in the log.
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                if today not in line:
                    continue
                recent.append(line.strip())
                if "ERROR" in line.upper():
                    has_error = True
    except (FileNotFoundError, NotADirectoryError):
        # Opening directly (rather than exists() then open()) avoids the race
        # where the log is rotated away between the check and the read.
        return {"status": "no_log", "entries": []}
    except OSError:
        return {"status": "read_error", "entries": []}

    if has_error:
        status = "error"
    elif recent:
        status = "ok"
    else:
        status = "no_entries_today"

    return {
        "status": status,
        "entries": list(recent),  # Last 20 today entries
        "has_errors": has_error,
    }
@router.get("/automations/drift")
def drift_status():
    """Report the most recent result recorded in ``config-drift.log``.

    Walks the log from the last line backwards and classifies the first line
    that mentions drift.

    Returns:
        * ``{"status": "no_log", "last_result": None}`` when the log does not
          exist.
        * ``{"status": "read_error", "last_result": None}`` when it cannot be
          read.
        * ``{"status": "clean", "last_result": "No drifts found",
          "drifts": 0}`` when the newest drift-related line contains
          "No drifts found".
        * ``{"status": "drifted", "last_result": <line>, "drifts": n}`` for
          any other line mentioning "drift"; ``n`` is the number preceding
          the word "drift(s)", or ``-1`` when the line carries no count.
        * ``{"status": "unknown", "last_result": <last line or None>}`` when
          no line mentions drift.
    """
    # Imported once per call; the original re-ran the import inside the loop.
    import re

    log_file = LOG_DIR / "config-drift.log"
    try:
        # Explicit encoding keeps decoding independent of the process locale.
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
    except (FileNotFoundError, NotADirectoryError):
        # Opening directly avoids the exists()/open() race on log rotation.
        return {"status": "no_log", "last_result": None}
    except OSError:
        return {"status": "read_error", "last_result": None}

    # Case-insensitive to agree with the case-insensitive "drift" test below;
    # otherwise a line such as "3 Drifts detected" would report -1.
    count_pattern = re.compile(r"(\d+)\s+drifts?", re.IGNORECASE)

    # Find the last meaningful result.
    for raw in reversed(lines):
        line = raw.strip()
        if "No drifts found" in line:
            return {"status": "clean", "last_result": "No drifts found", "drifts": 0}
        if "drift" in line.lower():
            found = count_pattern.search(line)
            count = int(found.group(1)) if found else -1
            return {"status": "drifted", "last_result": line, "drifts": count}

    return {"status": "unknown", "last_result": lines[-1].strip() if lines else None}