Files
homelab-optimized/dashboard/api/routers/overview.py
Gitea Mirror Bot 1f453f302a
Some checks failed
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Documentation / Build Docusaurus (push) Has been cancelled
Sanitized mirror from private repository - 2026-04-05 10:10:45 UTC
2026-04-05 10:10:45 +00:00

427 lines
16 KiB
Python

"""Overview stats and SSE activity stream."""
import asyncio
import json
import subprocess
import sqlite3
from datetime import date
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
import httpx
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from lib_bridge import (
portainer_list_containers, ENDPOINTS, ollama_available,
GMAIL_DB, DVISH_DB, PROTON_DB, RESTART_DB, LOG_DIR, OLLAMA_URL,
)
from log_parser import get_recent_events, tail_logs, get_new_lines
router = APIRouter(tags=["overview"])
def _count_today_emails(db_path: Path) -> int:
"""Count emails processed today from a processed.db file."""
if not db_path.exists():
return 0
try:
today = date.today().isoformat()
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
cur = conn.execute(
"SELECT COUNT(*) FROM processed WHERE processed_at LIKE ?",
(f"{today}%",),
)
count = cur.fetchone()[0]
conn.close()
return count
except Exception:
return 0
def _count_unhealthy(db_path: Path) -> int:
"""Count unhealthy containers from stack-restart.db."""
if not db_path.exists():
return 0
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
cur = conn.execute("SELECT COUNT(*) FROM unhealthy_tracking")
count = cur.fetchone()[0]
conn.close()
return count
except Exception:
return 0
def _gpu_info() -> dict:
    """Query the olares host's NVIDIA GPU over SSH and report its vitals.

    Returns a dict with ``available: True`` plus temperature, power, memory
    and utilization fields (floats, or None for unparseable values), or
    ``{"available": False}`` when SSH/nvidia-smi fails in any way.
    """
    query = (
        "nvidia-smi --query-gpu=temperature.gpu,power.draw,power.limit,"
        "memory.used,memory.total,utilization.gpu --format=csv,noheader,nounits"
    )
    try:
        proc = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=3", "olares", query],
            capture_output=True, text=True, timeout=10,
        )
        if proc.returncode == 0:
            fields = [field.strip() for field in proc.stdout.strip().split(",")]

            def _to_float(raw):
                # nvidia-smi may emit "[N/A]" etc.; map those to None.
                try:
                    return float(raw)
                except (ValueError, TypeError):
                    return None

            if len(fields) >= 6:
                keys = ("temp_c", "power_draw_w", "power_limit_w",
                        "memory_used_mb", "memory_total_mb", "utilization_pct")
                info = {"available": True}
                info.update(zip(keys, (_to_float(f) for f in fields[:6])))
                return info
    except Exception:
        # SSH missing/unreachable, timeout, parse failure — all mean "no GPU".
        pass
    return {"available": False}
@router.get("/stats/overview")
def stats_overview():
    """Aggregate overview stats: containers, GPU, emails, health, Ollama."""
    # Per-endpoint container tallies from Portainer; an endpoint that fails
    # is reported as zeroed-out with an error flag instead of failing the
    # whole response.
    by_endpoint = {}
    grand_total = 0
    for endpoint in ENDPOINTS:
        try:
            items = portainer_list_containers(endpoint)
            running_count = sum(1 for item in items if item.get("State") == "running")
            by_endpoint[endpoint] = {"total": len(items), "running": running_count}
            grand_total += len(items)
        except Exception:
            by_endpoint[endpoint] = {"total": 0, "running": 0, "error": True}
    # Emails processed today, per account plus a combined total.
    emails = {
        "gmail": _count_today_emails(GMAIL_DB),
        "dvish": _count_today_emails(DVISH_DB),
        "proton": _count_today_emails(PROTON_DB),
    }
    emails["total"] = sum(emails.values())
    return {
        "containers": {"total": grand_total, "by_endpoint": by_endpoint},
        "gpu": _gpu_info(),
        "email_today": emails,
        "unhealthy_count": _count_unhealthy(RESTART_DB),
        "ollama_available": ollama_available(OLLAMA_URL),
    }
@router.get("/activity")
async def activity_stream():
    """SSE stream of today's automation events."""
    async def _stream():
        # First message carries everything already logged today.
        yield {"event": "init", "data": json.dumps(get_recent_events(LOG_DIR))}
        # Remember current file offsets, then poll for fresh lines forever.
        offsets = tail_logs(LOG_DIR)
        while True:
            await asyncio.sleep(5)
            fresh, offsets = get_new_lines(LOG_DIR, offsets)
            if fresh:
                yield {"event": "update", "data": json.dumps(fresh)}
    return EventSourceResponse(_stream())
@router.post("/actions/pause-organizers")
def pause_organizers():
    """Pause all email organizer cron jobs."""
    # Delegate to the control script; a non-zero exit means the stop failed.
    ctl = "/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh"
    proc = subprocess.run([ctl, "stop"], capture_output=True, text=True, timeout=10)
    ok = proc.returncode == 0
    return {"success": ok, "output": proc.stdout.strip()}
@router.post("/actions/resume-organizers")
def resume_organizers():
    """Resume all email organizer cron jobs."""
    # Delegate to the control script; a non-zero exit means the start failed.
    ctl = "/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh"
    proc = subprocess.run([ctl, "start"], capture_output=True, text=True, timeout=10)
    ok = proc.returncode == 0
    return {"success": ok, "output": proc.stdout.strip()}
@router.get("/actions/organizer-status")
def organizer_status():
    """Check if organizers are running or paused.

    Returns:
        dict with the control script's stdout under "output", plus a
        "success" flag mirroring the pause/resume endpoints so callers can
        distinguish a failed status probe from genuinely empty output.
        Existing "output" key is unchanged (backward-compatible).
    """
    result = subprocess.run(
        ["/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh", "status"],
        capture_output=True, text=True, timeout=10,
    )
    return {"success": result.returncode == 0, "output": result.stdout.strip()}
@router.get("/calendar")
def get_calendar_events():
    """Fetch upcoming events from Baikal CalDAV.

    Issues a CalDAV REPORT for VEVENTs starting today (UTC) or later, then
    scrapes SUMMARY/DTSTART/LOCATION lines out of the raw iCal text with
    regexes and returns up to the next 8 future events sorted by start.

    Returns:
        {"events": [...], "total": n} on success;
        {"events": [], "error": str} on any failure.
    """
    import re
    from datetime import datetime, timezone
    BAIKAL_URL = "http://192.168.0.200:12852/dav.php/calendars/vish/default/"
    BAIKAL_USER = "vish"
    # NOTE(review): value is sanitized in this mirror; the live secret should
    # come from config/env rather than being hard-coded in the router.
    BAIKAL_PASS = "REDACTED_PASSWORD"
    # Midnight UTC today, in the basic iCal timestamp format CalDAV expects.
    today = datetime.now(timezone.utc).strftime("%Y%m%dT000000Z")
    body = f'''<?xml version="1.0" encoding="UTF-8"?>
<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop><d:getetag/><c:calendar-data/></d:prop>
<c:filter>
<c:comp-filter name="VCALENDAR">
<c:comp-filter name="VEVENT">
<c:time-range start="{today}"/>
</c:comp-filter>
</c:comp-filter>
</c:filter>
</c:calendar-query>'''
    try:
        # Baikal uses HTTP Digest auth; REPORT with Depth:1 queries the
        # whole collection.
        auth = httpx.DigestAuth(BAIKAL_USER, BAIKAL_PASS)
        with httpx.Client(timeout=10) as client:
            r = client.request("REPORT", BAIKAL_URL, content=body,
                headers={"Content-Type": "application/xml", "Depth": "1"}, auth=auth)
            r.raise_for_status()
        # Parse iCal events
        # NOTE(review): the three regex result lists are matched up by
        # position; an event without a LOCATION line shifts every later
        # location onto the wrong event — confirm acceptable for this
        # calendar, or switch to per-VEVENT parsing.
        summaries = re.findall(r'SUMMARY:(.*?)(?:\r?\n)', r.text)
        starts = re.findall(r'DTSTART[^:]*:(.*?)(?:\r?\n)', r.text)
        locations = re.findall(r'LOCATION:(.*?)(?:\r?\n)', r.text)
        events = []
        now = datetime.now(timezone.utc)
        for i, (start, summary) in enumerate(zip(starts, summaries)):
            # Parse date — handle both date and datetime formats
            try:
                if len(start) == 8:
                    # All-day event: bare YYYYMMDD value.
                    dt = datetime.strptime(start, "%Y%m%d").replace(tzinfo=timezone.utc)
                else:
                    # Timed event: drop Z and anything past seconds; the value
                    # is treated as UTC regardless of original zone.
                    clean = start.replace("Z", "")
                    dt = datetime.strptime(clean[:15], "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc)
            except ValueError:
                continue
            # Only future events
            if dt < now:
                continue
            # Clean up summary (unescape iCal)
            clean_summary = summary.replace("\\,", ",").replace("\\;", ";").replace("&amp;", "&")
            events.append({
                "summary": clean_summary,
                "start": dt.isoformat(),
                "date": dt.strftime("%b %d"),
                # All-day events (8-char DTSTART) get a label instead of a time.
                "time": dt.strftime("%I:%M %p") if len(start) > 8 else "All day",
                "location": locations[i].replace("\\,", ",").replace("\\n", ", ") if i < len(locations) else None,
            })
        # Sort by date, limit to next 8
        events.sort(key=lambda e: e["start"])
        return {"events": events[:8], "total": len(events)}
    except Exception as e:
        # Any network/auth/parse failure degrades to an empty event list.
        return {"events": [], "error": str(e)}
def _search_repo_docs(query: str, max_chars: int = 2000) -> str:
"""Search repo docs/scripts for relevant snippets. Lightweight keyword match."""
import re
repo = Path("/app/scripts").parent if Path("/app/scripts").exists() else Path(__file__).parent.parent.parent.parent
search_dirs = [repo / "docs" / "services" / "individual", repo / "scripts", repo / "docs"]
keywords = [w.lower() for w in re.findall(r'\w{3,}', query) if w.lower() not in {
"the", "how", "what", "does", "can", "are", "this", "that", "have",
"many", "much", "about", "from", "with", "your", "there", "which",
}]
if not keywords:
return ""
# Add aliases so related terms find each other
aliases = {"tailscale": "headscale", "headscale": "tailscale", "gpu": "nvidia",
"jellyfin": "olares", "containers": "portainer", "dns": "adguard"}
extra = [aliases[k] for k in keywords if k in aliases]
keywords = list(set(keywords + extra))
scored = []
for search_dir in search_dirs:
if not search_dir.exists():
continue
for f in search_dir.rglob("*.md"):
try:
text = f.read_text(errors="ignore")[:8000]
score = sum(text.lower().count(kw) for kw in keywords)
if score > 0:
scored.append((score, f, text))
except Exception:
continue
for f in search_dir.rglob("*.py"):
if f.name.startswith("__"):
continue
try:
# Only read the docstring/header, not full scripts
text = f.read_text(errors="ignore")[:1000]
score = sum(text.lower().count(kw) for kw in keywords)
if score > 0:
scored.append((score, f, text))
except Exception:
continue
if not scored:
return ""
scored.sort(key=lambda x: -x[0])
snippets = []
total = 0
for _, path, text in scored[:2]: # max 2 files
# Trim to relevant section — find paragraphs with keywords
lines = text.split("\n")
relevant = []
for i, line in enumerate(lines):
if any(kw in line.lower() for kw in keywords):
start = max(0, i - 2)
end = min(len(lines), i + 5)
relevant.extend(lines[start:end])
snippet = "\n".join(dict.fromkeys(relevant))[:1000] # dedup, cap at 1K
if not snippet.strip():
snippet = text[:500]
snippets.append(f"[{path.name}]\n{snippet}")
total += len(snippet)
if total >= max_chars:
break
return "\n\n".join(snippets)
@router.post("/chat")
def chat_with_ollama(body: dict):
    """Chat with Ollama using live homelab context + repo docs.

    Expects ``body["message"]``. Builds a system prompt from live overview
    stats, keyword-triggered Headscale/Jellyfin/AdGuard lookups, and
    keyword-matched repo documentation, then forwards the combined prompt
    to Ollama.

    Returns:
        {"response": str} on success, or {"error": str} on failure.
    """
    message = body.get("message", "")
    if not message:
        return {"error": "No message provided"}
    # Gather live context from multiple sources
    context_parts = []
    try:
        # NOTE(review): get_overview is not defined anywhere in the visible
        # part of this module — the stats endpoint above is stats_overview().
        # Unless get_overview exists elsewhere in this file, this call always
        # raises NameError and the chat silently falls back to
        # "(could not fetch live stats)"; confirm and fix the name if so.
        overview = get_overview()
        containers = overview.get("containers", {})
        gpu = overview.get("gpu", {})
        context_parts.append(
            f"Containers: {containers.get('total', '?')} total across endpoints: "
            + ", ".join(f"{k} ({v.get('total','?')} containers, {v.get('running','?')} running)"
                        for k, v in containers.get("by_endpoint", {}).items())
        )
        if gpu.get("available"):
            # NOTE(review): _gpu_info() emits no "name" key, so the default
            # 'RTX 5090' is always used here — confirm intended.
            context_parts.append(
                f"GPU: {gpu.get('name','RTX 5090')}, {gpu.get('temp_c','?')}°C, "
                f"{gpu.get('memory_used_mb','?')}/{gpu.get('memory_total_mb','?')} MB VRAM, "
                f"{gpu.get('utilization_pct','?')}% util"
            )
        email_data = overview.get("email_today", {})
        if isinstance(email_data, dict):
            context_parts.append(f"Emails today: {email_data.get('total', 0)} (dvish: {email_data.get('dvish', 0)}, proton: {email_data.get('proton', 0)})")
        context_parts.append(f"Ollama: {'online' if overview.get('ollama_available') else 'offline'}")
        context_parts.append(f"Unhealthy containers: {overview.get('unhealthy_count', 0)}")
    except Exception:
        context_parts.append("(could not fetch live stats)")
    # Fetch Headscale nodes if question mentions network/tailscale/headscale/nodes
    msg_lower = message.lower()
    if any(kw in msg_lower for kw in ["tailscale", "headscale", "node", "mesh", "vpn", "network"]):
        try:
            import json as _json
            # Live node list via SSH into the calypso host's headscale container.
            hs_result = subprocess.run(
                ["ssh", "-o", "ConnectTimeout=3", "calypso",
                 "/usr/local/bin/docker exec headscale headscale nodes list -o json"],
                capture_output=True, text=True, timeout=10,
            )
            if hs_result.returncode == 0:
                nodes = _json.loads(hs_result.stdout)
                online = [n for n in nodes if n.get("online")]
                node_names = ", ".join(n.get("givenName") or n.get("name", "?") for n in nodes)
                context_parts.append(f"Headscale/Tailscale: {len(nodes)} nodes ({len(online)} online): {node_names}")
            else:
                # Documented fallback when the live query cannot run.
                context_parts.append("Headscale: 26 nodes (could not fetch live list, but documented as 26)")
        except Exception:
            context_parts.append("Headscale: 26 nodes (documented, could not fetch live)")
    # Fetch Jellyfin status if question mentions media/jellyfin/streaming
    if any(kw in msg_lower for kw in ["jellyfin", "media", "stream", "movie", "tv", "playing"]):
        try:
            # Lazy import: sibling router, avoids a circular import at load time.
            from routers.media import jellyfin_status
            jf = jellyfin_status()
            libs = ", ".join(f"{l['name']} ({l['type']})" for l in jf.get("libraries", []))
            active = jf.get("active_sessions", [])
            playing = ", ".join(f"{s['title']} by {s['user']}" for s in active) if active else "nothing"
            context_parts.append(f"Jellyfin v{jf.get('version','?')}: libraries={libs}. Now playing: {playing}")
        except Exception:
            pass
    # Fetch AdGuard stats if question mentions dns/adguard/blocked
    if any(kw in msg_lower for kw in ["dns", "adguard", "blocked", "queries", "domain"]):
        try:
            from routers.network import adguard_stats
            ag = adguard_stats()
            context_parts.append(f"AdGuard DNS: {ag.get('total_queries', '?')} total queries, {ag.get('blocked', '?')} blocked, {ag.get('avg_time', '?')}s avg response")
        except Exception:
            pass
    # Static infrastructure knowledge plus the live bullet points above.
    system_context = (
        "You are a homelab assistant. You have direct access to the following live infrastructure data:\n\n"
        + "\n".join(f"- {p}" for p in context_parts)
        + "\n\n"
        "Homelab hosts: Atlantis (Synology NAS, media/arr stack), Calypso (Synology, AdGuard DNS, Headscale, Authentik SSO), "
        "Olares (K3s, RTX 5090, Jellyfin, Ollama), NUC (lightweight services), RPi5 (Uptime Kuma), "
        "homelab-vm (Prometheus, Grafana, dashboard), Guava (TrueNAS), Seattle (remote VM), matrix-ubuntu (NPM, CrowdSec).\n\n"
        "Services: Sonarr, Radarr, SABnzbd, Deluge, Prowlarr, Bazarr, Lidarr, Tdarr, Audiobookshelf, LazyLibrarian on Atlantis. "
        "Jellyfin + Ollama on Olares with GPU transcoding. 3 email auto-organizers (Gmail x2 + Proton). "
        "11 Ollama-powered automation scripts. Gitea CI with AI PR reviewer.\n\n"
        "IMPORTANT: Answer using the LIVE DATA above, not general knowledge. The container counts are REAL numbers from Portainer right now. "
        "When asked 'how many containers on atlantis' answer with the exact number from the live data (e.g. 59). Be concise."
    )
    # Search repo docs for relevant context (max 2K chars)
    doc_context = _search_repo_docs(message, max_chars=2000)
    if doc_context:
        system_context += f"\n\nRelevant documentation:\n{doc_context}"
    prompt = f"{system_context}\n\nUser: {message}\nAssistant:"
    try:
        from lib_bridge import ollama_available as _ollama_check
        # NOTE(review): called with no args here, while the overview endpoint
        # passes OLLAMA_URL — assumes the parameter has a default; confirm.
        if not _ollama_check():
            return {"response": "Ollama is currently offline. Try again later."}
        import sys as _sys
        # Make the repo's scripts/ package importable so lib.ollama resolves
        # both in the container (/app) and when run from the source tree.
        scripts_dir = str(Path("/app/scripts") if Path("/app/scripts").exists() else Path(__file__).parent.parent.parent / "scripts")
        if scripts_dir not in _sys.path:
            _sys.path.insert(0, scripts_dir)
        from lib.ollama import ollama_generate
        response = ollama_generate(prompt, num_predict=800, timeout=90)
        return {"response": response}
    except Exception as e:
        return {"error": str(e)}