"""Network / Headscale / AdGuard routes."""

import json
import subprocess

import httpx
from fastapi import APIRouter

router = APIRouter(tags=["network"])

ADGUARD_URL = "http://192.168.0.250:9080"
ADGUARD_USER = "vish"
ADGUARD_PASS = "REDACTED_PASSWORD"


def _adguard_get(path):
    """Log in to AdGuard and GET ``{ADGUARD_URL}/control{path}``.

    Args:
        path: Path appended to ``/control`` (e.g. ``"/stats"``).

    Returns:
        The decoded JSON body, or ``{}`` when the response has no content.

    Raises:
        httpx.HTTPStatusError: If the GET returns a 4xx/5xx status.
        httpx.HTTPError: On transport failures (connect error, timeout, ...).
    """
    # One client for both requests so that any cookies set by the login
    # response are sent along with the GET.
    with httpx.Client(timeout=10) as client:
        # The login status is deliberately not checked here: a failed login
        # surfaces through raise_for_status() on the GET below if the
        # endpoint actually requires authentication.
        client.post(
            f"{ADGUARD_URL}/control/login",
            json={"name": ADGUARD_USER, "password": ADGUARD_PASS},
        )
        r = client.get(f"{ADGUARD_URL}/control{path}")
        r.raise_for_status()
        return r.json() if r.content else {}


@router.get("/network/headscale")
def headscale_nodes():
    """List Headscale nodes.

    Runs ``headscale nodes list -o json`` inside the ``headscale`` container
    on host ``calypso`` over SSH and summarizes each node.

    Returns:
        ``{"nodes": [...]}`` where each entry has ``name``, ``ip``,
        ``online`` and ``last_seen``. On any failure (SSH error, timeout,
        unparsable output) returns ``{"nodes": [], "error": <message>}``
        instead of raising, matching the error style of the AdGuard routes.
    """
    try:
        result = subprocess.run(
            [
                "ssh",
                "-o",
                "ConnectTimeout=3",
                "calypso",
                "docker exec headscale headscale nodes list -o json",
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (subprocess.TimeoutExpired, OSError) as e:
        # TimeoutExpired: the command exceeded the 15 s budget.
        # OSError: the ssh executable could not be started at all.
        return {"nodes": [], "error": str(e)}

    if result.returncode != 0:
        return {"nodes": [], "error": result.stderr.strip()}

    try:
        nodes = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        # Covers empty stdout as well as non-JSON noise in the output.
        return {"nodes": [], "error": f"invalid JSON from headscale: {e}"}

    if nodes is None:
        # JSON ``null`` is treated as "no nodes" rather than an error.
        nodes = []
    if not isinstance(nodes, list):
        return {"nodes": [], "error": "unexpected headscale output (not a list)"}

    return {
        "nodes": [
            {
                "name": n.get("givenName") or n.get("name", "?"),
                # First address only; "?" when the list is missing or empty.
                "ip": (n.get("ipAddresses") or ["?"])[0],
                "online": n.get("online", False),
                "last_seen": n.get("lastSeen", ""),
            }
            for n in nodes
        ]
    }


@router.get("/network/adguard")
def adguard_stats():
    """Get AdGuard DNS stats.

    Returns:
        A dict with ``total_queries``, ``blocked`` and ``avg_time`` taken
        from the ``/stats`` response (each defaulting to 0 when absent), or
        ``{"error": <message>}`` if the request fails.
    """
    try:
        stats = _adguard_get("/stats")
        return {
            "total_queries": stats.get("num_dns_queries", 0),
            "blocked": stats.get("num_blocked_filtering", 0),
            "avg_time": stats.get("avg_processing_time", 0),
        }
    except Exception as e:
        # Route boundary: report the failure in the payload instead of a 500.
        return {"error": str(e)}


@router.get("/network/adguard/rewrites")
def adguard_rewrites():
    """List AdGuard DNS rewrites.

    Returns:
        A list of ``{"domain": ..., "answer": ...}`` dicts built from the
        ``/rewrite/list`` response (empty when the response is empty), or
        ``{"error": <message>}`` if the request fails.
    """
    try:
        data = _adguard_get("/rewrite/list")
        return [
            {"domain": r.get("domain", ""), "answer": r.get("answer", "")}
            for r in (data or [])
        ]
    except Exception as e:
        # Route boundary: report the failure in the payload instead of a 500.
        return {"error": str(e)}