"""Network / Headscale / AdGuard routes.""" from fastapi import APIRouter import subprocess import json import httpx router = APIRouter(tags=["network"]) CLOUDFLARE_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret CLOUDFLARE_ZONE_ID = "4dbd15d096d71101b7c0c6362b307a66" AUTHENTIK_URL = "https://sso.vish.gg" AUTHENTIK_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret GITEA_URL = "https://git.vish.gg" GITEA_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret ADGUARD_URL = "http://192.168.0.250:9080" ADGUARD_USER = "vish" ADGUARD_PASS = "REDACTED_PASSWORD" def _adguard_get(path): with httpx.Client(timeout=10) as client: client.post(f"{ADGUARD_URL}/control/login", json={"name": ADGUARD_USER, "password": ADGUARD_PASS}) r = client.get(f"{ADGUARD_URL}/control{path}") r.raise_for_status() return r.json() if r.content else {} def _parse_headscale_time(val) -> str: """Convert headscale timestamp (protobuf or string) to ISO format.""" if not val: return "" if isinstance(val, dict) and "seconds" in val: from datetime import datetime, timezone return datetime.fromtimestamp(val["seconds"], tz=timezone.utc).isoformat() if isinstance(val, str): return val[:19] return "" @router.get("/network/headscale") def headscale_nodes(): """List Headscale nodes.""" result = subprocess.run( ["ssh", "-o", "ConnectTimeout=3", "calypso", "sudo /usr/local/bin/docker exec headscale headscale nodes list -o json"], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return {"nodes": [], "error": result.stderr.strip()} try: nodes = json.loads(result.stdout) except json.JSONDecodeError: return {"nodes": [], "error": "Invalid JSON from headscale"} online_count = sum(1 for n in nodes if n.get("online")) return { "nodes": [ {"name": n.get("given_name") or n.get("givenName") or n.get("name", "?"), "ip": (n.get("ip_addresses") or n.get("ipAddresses") or ["?"])[0], "online": n.get("online", False), "last_seen": _parse_headscale_time(n.get("last_seen") or n.get("lastSeen"))} for n in nodes ], "total": len(nodes), "online": online_count, } @router.get("/network/adguard") def adguard_stats(): """Get AdGuard DNS stats.""" try: stats = _adguard_get("/stats") return { "total_queries": stats.get("num_dns_queries", 0), "blocked": stats.get("num_blocked_filtering", 0), "avg_time": stats.get("avg_processing_time", 0), } except Exception as e: return {"error": str(e)} @router.get("/network/adguard/rewrites") def adguard_rewrites(): """List AdGuard DNS rewrites.""" try: data = _adguard_get("/rewrite/list") return [{"domain": r.get("domain", ""), "answer": r.get("answer", "")} for r in (data or [])] except Exception as e: return {"error": str(e)} @router.get("/network/cloudflare") def cloudflare_stats(): """Cloudflare DNS records with proxied status.""" try: with httpx.Client(timeout=10) as client: r = client.get(f"https://api.cloudflare.com/client/v4/zones/{CLOUDFLARE_ZONE_ID}/dns_records", headers={"Authorization": f"Bearer {CLOUDFLARE_TOKEN}"}, params={"per_page": 100}) r.raise_for_status() raw_records = r.json().get("result", []) proxied_count = sum(1 for rec in raw_records if rec.get("proxied")) types = {} records = [] for rec in raw_records: t = rec.get("type", "?") types[t] = types.get(t, 0) + 1 records.append({ "name": rec.get("name", "?"), "type": t, "content": rec.get("content", "?"), "proxied": rec.get("proxied", False), "ttl": rec.get("ttl", 0), }) records.sort(key=lambda r: (r["type"], r["name"])) return { "total": len(records), "proxied": proxied_count, "dns_only": len(records) - proxied_count, "types": types, "records": records, } except Exception as e: return {"error": str(e)} @router.get("/network/authentik") def authentik_info(): """Authentik users, sessions, and recent events.""" try: with httpx.Client(timeout=10, verify=False) as client: headers = {"Authorization": f"Bearer {AUTHENTIK_TOKEN}"} # Users ur = client.get(f"{AUTHENTIK_URL}/api/v3/core/users/", headers=headers, params={"page_size": 20}) users = [] if ur.status_code == 200: for u in ur.json().get("results", []): if u.get("username", "").startswith("ak-"): continue # Skip service accounts users.append({ "username": u.get("username", "?"), "last_login": u.get("last_login", "")[:19] if u.get("last_login") else "never", "active": u.get("is_active", False), }) # Sessions sr = client.get(f"{AUTHENTIK_URL}/api/v3/core/authenticated_sessions/", headers=headers) session_count = sr.json().get("pagination", {}).get("count", 0) if sr.status_code == 200 else 0 # Recent events (skip noisy secret_rotate) er = client.get(f"{AUTHENTIK_URL}/api/v3/events/events/", headers=headers, params={"page_size": 20, "ordering": "-created"}) events = [] if er.status_code == 200: for e in er.json().get("results", []): action = e.get("action", "?") if action in ("secret_rotate",): continue user = e.get("user", {}).get("username") or e.get("context", {}).get("username", "system") events.append({ "action": action, "user": user, "created": e.get("created", "?")[:19], }) if len(events) >= 5: break return { "users": users, "active_sessions": session_count, "recent_events": events, } except Exception as e: return {"error": str(e)} @router.get("/network/gitea") def gitea_activity(): """Recent Gitea commits and open PRs.""" try: with httpx.Client(timeout=10) as client: # Recent commits cr = client.get(f"{GITEA_URL}/api/v1/repos/vish/homelab/commits", headers={"Authorization": f"token {GITEA_TOKEN}"}, params={"limit": 5, "sha": "main"}) commits = [] if cr.status_code == 200: for c in cr.json()[:5]: commits.append({ "sha": c.get("sha", "?")[:7], "message": c.get("commit", {}).get("message", "?").split("\n")[0][:80], "date": c.get("commit", {}).get("committer", {}).get("date", "?")[:10], "author": c.get("commit", {}).get("author", {}).get("name", "?"), }) # Open PRs pr = client.get(f"{GITEA_URL}/api/v1/repos/vish/homelab/pulls", headers={"Authorization": f"token {GITEA_TOKEN}"}, params={"state": "open", "limit": 5}) prs = [] if pr.status_code == 200: for p in pr.json(): prs.append({ "number": p.get("number"), "title": p.get("title", "?"), "user": p.get("user", {}).get("login", "?"), }) return {"commits": commits, "open_prs": prs} except Exception as e: return {"error": str(e)}