Files
homelab-optimized/dashboard/api/routers/overview.py
Gitea Mirror Bot 37ee54f6e9
Some checks failed
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Documentation / Build Docusaurus (push) Has been cancelled
Sanitized mirror from private repository - 2026-04-19 09:37:42 UTC
2026-04-19 09:37:42 +00:00

767 lines
30 KiB
Python

"""Overview stats and SSE activity stream."""
import asyncio
import json
import os
import subprocess
import sqlite3
from datetime import date, datetime, timezone
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
import httpx
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from lib_bridge import (
portainer_list_containers, ENDPOINTS, ollama_available,
GMAIL_DB, DVISH_DB, PROTON_DB, RESTART_DB, LOG_DIR, OLLAMA_URL,
prom_query,
)
from log_parser import get_recent_events, tail_logs, get_new_lines
router = APIRouter(tags=["overview"])
def _count_today_emails(db_path: Path) -> int:
"""Count emails processed today from a processed.db file."""
if not db_path.exists():
return 0
try:
today = date.today().isoformat()
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
cur = conn.execute(
"SELECT COUNT(*) FROM processed WHERE processed_at LIKE ?",
(f"{today}%",),
)
count = cur.fetchone()[0]
conn.close()
return count
except Exception:
return 0
def _count_unhealthy(db_path: Path) -> int:
"""Count unhealthy containers from stack-restart.db."""
if not db_path.exists():
return 0
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
cur = conn.execute("SELECT COUNT(*) FROM unhealthy_tracking")
count = cur.fetchone()[0]
conn.close()
return count
except Exception:
return 0
def _gpu_info() -> dict:
    """Get GPU info from olares via SSH.

    Runs nvidia-smi on the remote host and parses its CSV output.
    Returns ``{"available": False}`` on any failure (SSH unreachable,
    timeout, non-zero exit, malformed output).
    """
    query = (
        "nvidia-smi --query-gpu=temperature.gpu,power.draw,power.limit,"
        "memory.used,memory.total,utilization.gpu --format=csv,noheader,nounits"
    )
    try:
        proc = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=3", "olares", query],
            capture_output=True, text=True, timeout=10,
        )
        if proc.returncode == 0:
            fields = [field.strip() for field in proc.stdout.strip().split(",")]

            def _num(raw):
                # Best-effort float conversion; None for unparseable values.
                try:
                    return float(raw)
                except (ValueError, TypeError):
                    return None

            if len(fields) >= 6:
                keys = ("temp_c", "power_draw_w", "power_limit_w",
                        "memory_used_mb", "memory_total_mb", "utilization_pct")
                info: dict = {"available": True}
                info.update(zip(keys, (_num(v) for v in fields[:6])))
                return info
    except Exception:
        pass
    return {"available": False}
@router.get("/stats/overview")
def stats_overview():
    """Aggregate overview stats: containers, GPU, email counts, Ollama."""
    # Per-endpoint container counts from Portainer; endpoints that fail
    # to answer are marked with an error flag rather than aborting.
    by_endpoint = {}
    grand_total = 0
    for name in ENDPOINTS:
        try:
            items = portainer_list_containers(name)
        except Exception:
            by_endpoint[name] = {"total": 0, "running": 0, "error": True}
            continue
        running_count = len([c for c in items if c.get("State") == "running"])
        by_endpoint[name] = {"total": len(items), "running": running_count}
        grand_total += len(items)
    # Emails processed today across all three organizers.
    email_counts = {
        "gmail": _count_today_emails(GMAIL_DB),
        "dvish": _count_today_emails(DVISH_DB),
        "proton": _count_today_emails(PROTON_DB),
    }
    email_counts["total"] = sum(email_counts.values())
    return {
        "containers": {"total": grand_total, "by_endpoint": by_endpoint},
        "gpu": _gpu_info(),
        "email_today": email_counts,
        "unhealthy_count": _count_unhealthy(RESTART_DB),
        "ollama_available": ollama_available(OLLAMA_URL),
    }
@router.get("/activity")
async def activity_stream():
    """SSE stream of today's automation events."""
    async def _stream():
        # Initial snapshot of today's events, then poll logs for new lines.
        yield {"event": "init", "data": json.dumps(get_recent_events(LOG_DIR))}
        offsets = tail_logs(LOG_DIR)
        while True:
            await asyncio.sleep(5)
            fresh, offsets = get_new_lines(LOG_DIR, offsets)
            if fresh:
                yield {"event": "update", "data": json.dumps(fresh)}
    return EventSourceResponse(_stream())
@router.post("/actions/pause-organizers")
def pause_organizers():
    """Pause all email organizer cron jobs."""
    # Delegates to the control script; success mirrors its exit code.
    ctl = "/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh"
    proc = subprocess.run([ctl, "stop"], capture_output=True, text=True, timeout=10)
    return {"success": proc.returncode == 0, "output": proc.stdout.strip()}
@router.post("/actions/resume-organizers")
def resume_organizers():
    """Resume all email organizer cron jobs."""
    # Delegates to the control script; success mirrors its exit code.
    ctl = "/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh"
    proc = subprocess.run([ctl, "start"], capture_output=True, text=True, timeout=10)
    return {"success": proc.returncode == 0, "output": proc.stdout.strip()}
@router.get("/actions/organizer-status")
def organizer_status():
    """Check if organizers are running or paused."""
    # Status text comes straight from the control script's stdout.
    ctl = "/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh"
    proc = subprocess.run([ctl, "status"], capture_output=True, text=True, timeout=10)
    return {"output": proc.stdout.strip()}
@router.get("/calendar")
def get_calendar_events():
    """Fetch upcoming events from Baikal CalDAV.

    Issues a CalDAV REPORT query for events starting today or later,
    parses the returned iCal text one VEVENT at a time, and returns the
    next 8 future events sorted by start time.

    Returns:
        ``{"events": [...], "total": N}`` on success, or
        ``{"events": [], "error": msg}`` on any failure.
    """
    import re
    from datetime import datetime, timezone
    BAIKAL_URL = "http://192.168.0.200:12852/dav.php/calendars/vish/default/"
    BAIKAL_USER = "vish"
    BAIKAL_PASS = "REDACTED_PASSWORD"  # NOTE(review): move credential to env/secret store
    today = datetime.now(timezone.utc).strftime("%Y%m%dT000000Z")
    body = f'''<?xml version="1.0" encoding="UTF-8"?>
<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop><d:getetag/><c:calendar-data/></d:prop>
<c:filter>
<c:comp-filter name="VCALENDAR">
<c:comp-filter name="VEVENT">
<c:time-range start="{today}"/>
</c:comp-filter>
</c:comp-filter>
</c:filter>
</c:calendar-query>'''
    try:
        auth = httpx.DigestAuth(BAIKAL_USER, BAIKAL_PASS)
        with httpx.Client(timeout=10) as client:
            r = client.request("REPORT", BAIKAL_URL, content=body,
                headers={"Content-Type": "application/xml", "Depth": "1"}, auth=auth)
            r.raise_for_status()
        events = []
        now = datetime.now(timezone.utc)
        # Parse each VEVENT block individually. The previous approach ran
        # three parallel findall() passes over the whole response, so any
        # event missing a LOCATION shifted every later location onto the
        # wrong event (and mismatched list lengths silently dropped events).
        for vevent in re.findall(r'BEGIN:VEVENT(.*?)END:VEVENT', r.text, re.DOTALL):
            m_start = re.search(r'DTSTART[^:]*:(.*?)(?:\r?\n)', vevent)
            m_summary = re.search(r'SUMMARY:(.*?)(?:\r?\n)', vevent)
            m_location = re.search(r'LOCATION:(.*?)(?:\r?\n)', vevent)
            if not m_start or not m_summary:
                continue
            start = m_start.group(1)
            # Parse date — handle both all-day (YYYYMMDD) and datetime formats
            try:
                if len(start) == 8:
                    dt = datetime.strptime(start, "%Y%m%d").replace(tzinfo=timezone.utc)
                else:
                    clean = start.replace("Z", "")
                    dt = datetime.strptime(clean[:15], "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc)
            except ValueError:
                continue
            # Only future events
            if dt < now:
                continue
            # Clean up summary/location (unescape iCal)
            clean_summary = m_summary.group(1).replace("\\,", ",").replace("\\;", ";").replace("&amp;", "&")
            location = (m_location.group(1).replace("\\,", ",").replace("\\n", ", ")
                        if m_location else None)
            events.append({
                "summary": clean_summary,
                "start": dt.isoformat(),
                "date": dt.strftime("%b %d"),
                "time": dt.strftime("%I:%M %p") if len(start) > 8 else "All day",
                "location": location,
            })
        # Sort by date, limit to next 8
        events.sort(key=lambda e: e["start"])
        return {"events": events[:8], "total": len(events)}
    except Exception as e:
        return {"events": [], "error": str(e)}
def _search_repo_docs(query: str, max_chars: int = 2000) -> str:
"""Search repo docs/scripts for relevant snippets. Lightweight keyword match."""
import re
repo = Path("/app/scripts").parent if Path("/app/scripts").exists() else Path(__file__).parent.parent.parent.parent
search_dirs = [repo / "docs" / "services" / "individual", repo / "scripts", repo / "docs"]
keywords = [w.lower() for w in re.findall(r'\w{3,}', query) if w.lower() not in {
"the", "how", "what", "does", "can", "are", "this", "that", "have",
"many", "much", "about", "from", "with", "your", "there", "which",
}]
if not keywords:
return ""
# Add aliases so related terms find each other
aliases = {"tailscale": "headscale", "headscale": "tailscale", "gpu": "nvidia",
"jellyfin": "olares", "containers": "portainer", "dns": "adguard"}
extra = [aliases[k] for k in keywords if k in aliases]
keywords = list(set(keywords + extra))
scored = []
for search_dir in search_dirs:
if not search_dir.exists():
continue
for f in search_dir.rglob("*.md"):
try:
text = f.read_text(errors="ignore")[:8000]
score = sum(text.lower().count(kw) for kw in keywords)
if score > 0:
scored.append((score, f, text))
except Exception:
continue
for f in search_dir.rglob("*.py"):
if f.name.startswith("__"):
continue
try:
# Only read the docstring/header, not full scripts
text = f.read_text(errors="ignore")[:1000]
score = sum(text.lower().count(kw) for kw in keywords)
if score > 0:
scored.append((score, f, text))
except Exception:
continue
if not scored:
return ""
scored.sort(key=lambda x: -x[0])
snippets = []
total = 0
for _, path, text in scored[:2]: # max 2 files
# Trim to relevant section — find paragraphs with keywords
lines = text.split("\n")
relevant = []
for i, line in enumerate(lines):
if any(kw in line.lower() for kw in keywords):
start = max(0, i - 2)
end = min(len(lines), i + 5)
relevant.extend(lines[start:end])
snippet = "\n".join(dict.fromkeys(relevant))[:1000] # dedup, cap at 1K
if not snippet.strip():
snippet = text[:500]
snippets.append(f"[{path.name}]\n{snippet}")
total += len(snippet)
if total >= max_chars:
break
return "\n\n".join(snippets)
@router.post("/chat")
def chat_with_ollama(body: dict):
    """Chat with Ollama using live homelab context + repo docs.

    Builds a prompt from (1) live overview stats, (2) optional
    keyword-triggered extras (Headscale nodes, Jellyfin, AdGuard),
    (3) a static infrastructure description, and (4) keyword-matched
    repo documentation, then sends it to Ollama.

    Expects ``body["message"]`` with the user's question; returns
    ``{"response": ...}`` or ``{"error": ...}``.
    """
    message = body.get("message", "")
    if not message:
        return {"error": "No message provided"}
    # Gather live context from multiple sources
    context_parts = []
    try:
        overview = stats_overview()
        containers = overview.get("containers", {})
        gpu = overview.get("gpu", {})
        context_parts.append(
            f"Containers: {containers.get('total', '?')} total across endpoints: "
            + ", ".join(f"{k} ({v.get('total','?')} containers, {v.get('running','?')} running)"
                        for k, v in containers.get("by_endpoint", {}).items())
        )
        if gpu.get("available"):
            # NOTE(review): _gpu_info() does not set a 'name' key, so the
            # 'RTX 5090' fallback is always used here — confirm intended.
            context_parts.append(
                f"GPU: {gpu.get('name','RTX 5090')}, {gpu.get('temp_c','?')}°C, "
                f"{gpu.get('memory_used_mb','?')}/{gpu.get('memory_total_mb','?')} MB VRAM, "
                f"{gpu.get('utilization_pct','?')}% util"
            )
        email_data = overview.get("email_today", {})
        if isinstance(email_data, dict):
            context_parts.append(f"Emails today: {email_data.get('total', 0)} (dvish: {email_data.get('dvish', 0)}, proton: {email_data.get('proton', 0)})")
        context_parts.append(f"Ollama: {'online' if overview.get('ollama_available') else 'offline'}")
        context_parts.append(f"Unhealthy containers: {overview.get('unhealthy_count', 0)}")
    except Exception:
        context_parts.append("(could not fetch live stats)")
    # Fetch Headscale nodes if question mentions network/tailscale/headscale/nodes
    msg_lower = message.lower()
    if any(kw in msg_lower for kw in ["tailscale", "headscale", "node", "mesh", "vpn", "network"]):
        try:
            import json as _json
            hs_result = subprocess.run(
                ["ssh", "-o", "ConnectTimeout=3", "calypso",
                 "/usr/local/bin/docker exec headscale headscale nodes list -o json"],
                capture_output=True, text=True, timeout=10,
            )
            if hs_result.returncode == 0:
                nodes = _json.loads(hs_result.stdout)
                online = [n for n in nodes if n.get("online")]
                node_names = ", ".join(n.get("givenName") or n.get("name", "?") for n in nodes)
                context_parts.append(f"Headscale/Tailscale: {len(nodes)} nodes ({len(online)} online): {node_names}")
            else:
                # Fall back to the documented node count when SSH fails.
                context_parts.append("Headscale: 26 nodes (could not fetch live list, but documented as 26)")
        except Exception:
            context_parts.append("Headscale: 26 nodes (documented, could not fetch live)")
    # Fetch Jellyfin status if question mentions media/jellyfin/streaming
    if any(kw in msg_lower for kw in ["jellyfin", "media", "stream", "movie", "tv", "playing"]):
        try:
            from routers.media import jellyfin_status
            jf = jellyfin_status()
            libs = ", ".join(f"{l['name']} ({l['type']})" for l in jf.get("libraries", []))
            active = jf.get("active_sessions", [])
            playing = ", ".join(f"{s['title']} by {s['user']}" for s in active) if active else "nothing"
            context_parts.append(f"Jellyfin v{jf.get('version','?')}: libraries={libs}. Now playing: {playing}")
        except Exception:
            # Best-effort enrichment; skip silently if media router fails.
            pass
    # Fetch AdGuard stats if question mentions dns/adguard/blocked
    if any(kw in msg_lower for kw in ["dns", "adguard", "blocked", "queries", "domain"]):
        try:
            from routers.network import adguard_stats
            ag = adguard_stats()
            context_parts.append(f"AdGuard DNS: {ag.get('total_queries', '?')} total queries, {ag.get('blocked', '?')} blocked, {ag.get('avg_time', '?')}s avg response")
        except Exception:
            # Best-effort enrichment; skip silently if network router fails.
            pass
    # Static system prompt: live data bullets + infrastructure overview.
    system_context = (
        "You are a homelab assistant. You have direct access to the following live infrastructure data:\n\n"
        + "\n".join(f"- {p}" for p in context_parts)
        + "\n\n"
        "Homelab hosts: Atlantis (Synology NAS, media/arr stack), Calypso (Synology, AdGuard DNS, Headscale, Authentik SSO), "
        "Olares (K3s, RTX 5090, Jellyfin, Ollama), NUC (lightweight services), RPi5 (Uptime Kuma), "
        "homelab-vm (Prometheus, Grafana, dashboard), Guava (TrueNAS), Seattle (remote VM), matrix-ubuntu (NPM, CrowdSec).\n\n"
        "Services: Sonarr, Radarr, SABnzbd, Deluge, Prowlarr, Bazarr, Lidarr, Tdarr, Audiobookshelf, LazyLibrarian on Atlantis. "
        "Jellyfin + Ollama on Olares with GPU transcoding. 3 email auto-organizers (Gmail x2 + Proton). "
        "11 Ollama-powered automation scripts. Gitea CI with AI PR reviewer.\n\n"
        "IMPORTANT: Answer using the LIVE DATA above, not general knowledge. The container counts are REAL numbers from Portainer right now. "
        "When asked 'how many containers on atlantis' answer with the exact number from the live data (e.g. 59). Be concise."
    )
    # Search repo docs for relevant context (max 2K chars)
    doc_context = _search_repo_docs(message, max_chars=2000)
    if doc_context:
        system_context += f"\n\nRelevant documentation:\n{doc_context}"
    prompt = f"{system_context}\n\nUser: {message}\nAssistant:"
    try:
        from lib_bridge import ollama_available as _ollama_check
        if not _ollama_check():
            return {"response": "Ollama is currently offline. Try again later."}
        # Make the repo's scripts/ importable so lib.ollama resolves both
        # inside the container (/app/scripts) and when run from the repo.
        import sys as _sys
        scripts_dir = str(Path("/app/scripts") if Path("/app/scripts").exists() else Path(__file__).parent.parent.parent / "scripts")
        if scripts_dir not in _sys.path:
            _sys.path.insert(0, scripts_dir)
        from lib.ollama import ollama_generate
        response = ollama_generate(prompt, num_predict=800, timeout=90)
        return {"response": response}
    except Exception as e:
        return {"error": str(e)}
# ---------------------------------------------------------------------------
# Health score
# ---------------------------------------------------------------------------
@router.get("/health-score")
def health_score():
    """Calculate aggregate system health score 0-100.

    Weighting: container state 40, unhealthy-tracking 20, GPU 10,
    Ollama 10, backup log 10, config drift 10. Returns ``score``
    (clamped 0-100), a letter ``grade``, and a ``details`` list
    explaining each adjustment.
    """
    score = 100
    details = []
    try:
        overview = stats_overview()
        containers = overview.get("containers", {})
        by_ep = containers.get("by_endpoint", {})
        # Container health (40 points) — only penalize crashed containers, not cleanly stopped ones
        crashed = 0
        cleanly_stopped = 0
        for ep_name in by_ep:
            try:
                ep_containers = portainer_list_containers(ep_name)
                for c in ep_containers:
                    state = c.get("State", "")
                    status = c.get("Status", "")
                    if state != "running":
                        if "Exited (0)" in status:
                            # Exit code 0 = deliberate stop, not a crash.
                            cleanly_stopped += 1
                        else:
                            crashed += 1
            except Exception:
                # Unreachable endpoint: skip rather than penalize.
                pass
        if crashed > 0:
            penalty = min(40, crashed * 8)  # 8 pts per crash, capped at 40
            score -= penalty
            details.append(f"-{penalty}: {crashed} containers crashed/unhealthy")
        else:
            details.append("+40: all containers healthy")
        if cleanly_stopped > 0:
            details.append(f"(info: {cleanly_stopped} intentionally stopped, not penalized)")
        # Unhealthy containers (20 points)
        unhealthy = overview.get("unhealthy_count", 0)
        if unhealthy > 0:
            penalty = min(20, unhealthy * 10)  # 10 pts each, capped at 20
            score -= penalty
            details.append(f"-{penalty}: {unhealthy} unhealthy containers")
        else:
            details.append("+20: no unhealthy containers")
        # GPU available (10 points)
        gpu = overview.get("gpu", {})
        if not gpu.get("available"):
            score -= 10
            details.append("-10: GPU unavailable")
        else:
            details.append("+10: GPU online")
        # Ollama available (10 points)
        if not overview.get("ollama_available"):
            score -= 10
            details.append("-10: Ollama offline")
        else:
            details.append("+10: Ollama online")
        # Backup status (10 points) — scan tail of the backup log for errors.
        backup_log = Path("/app/logs" if Path("/app/logs").exists() else "/tmp") / "gmail-backup-daily.log"
        if backup_log.exists():
            with open(backup_log) as f:
                content = f.read()
            if "ERROR" in content[-2000:]:
                score -= 10
                details.append("-10: backup has errors")
            else:
                details.append("+10: backup OK")
        else:
            # Missing log is a softer signal than a failing backup.
            score -= 5
            details.append("-5: no backup log found")
        # Config drift (10 points) — check the drift checker's recent output.
        drift_log = Path("/app/logs" if Path("/app/logs").exists() else "/tmp") / "config-drift.log"
        if drift_log.exists():
            with open(drift_log) as f:
                lines = f.readlines()
            last_lines = "".join(lines[-20:])
            if "drifts" in last_lines.lower() and "no drifts" not in last_lines.lower():
                score -= 10
                details.append("-10: config drift detected")
            else:
                details.append("+10: no drift")
        else:
            details.append("+10: no drift (no log)")
    except Exception as e:
        details.append(f"Error calculating: {e}")
    return {
        "score": max(0, min(100, score)),
        "grade": "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "D" if score >= 60 else "F",
        "details": details,
    }
# ---------------------------------------------------------------------------
# Quick actions
# ---------------------------------------------------------------------------
@router.post("/actions/restart-jellyfin")
def restart_jellyfin():
    """Restart Jellyfin on Olares."""
    # kubectl rollout restart over SSH; surface stderr when stdout is empty.
    proc = subprocess.run(
        ["ssh", "-o", "ConnectTimeout=3", "olares",
         "kubectl rollout restart deployment/jellyfin -n jellyfin-vishinator"],
        capture_output=True, text=True, timeout=15)
    output = proc.stdout.strip() or proc.stderr.strip()
    return {"success": proc.returncode == 0, "output": output}
@router.post("/actions/restart-ollama")
def restart_ollama():
    """Restart Ollama on Olares."""
    # kubectl rollout restart over SSH; surface stderr when stdout is empty.
    proc = subprocess.run(
        ["ssh", "-o", "ConnectTimeout=3", "olares",
         "kubectl rollout restart deployment/ollama -n ollamaserver-shared"],
        capture_output=True, text=True, timeout=15)
    output = proc.stdout.strip() or proc.stderr.strip()
    return {"success": proc.returncode == 0, "output": output}
@router.post("/actions/run-backup")
def run_backup():
    """Trigger daily Gmail backup."""
    # Synchronous run (can take minutes); return only the output tail.
    proc = subprocess.run(
        ["/home/homelab/organized/repos/homelab/scripts/gmail-backup-daily.sh"],
        capture_output=True, text=True, timeout=300)
    return {"success": proc.returncode == 0, "output": proc.stdout.strip()[-500:]}
# ---------------------------------------------------------------------------
# Automation timeline
# ---------------------------------------------------------------------------
@router.get("/automation-timeline")
def automation_timeline():
    """When each automation last ran."""
    base = Path("/app/logs") if Path("/app/logs").exists() else Path("/tmp")
    log_files = {
        "Email (lz)": "gmail-organizer.log",
        "Email (dvish)": "gmail-organizer-dvish.log",
        "Email (proton)": "proton-organizer.log",
        "Stack Restart": "stack-restart.log",
        "Backup": "gmail-backup-daily.log",
        "Backup Validator": "backup-validator.log",
        "Disk Predictor": "disk-predictor.log",
        "Config Drift": "config-drift.log",
        "Receipt Tracker": "receipt-tracker.log",
        "Changelog": "changelog-generator.log",
        "Email Digest": "email-digest.log",
    }
    entries = []
    for label, fname in log_files.items():
        log_path = base / fname
        if not log_path.exists():
            entries.append({"name": label, "exists": False})
            continue
        modified_iso = datetime.fromtimestamp(
            os.path.getmtime(log_path), tz=timezone.utc).isoformat()
        # Last timestamped line in the tail of the log gives the actual
        # run time; fall back to the file's mtime when none is found.
        with open(log_path) as fh:
            tail = fh.readlines()[-50:]
        stamp = next((ln[:19] for ln in reversed(tail) if ln[:4].isdigit()), None)
        entries.append({
            "name": label,
            "last_run": stamp or modified_iso[:19],
            "last_modified": modified_iso,
            "exists": True,
        })
    return entries
# ---------------------------------------------------------------------------
# Disk usage (via Prometheus)
# ---------------------------------------------------------------------------
@router.get("/disk-usage")
def disk_usage():
    """Disk usage from Prometheus.
    Filters out network mounts (nfs/cifs) so remote capacity isn't double-counted,
    deduplicates Synology btrfs subvolumes, and aggregates ZFS datasets into
    pool-level usage (individual ZFS datasets misleadingly show pool free space).
    Returns up to 20 entries sorted by used_pct descending, or
    {"error": msg} on failure.
    """
    _fs_exclude = "tmpfs|devtmpfs|overlay|nfs|nfs4|cifs"
    _mp_exclude = "/boot.*"
    _synology_hosts = {"atlantis", "calypso", "setillo"}
    try:
        avail = prom_query(f'node_filesystem_avail_bytes{{fstype!~"{_fs_exclude}",mountpoint!~"{_mp_exclude}"}}')
        total = prom_query(f'node_filesystem_size_bytes{{fstype!~"{_fs_exclude}",mountpoint!~"{_mp_exclude}"}}')
        # Join the two metric series on "instance:mountpoint".
        total_map = {}
        for t in total:
            key = f"{t['metric'].get('instance', '?')}:{t['metric'].get('mountpoint', '?')}"
            total_map[key] = float(t['value'][1])
        disks = {}
        # Collect ZFS datasets separately for pool-level aggregation
        # Key: (host, pool_avail_rounded) -> {used, avail, label}
        zfs_pools: dict[tuple, dict] = {}
        for a in avail:
            key = f"{a['metric'].get('instance', '?')}:{a['metric'].get('mountpoint', '?')}"
            mount = a['metric'].get('mountpoint', '?')
            fstype = a['metric'].get('fstype', '')
            avail_bytes = float(a['value'][1])
            total_bytes = total_map.get(key, 0)
            # Ignore filesystems under 1 GB (pseudo mounts, tiny partitions).
            if total_bytes < 1e9:
                continue
            host = a['metric'].get('instance', '?').split(':')[0]
            # ZFS: aggregate all datasets per pool instead of showing individually
            if fstype == "zfs":
                used_bytes = total_bytes - avail_bytes
                # Datasets in the same pool report (near-)identical free
                # space, so rounded avail-GB doubles as a pool identifier.
                pool_key = (host, round(avail_bytes / 1e9))
                if pool_key not in zfs_pools:
                    zfs_pools[pool_key] = {"used": 0, "avail": avail_bytes, "label": mount, "host": host}
                zfs_pools[pool_key]["used"] += used_bytes
                # Keep shortest mountpoint as label
                if len(mount) < len(zfs_pools[pool_key]["label"]):
                    zfs_pools[pool_key]["label"] = mount
                continue
            # Skip Synology REDACTED_APP_PASSWORD bind-mounts (subvolumes of the same btrfs pool)
            if "/@appdata/" in mount or "/@docker" in mount:
                continue
            # Synology NAS hosts: only show /volumeN data partitions, skip OS root
            if host in _synology_hosts and not mount.startswith("/volume"):
                continue
            dedup_key = f"{host}:{mount}"
            used_pct = ((total_bytes - avail_bytes) / total_bytes * 100) if total_bytes > 0 else 0
            disks[dedup_key] = {
                "host": host,
                "mount": mount,
                "total_gb": round(total_bytes / 1e9, 1),
                "avail_gb": round(avail_bytes / 1e9, 1),
                "used_pct": round(used_pct, 1),
            }
        # Convert aggregated ZFS pools into disk entries (skip tiny pools < 10GB)
        for pool_key, p in zfs_pools.items():
            total_bytes = p["used"] + p["avail"]
            if total_bytes < 10e9:
                continue
            used_pct = (p["used"] / total_bytes * 100) if total_bytes > 0 else 0
            dedup_key = f"{p['host']}:zfs:{pool_key[1]}"
            disks[dedup_key] = {
                "host": p["host"],
                "mount": p["label"],
                "total_gb": round(total_bytes / 1e9, 1),
                "avail_gb": round(p["avail"] / 1e9, 1),
                "used_pct": round(used_pct, 1),
            }
        # Fullest disks first; cap the payload at 20 entries.
        result = sorted(disks.values(), key=lambda d: -d["used_pct"])
        return result[:20]
    except Exception as e:
        return {"error": str(e)}
# ---------------------------------------------------------------------------
# Host temperatures (via Prometheus)
# ---------------------------------------------------------------------------
@router.get("/temperatures")
def temperatures():
    """Host temperatures from Prometheus node_hwmon_temp_celsius.
    Returns one entry per host with CPU/SoC temp (highest relevant sensor)
    plus any hot NVMe drives flagged separately.
    Entries are sorted hottest-first; {"error": msg} on failure.
    """
    # Chips/labels that indicate CPU/SoC temperature
    _cpu_chips = {"coretemp", "k10temp", "pci0000:00_0000:00:18_3", "thermal_zone"}
    try:
        results = prom_query("node_hwmon_temp_celsius")
        from collections import defaultdict
        hosts: dict[str, dict] = defaultdict(lambda: {
            "cpu_temp": None, "sensors": [],
        })
        for r in results:
            m = r["metric"]
            # "host:port" instance label -> bare hostname.
            host = m.get("instance", "?").split(":")[0]
            chip = m.get("chip", "")
            label = m.get("label", m.get("sensor", ""))
            temp = float(r["value"][1])
            # Zero/negative readings are sensor noise; skip them.
            if temp <= 0:
                continue
            is_cpu = any(k in chip for k in _cpu_chips)
            is_nvme = "nvme" in chip
            entry = hosts[host]
            if is_cpu:
                # Keep the hottest CPU/SoC sensor per host.
                if entry["cpu_temp"] is None or temp > entry["cpu_temp"]:
                    entry["cpu_temp"] = temp
            elif is_nvme:
                entry["sensors"].append({"label": f"NVMe ({chip.split('_')[-1]})", "temp": temp})
            else:
                entry["sensors"].append({"label": label or chip, "temp": temp})
        out = []
        for host, data in hosts.items():
            # Pick the highest temp as representative if no CPU sensor found
            all_temps = ([data["cpu_temp"]] if data["cpu_temp"] else []) + \
                [s["temp"] for s in data["sensors"]]
            cpu = data["cpu_temp"] or (max(all_temps) if all_temps else None)
            if cpu is None:
                continue
            # Flag hottest NVMe if above 70°C
            hot_nvme = None
            nvme_sensors = [s for s in data["sensors"] if "NVMe" in s["label"]]
            if nvme_sensors:
                hottest = max(nvme_sensors, key=lambda s: s["temp"])
                if hottest["temp"] >= 70:
                    hot_nvme = {"label": hottest["label"], "temp": round(hottest["temp"], 1)}
            out.append({
                "host": host,
                "cpu_temp": round(cpu, 1),
                "hot_nvme": hot_nvme,
            })
        out.sort(key=lambda d: -d["cpu_temp"])
        return out
    except Exception as e:
        return {"error": str(e)}