"""Overview stats and SSE activity stream.""" import asyncio import json import subprocess import sqlite3 from datetime import date from fastapi import APIRouter from sse_starlette.sse import EventSourceResponse import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from lib_bridge import ( portainer_list_containers, ENDPOINTS, ollama_available, GMAIL_DB, DVISH_DB, PROTON_DB, RESTART_DB, LOG_DIR, OLLAMA_URL, ) from log_parser import get_recent_events, tail_logs, get_new_lines router = APIRouter(tags=["overview"]) def _count_today_emails(db_path: Path) -> int: """Count emails processed today from a processed.db file.""" if not db_path.exists(): return 0 try: today = date.today().isoformat() conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) cur = conn.execute( "SELECT COUNT(*) FROM processed WHERE processed_at LIKE ?", (f"{today}%",), ) count = cur.fetchone()[0] conn.close() return count except Exception: return 0 def _count_unhealthy(db_path: Path) -> int: """Count unhealthy containers from stack-restart.db.""" if not db_path.exists(): return 0 try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) cur = conn.execute("SELECT COUNT(*) FROM unhealthy_tracking") count = cur.fetchone()[0] conn.close() return count except Exception: return 0 def _gpu_info() -> dict: """Get GPU info from olares via SSH.""" try: result = subprocess.run( ["ssh", "-o", "ConnectTimeout=3", "olares", "nvidia-smi --query-gpu=temperature.gpu,power.draw,power.limit," "memory.used,memory.total,utilization.gpu --format=csv,noheader,nounits"], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: return {"available": False} parts = [p.strip() for p in result.stdout.strip().split(",")] def _f(v): try: return float(v) except (ValueError, TypeError): return None if len(parts) >= 6: return { "available": True, "temp_c": _f(parts[0]), "power_draw_w": _f(parts[1]), "power_limit_w": _f(parts[2]), "memory_used_mb": _f(parts[3]), "memory_total_mb": _f(parts[4]), "utilization_pct": _f(parts[5]), } except Exception: pass return {"available": False} @router.get("/stats/overview") def stats_overview(): """Aggregate overview stats.""" # Container counts container_counts = {} total = 0 for ep_name in ENDPOINTS: try: containers = portainer_list_containers(ep_name) running = sum(1 for c in containers if c.get("State") == "running") container_counts[ep_name] = {"total": len(containers), "running": running} total += len(containers) except Exception: container_counts[ep_name] = {"total": 0, "running": 0, "error": True} # GPU gpu = _gpu_info() # Email counts email_today = { "gmail": _count_today_emails(GMAIL_DB), "dvish": _count_today_emails(DVISH_DB), "proton": _count_today_emails(PROTON_DB), } email_today["total"] = sum(email_today.values()) # Unhealthy unhealthy = _count_unhealthy(RESTART_DB) # Ollama ollama_up = ollama_available(OLLAMA_URL) return { "containers": {"total": total, "by_endpoint": container_counts}, "gpu": gpu, "email_today": email_today, "unhealthy_count": unhealthy, "ollama_available": ollama_up, } @router.get("/activity") async def activity_stream(): """SSE stream of today's automation events.""" async def event_generator(): # Send initial batch events = get_recent_events(LOG_DIR) yield {"event": "init", "data": json.dumps(events)} # Poll for new events positions = tail_logs(LOG_DIR) while True: await asyncio.sleep(5) new_events, positions = get_new_lines(LOG_DIR, positions) if new_events: yield {"event": "update", "data": json.dumps(new_events)} return EventSourceResponse(event_generator())