"""Overview stats and SSE activity stream.""" import asyncio import json import os import subprocess import sqlite3 from datetime import date, datetime, timezone from fastapi import APIRouter from sse_starlette.sse import EventSourceResponse import httpx import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from lib_bridge import ( portainer_list_containers, ENDPOINTS, ollama_available, GMAIL_DB, DVISH_DB, PROTON_DB, RESTART_DB, LOG_DIR, OLLAMA_URL, prom_query, ) from log_parser import get_recent_events, tail_logs, get_new_lines router = APIRouter(tags=["overview"]) def _count_today_emails(db_path: Path) -> int: """Count emails processed today from a processed.db file.""" if not db_path.exists(): return 0 try: today = date.today().isoformat() conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) cur = conn.execute( "SELECT COUNT(*) FROM processed WHERE processed_at LIKE ?", (f"{today}%",), ) count = cur.fetchone()[0] conn.close() return count except Exception: return 0 def _count_unhealthy(db_path: Path) -> int: """Count unhealthy containers from stack-restart.db.""" if not db_path.exists(): return 0 try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) cur = conn.execute("SELECT COUNT(*) FROM unhealthy_tracking") count = cur.fetchone()[0] conn.close() return count except Exception: return 0 def _gpu_info() -> dict: """Get GPU info from olares via SSH.""" try: result = subprocess.run( ["ssh", "-o", "ConnectTimeout=3", "olares", "nvidia-smi --query-gpu=temperature.gpu,power.draw,power.limit," "memory.used,memory.total,utilization.gpu --format=csv,noheader,nounits"], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: return {"available": False} parts = [p.strip() for p in result.stdout.strip().split(",")] def _f(v): try: return float(v) except (ValueError, TypeError): return None if len(parts) >= 6: return { "available": True, "temp_c": _f(parts[0]), "power_draw_w": _f(parts[1]), 
"power_limit_w": _f(parts[2]), "memory_used_mb": _f(parts[3]), "memory_total_mb": _f(parts[4]), "utilization_pct": _f(parts[5]), } except Exception: pass return {"available": False} @router.get("/stats/overview") def stats_overview(): """Aggregate overview stats.""" # Container counts container_counts = {} total = 0 for ep_name in ENDPOINTS: try: containers = portainer_list_containers(ep_name) running = sum(1 for c in containers if c.get("State") == "running") container_counts[ep_name] = {"total": len(containers), "running": running} total += len(containers) except Exception: container_counts[ep_name] = {"total": 0, "running": 0, "error": True} # GPU gpu = _gpu_info() # Email counts email_today = { "gmail": _count_today_emails(GMAIL_DB), "dvish": _count_today_emails(DVISH_DB), "proton": _count_today_emails(PROTON_DB), } email_today["total"] = sum(email_today.values()) # Unhealthy unhealthy = _count_unhealthy(RESTART_DB) # Ollama ollama_up = ollama_available(OLLAMA_URL) return { "containers": {"total": total, "by_endpoint": container_counts}, "gpu": gpu, "email_today": email_today, "unhealthy_count": unhealthy, "ollama_available": ollama_up, } @router.get("/activity") async def activity_stream(): """SSE stream of today's automation events.""" async def event_generator(): # Send initial batch events = get_recent_events(LOG_DIR) yield {"event": "init", "data": json.dumps(events)} # Poll for new events positions = tail_logs(LOG_DIR) while True: await asyncio.sleep(5) new_events, positions = get_new_lines(LOG_DIR, positions) if new_events: yield {"event": "update", "data": json.dumps(new_events)} return EventSourceResponse(event_generator()) @router.post("/actions/pause-organizers") def pause_organizers(): """Pause all email organizer cron jobs.""" result = subprocess.run( ["/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh", "stop"], capture_output=True, text=True, timeout=10, ) return {"success": result.returncode == 0, "output": 
result.stdout.strip()} @router.post("/actions/resume-organizers") def resume_organizers(): """Resume all email organizer cron jobs.""" result = subprocess.run( ["/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh", "start"], capture_output=True, text=True, timeout=10, ) return {"success": result.returncode == 0, "output": result.stdout.strip()} @router.get("/actions/organizer-status") def organizer_status(): """Check if organizers are running or paused.""" result = subprocess.run( ["/home/homelab/organized/repos/homelab/scripts/gmail-organizer-ctl.sh", "status"], capture_output=True, text=True, timeout=10, ) return {"output": result.stdout.strip()} @router.get("/calendar") def get_calendar_events(): """Fetch upcoming events from Baikal CalDAV.""" import re from datetime import datetime, timezone BAIKAL_URL = "http://192.168.0.200:12852/dav.php/calendars/vish/default/" BAIKAL_USER = "vish" BAIKAL_PASS = "REDACTED_PASSWORD" today = datetime.now(timezone.utc).strftime("%Y%m%dT000000Z") body = f''' ''' try: auth = httpx.DigestAuth(BAIKAL_USER, BAIKAL_PASS) with httpx.Client(timeout=10) as client: r = client.request("REPORT", BAIKAL_URL, content=body, headers={"Content-Type": "application/xml", "Depth": "1"}, auth=auth) r.raise_for_status() # Parse iCal events summaries = re.findall(r'SUMMARY:(.*?)(?:\r?\n)', r.text) starts = re.findall(r'DTSTART[^:]*:(.*?)(?:\r?\n)', r.text) locations = re.findall(r'LOCATION:(.*?)(?:\r?\n)', r.text) events = [] now = datetime.now(timezone.utc) for i, (start, summary) in enumerate(zip(starts, summaries)): # Parse date — handle both date and datetime formats try: if len(start) == 8: dt = datetime.strptime(start, "%Y%m%d").replace(tzinfo=timezone.utc) else: clean = start.replace("Z", "") dt = datetime.strptime(clean[:15], "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc) except ValueError: continue # Only future events if dt < now: continue # Clean up summary (unescape iCal) clean_summary = summary.replace("\\,", 
",").replace("\\;", ";").replace("&", "&") events.append({ "summary": clean_summary, "start": dt.isoformat(), "date": dt.strftime("%b %d"), "time": dt.strftime("%I:%M %p") if len(start) > 8 else "All day", "location": locations[i].replace("\\,", ",").replace("\\n", ", ") if i < len(locations) else None, }) # Sort by date, limit to next 8 events.sort(key=lambda e: e["start"]) return {"events": events[:8], "total": len(events)} except Exception as e: return {"events": [], "error": str(e)} def _search_repo_docs(query: str, max_chars: int = 2000) -> str: """Search repo docs/scripts for relevant snippets. Lightweight keyword match.""" import re repo = Path("/app/scripts").parent if Path("/app/scripts").exists() else Path(__file__).parent.parent.parent.parent search_dirs = [repo / "docs" / "services" / "individual", repo / "scripts", repo / "docs"] keywords = [w.lower() for w in re.findall(r'\w{3,}', query) if w.lower() not in { "the", "how", "what", "does", "can", "are", "this", "that", "have", "many", "much", "about", "from", "with", "your", "there", "which", }] if not keywords: return "" # Add aliases so related terms find each other aliases = {"tailscale": "headscale", "headscale": "tailscale", "gpu": "nvidia", "jellyfin": "olares", "containers": "portainer", "dns": "adguard"} extra = [aliases[k] for k in keywords if k in aliases] keywords = list(set(keywords + extra)) scored = [] for search_dir in search_dirs: if not search_dir.exists(): continue for f in search_dir.rglob("*.md"): try: text = f.read_text(errors="ignore")[:8000] score = sum(text.lower().count(kw) for kw in keywords) if score > 0: scored.append((score, f, text)) except Exception: continue for f in search_dir.rglob("*.py"): if f.name.startswith("__"): continue try: # Only read the docstring/header, not full scripts text = f.read_text(errors="ignore")[:1000] score = sum(text.lower().count(kw) for kw in keywords) if score > 0: scored.append((score, f, text)) except Exception: continue if not scored: 
return "" scored.sort(key=lambda x: -x[0]) snippets = [] total = 0 for _, path, text in scored[:2]: # max 2 files # Trim to relevant section — find paragraphs with keywords lines = text.split("\n") relevant = [] for i, line in enumerate(lines): if any(kw in line.lower() for kw in keywords): start = max(0, i - 2) end = min(len(lines), i + 5) relevant.extend(lines[start:end]) snippet = "\n".join(dict.fromkeys(relevant))[:1000] # dedup, cap at 1K if not snippet.strip(): snippet = text[:500] snippets.append(f"[{path.name}]\n{snippet}") total += len(snippet) if total >= max_chars: break return "\n\n".join(snippets) @router.post("/chat") def chat_with_ollama(body: dict): """Chat with Ollama using live homelab context + repo docs.""" message = body.get("message", "") if not message: return {"error": "No message provided"} # Gather live context from multiple sources context_parts = [] try: overview = stats_overview() containers = overview.get("containers", {}) gpu = overview.get("gpu", {}) context_parts.append( f"Containers: {containers.get('total', '?')} total across endpoints: " + ", ".join(f"{k} ({v.get('total','?')} containers, {v.get('running','?')} running)" for k, v in containers.get("by_endpoint", {}).items()) ) if gpu.get("available"): context_parts.append( f"GPU: {gpu.get('name','RTX 5090')}, {gpu.get('temp_c','?')}°C, " f"{gpu.get('memory_used_mb','?')}/{gpu.get('memory_total_mb','?')} MB VRAM, " f"{gpu.get('utilization_pct','?')}% util" ) email_data = overview.get("email_today", {}) if isinstance(email_data, dict): context_parts.append(f"Emails today: {email_data.get('total', 0)} (dvish: {email_data.get('dvish', 0)}, proton: {email_data.get('proton', 0)})") context_parts.append(f"Ollama: {'online' if overview.get('ollama_available') else 'offline'}") context_parts.append(f"Unhealthy containers: {overview.get('unhealthy_count', 0)}") except Exception: context_parts.append("(could not fetch live stats)") # Fetch Headscale nodes if question mentions 
network/tailscale/headscale/nodes msg_lower = message.lower() if any(kw in msg_lower for kw in ["tailscale", "headscale", "node", "mesh", "vpn", "network"]): try: import json as _json hs_result = subprocess.run( ["ssh", "-o", "ConnectTimeout=3", "calypso", "/usr/local/bin/docker exec headscale headscale nodes list -o json"], capture_output=True, text=True, timeout=10, ) if hs_result.returncode == 0: nodes = _json.loads(hs_result.stdout) online = [n for n in nodes if n.get("online")] node_names = ", ".join(n.get("givenName") or n.get("name", "?") for n in nodes) context_parts.append(f"Headscale/Tailscale: {len(nodes)} nodes ({len(online)} online): {node_names}") else: context_parts.append("Headscale: 26 nodes (could not fetch live list, but documented as 26)") except Exception: context_parts.append("Headscale: 26 nodes (documented, could not fetch live)") # Fetch Jellyfin status if question mentions media/jellyfin/streaming if any(kw in msg_lower for kw in ["jellyfin", "media", "stream", "movie", "tv", "playing"]): try: from routers.media import jellyfin_status jf = jellyfin_status() libs = ", ".join(f"{l['name']} ({l['type']})" for l in jf.get("libraries", [])) active = jf.get("active_sessions", []) playing = ", ".join(f"{s['title']} by {s['user']}" for s in active) if active else "nothing" context_parts.append(f"Jellyfin v{jf.get('version','?')}: libraries={libs}. Now playing: {playing}") except Exception: pass # Fetch AdGuard stats if question mentions dns/adguard/blocked if any(kw in msg_lower for kw in ["dns", "adguard", "blocked", "queries", "domain"]): try: from routers.network import adguard_stats ag = adguard_stats() context_parts.append(f"AdGuard DNS: {ag.get('total_queries', '?')} total queries, {ag.get('blocked', '?')} blocked, {ag.get('avg_time', '?')}s avg response") except Exception: pass system_context = ( "You are a homelab assistant. 
You have direct access to the following live infrastructure data:\n\n" + "\n".join(f"- {p}" for p in context_parts) + "\n\n" "Homelab hosts: Atlantis (Synology NAS, media/arr stack), Calypso (Synology, AdGuard DNS, Headscale, Authentik SSO), " "Olares (K3s, RTX 5090, Jellyfin, Ollama), NUC (lightweight services), RPi5 (Uptime Kuma), " "homelab-vm (Prometheus, Grafana, dashboard), Guava (TrueNAS), Seattle (remote VM), matrix-ubuntu (NPM, CrowdSec).\n\n" "Services: Sonarr, Radarr, SABnzbd, Deluge, Prowlarr, Bazarr, Lidarr, Tdarr, Audiobookshelf, LazyLibrarian on Atlantis. " "Jellyfin + Ollama on Olares with GPU transcoding. 3 email auto-organizers (Gmail x2 + Proton). " "11 Ollama-powered automation scripts. Gitea CI with AI PR reviewer.\n\n" "IMPORTANT: Answer using the LIVE DATA above, not general knowledge. The container counts are REAL numbers from Portainer right now. " "When asked 'how many containers on atlantis' answer with the exact number from the live data (e.g. 59). Be concise." ) # Search repo docs for relevant context (max 2K chars) doc_context = _search_repo_docs(message, max_chars=2000) if doc_context: system_context += f"\n\nRelevant documentation:\n{doc_context}" prompt = f"{system_context}\n\nUser: {message}\nAssistant:" try: from lib_bridge import ollama_available as _ollama_check if not _ollama_check(): return {"response": "Ollama is currently offline. 
Try again later."} import sys as _sys scripts_dir = str(Path("/app/scripts") if Path("/app/scripts").exists() else Path(__file__).parent.parent.parent / "scripts") if scripts_dir not in _sys.path: _sys.path.insert(0, scripts_dir) from lib.ollama import ollama_generate response = ollama_generate(prompt, num_predict=800, timeout=90) return {"response": response} except Exception as e: return {"error": str(e)} # --------------------------------------------------------------------------- # Health score # --------------------------------------------------------------------------- @router.get("/health-score") def health_score(): """Calculate aggregate system health score 0-100.""" score = 100 details = [] try: overview = stats_overview() containers = overview.get("containers", {}) by_ep = containers.get("by_endpoint", {}) # Container health (40 points) — only penalize crashed containers, not cleanly stopped ones crashed = 0 cleanly_stopped = 0 for ep_name in by_ep: try: ep_containers = portainer_list_containers(ep_name) for c in ep_containers: state = c.get("State", "") status = c.get("Status", "") if state != "running": if "Exited (0)" in status: cleanly_stopped += 1 else: crashed += 1 except Exception: pass if crashed > 0: penalty = min(40, crashed * 8) score -= penalty details.append(f"-{penalty}: {crashed} containers crashed/unhealthy") else: details.append("+40: all containers healthy") if cleanly_stopped > 0: details.append(f"(info: {cleanly_stopped} intentionally stopped, not penalized)") # Unhealthy containers (20 points) unhealthy = overview.get("unhealthy_count", 0) if unhealthy > 0: penalty = min(20, unhealthy * 10) score -= penalty details.append(f"-{penalty}: {unhealthy} unhealthy containers") else: details.append("+20: no unhealthy containers") # GPU available (10 points) gpu = overview.get("gpu", {}) if not gpu.get("available"): score -= 10 details.append("-10: GPU unavailable") else: details.append("+10: GPU online") # Ollama available (10 points) if not 
overview.get("ollama_available"): score -= 10 details.append("-10: Ollama offline") else: details.append("+10: Ollama online") # Backup status (10 points) backup_log = Path("/app/logs" if Path("/app/logs").exists() else "/tmp") / "gmail-backup-daily.log" if backup_log.exists(): with open(backup_log) as f: content = f.read() if "ERROR" in content[-2000:]: score -= 10 details.append("-10: backup has errors") else: details.append("+10: backup OK") else: score -= 5 details.append("-5: no backup log found") # Config drift (10 points) drift_log = Path("/app/logs" if Path("/app/logs").exists() else "/tmp") / "config-drift.log" if drift_log.exists(): with open(drift_log) as f: lines = f.readlines() last_lines = "".join(lines[-20:]) if "drifts" in last_lines.lower() and "no drifts" not in last_lines.lower(): score -= 10 details.append("-10: config drift detected") else: details.append("+10: no drift") else: details.append("+10: no drift (no log)") except Exception as e: details.append(f"Error calculating: {e}") return { "score": max(0, min(100, score)), "grade": "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "D" if score >= 60 else "F", "details": details, } # --------------------------------------------------------------------------- # Quick actions # --------------------------------------------------------------------------- @router.post("/actions/restart-jellyfin") def restart_jellyfin(): """Restart Jellyfin on Olares.""" result = subprocess.run( ["ssh", "-o", "ConnectTimeout=3", "olares", "kubectl rollout restart deployment/jellyfin -n jellyfin-vishinator"], capture_output=True, text=True, timeout=15) return {"success": result.returncode == 0, "output": result.stdout.strip() or result.stderr.strip()} @router.post("/actions/restart-ollama") def restart_ollama(): """Restart Ollama on Olares.""" result = subprocess.run( ["ssh", "-o", "ConnectTimeout=3", "olares", "kubectl rollout restart deployment/ollama -n ollamaserver-shared"], 
capture_output=True, text=True, timeout=15) return {"success": result.returncode == 0, "output": result.stdout.strip() or result.stderr.strip()} @router.post("/actions/run-backup") def run_backup(): """Trigger daily Gmail backup.""" result = subprocess.run( ["/home/homelab/organized/repos/homelab/scripts/gmail-backup-daily.sh"], capture_output=True, text=True, timeout=300) return {"success": result.returncode == 0, "output": result.stdout.strip()[-500:]} # --------------------------------------------------------------------------- # Automation timeline # --------------------------------------------------------------------------- @router.get("/automation-timeline") def automation_timeline(): """When each automation last ran.""" log_dir = Path("/app/logs") if Path("/app/logs").exists() else Path("/tmp") automations = { "Email (lz)": "gmail-organizer.log", "Email (dvish)": "gmail-organizer-dvish.log", "Email (proton)": "proton-organizer.log", "Stack Restart": "stack-restart.log", "Backup": "gmail-backup-daily.log", "Backup Validator": "backup-validator.log", "Disk Predictor": "disk-predictor.log", "Config Drift": "config-drift.log", "Receipt Tracker": "receipt-tracker.log", "Changelog": "changelog-generator.log", "Email Digest": "email-digest.log", } timeline = [] for name, filename in automations.items(): path = log_dir / filename if path.exists(): mtime = os.path.getmtime(path) last_modified = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat() # Get last line with a timestamp with open(path) as f: lines = f.readlines() last_run = None for line in reversed(lines[-50:]): if line[:4].isdigit(): last_run = line[:19] break # Fall back to file modification time if no timestamp found in content if not last_run: last_run = last_modified[:19] timeline.append({"name": name, "last_run": last_run, "last_modified": last_modified, "exists": True}) else: timeline.append({"name": name, "exists": False}) return timeline # 
# ---------------------------------------------------------------------------
# Disk usage (via Prometheus)
# ---------------------------------------------------------------------------

@router.get("/disk-usage")
def disk_usage():
    """Disk usage from Prometheus.

    Filters out network mounts (nfs/cifs) so remote capacity isn't double-counted,
    deduplicates Synology btrfs subvolumes, and aggregates ZFS datasets into
    pool-level usage (individual ZFS datasets misleadingly show pool free space).

    Returns:
        A list of at most 20 dicts (host, mount, total_gb, avail_gb, used_pct)
        sorted by used_pct descending, or ``{"error": "..."}`` on failure.
    """
    import math

    _fs_exclude = "tmpfs|devtmpfs|overlay|nfs|nfs4|cifs"
    _mp_exclude = "/boot.*"
    _synology_hosts = {"atlantis", "calypso", "setillo"}
    selector = f'{{fstype!~"{_fs_exclude}",mountpoint!~"{_mp_exclude}"}}'

    try:
        avail = prom_query(f"node_filesystem_avail_bytes{selector}")
        total = prom_query(f"node_filesystem_size_bytes{selector}")

        # instance:mountpoint -> filesystem size in bytes
        total_map = {}
        for t in total:
            metric = t["metric"]
            key = f"{metric.get('instance', '?')}:{metric.get('mountpoint', '?')}"
            total_map[key] = float(t["value"][1])

        disks = {}
        # Collect ZFS datasets separately for pool-level aggregation
        # Key: (host, pool_avail_rounded) -> {used, avail, label, host}
        zfs_pools: dict[tuple, dict] = {}

        for a in avail:
            metric = a["metric"]
            instance = metric.get("instance", "?")
            mount = metric.get("mountpoint", "?")
            fstype = metric.get("fstype", "")
            avail_bytes = float(a["value"][1])
            total_bytes = total_map.get(f"{instance}:{mount}", 0)

            # Prometheus can report NaN/Inf samples; they would slip past the
            # size guard and make the response impossible to JSON-encode.
            if not (math.isfinite(avail_bytes) and math.isfinite(total_bytes)):
                continue
            # Ignore filesystems under 1 GB (also drops entries with no size sample).
            if total_bytes < 1e9:
                continue

            host = instance.split(":")[0]

            # ZFS: aggregate all datasets per pool instead of showing individually.
            # Datasets sharing the same (rounded) free space on a host are
            # treated as belonging to the same pool.
            if fstype == "zfs":
                pool_key = (host, round(avail_bytes / 1e9))
                pool = zfs_pools.setdefault(
                    pool_key,
                    {"used": 0, "avail": avail_bytes, "label": mount, "host": host},
                )
                pool["used"] += total_bytes - avail_bytes
                # Keep shortest mountpoint as label
                if len(mount) < len(pool["label"]):
                    pool["label"] = mount
                continue

            # Skip Synology @appdata/@docker bind-mounts (subvolumes of the same btrfs pool)
            if "/@appdata/" in mount or "/@docker" in mount:
                continue
            # Synology NAS hosts: only show /volumeN data partitions, skip OS root
            if host in _synology_hosts and not mount.startswith("/volume"):
                continue

            used_pct = (total_bytes - avail_bytes) / total_bytes * 100
            disks[f"{host}:{mount}"] = {
                "host": host,
                "mount": mount,
                "total_gb": round(total_bytes / 1e9, 1),
                "avail_gb": round(avail_bytes / 1e9, 1),
                "used_pct": round(used_pct, 1),
            }

        # Convert aggregated ZFS pools into disk entries (skip tiny pools < 10GB)
        for pool_key, p in zfs_pools.items():
            pool_total = p["used"] + p["avail"]
            if pool_total < 10e9:
                continue
            disks[f"{p['host']}:zfs:{pool_key[1]}"] = {
                "host": p["host"],
                "mount": p["label"],
                "total_gb": round(pool_total / 1e9, 1),
                "avail_gb": round(p["avail"] / 1e9, 1),
                "used_pct": round(p["used"] / pool_total * 100, 1),
            }

        result = sorted(disks.values(), key=lambda d: -d["used_pct"])
        return result[:20]
    except Exception as e:
        return {"error": str(e)}


# ---------------------------------------------------------------------------
# Host temperatures (via Prometheus)
# ---------------------------------------------------------------------------

@router.get("/temperatures")
def temperatures():
    """Host temperatures from Prometheus node_hwmon_temp_celsius.

    Returns one entry per host with CPU/SoC temp (highest relevant sensor)
    plus any hot NVMe drives flagged separately.

    Returns:
        A list of dicts (host, cpu_temp, hot_nvme) sorted by cpu_temp
        descending, or ``{"error": "..."}`` on failure. When a host has no
        CPU sensor, its hottest other sensor is reported as cpu_temp.
    """
    import math
    from collections import defaultdict

    # Chips/labels that indicate CPU/SoC temperature
    _cpu_chips = {"coretemp", "k10temp", "pci0000:00_0000:00:18_3", "thermal_zone"}

    try:
        results = prom_query("node_hwmon_temp_celsius")

        hosts: dict[str, dict] = defaultdict(lambda: {
            "cpu_temp": None,
            "sensors": [],
        })

        for r in results:
            m = r["metric"]
            host = m.get("instance", "?").split(":")[0]
            chip = m.get("chip", "")
            label = m.get("label", m.get("sensor", ""))
            temp = float(r["value"][1])

            # Drop non-positive readings and NaN/Inf samples; non-finite
            # values would make the response impossible to JSON-encode.
            if not math.isfinite(temp) or temp <= 0:
                continue

            entry = hosts[host]
            if any(k in chip for k in _cpu_chips):
                # Track the hottest CPU/SoC sensor per host.
                if entry["cpu_temp"] is None or temp > entry["cpu_temp"]:
                    entry["cpu_temp"] = temp
            elif "nvme" in chip:
                entry["sensors"].append({"label": f"NVMe ({chip.split('_')[-1]})", "temp": temp})
            else:
                entry["sensors"].append({"label": label or chip, "temp": temp})

        out = []
        for host, data in hosts.items():
            cpu = data["cpu_temp"]
            if cpu is None:
                # Pick the highest temp as representative if no CPU sensor found
                other_temps = [s["temp"] for s in data["sensors"]]
                if not other_temps:
                    continue
                cpu = max(other_temps)

            # Flag hottest NVMe if at or above 70°C
            hot_nvme = None
            nvme_sensors = [s for s in data["sensors"] if "NVMe" in s["label"]]
            if nvme_sensors:
                hottest = max(nvme_sensors, key=lambda s: s["temp"])
                if hottest["temp"] >= 70:
                    hot_nvme = {"label": hottest["label"], "temp": round(hottest["temp"], 1)}

            out.append({
                "host": host,
                "cpu_temp": round(cpu, 1),
                "hot_nvme": hot_nvme,
            })

        out.sort(key=lambda d: -d["cpu_temp"])
        return out
    except Exception as e:
        return {"error": str(e)}