#!/usr/bin/env python3 """ Homelab MCP Server Provides MCP tools for managing homelab infrastructure: - Portainer: stack/container management across all endpoints - Gitea: repo, issue, and branch management - Prometheus: PromQL queries and target inspection - Grafana: dashboard and alert listing - Sonarr/Radarr: media library and download queue - SABnzbd: download queue management - SSH: remote command execution on homelab hosts - Filesystem: read/write files on the local machine """ import json import os import subprocess from pathlib import Path from typing import Optional import httpx from fastmcp import FastMCP # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- PORTAINER_URL = "http://100.83.230.112:10000" PORTAINER_TOKEN = "REDACTED_TOKEN" GITEA_TOKEN = "REDACTED_TOKEN" NTFY_BASE = "https://ntfy.vish.gg" REPO_PATH = Path("/home/homelab/organized/repos/homelab") ENDPOINTS: dict[str, int] = { "atlantis": 2, "calypso": 443397, "nuc": 443398, "homelab": 443399, "rpi5": 443395, } # Gitea — now on matrix-ubuntu via NPM GITEA_URL = "https://git.vish.gg" GITEA_ORG = "vish" # Monitoring PROMETHEUS_URL = "http://192.168.0.210:9090" GRAFANA_URL = "http://192.168.0.210:3300" GRAFANA_USER = "admin" GRAFANA_PASS = "REDACTED_PASSWORD" # Media SONARR_URL = "http://192.168.0.200:8989" SONARR_API_KEY = "REDACTED_API_KEY" RADARR_URL = "http://192.168.0.200:7878" RADARR_API_KEY = "REDACTED_API_KEY" SABNZBD_URL = "http://192.168.0.200:8080" SABNZBD_API_KEY = "REDACTED_API_KEY" # AdGuard — primary on Calypso, secondary on NUC ADGUARD_URL = "http://192.168.0.250:9080" ADGUARD_USER = "vish" ADGUARD_PASS = "REDACTED_PASSWORD" # NPM — now on matrix-ubuntu NPM_URL = "http://192.168.0.154:81" NPM_USER = "your-email@example.com" NPM_PASS = "REDACTED_PASSWORD" # Headscale — on Calypso HEADSCALE_URL = "https://headscale.vish.gg:8443" HEADSCALE_CONTAINER = "headscale" HEADSCALE_CALYPSO_SSH = "calypso" # Authentik — on Calypso AUTHENTIK_URL = "https://sso.vish.gg" AUTHENTIK_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret # Cloudflare CLOUDFLARE_TOKEN = ( "FGXlHM7doB8Z4vxv84_ntzhG_Cx15RXs66zoouZU" # pragma: allowlist secret ) CLOUDFLARE_ZONE_ID = "4dbd15d096d71101b7c0c6362b307a66" # Uptime Kuma — on Pi-5 KUMA_DB_PATH = "/home/vish/docker/kuma/data/kuma.db" KUMA_CONTAINER = "uptime-kuma" KUMA_HOST = "pi-5" # SSH — hostnames must resolve via /etc/hosts or ~/.ssh/config SSH_KNOWN_HOSTS = [ "atlantis", "calypso", "setillo", "setillo-root", "nuc", "homelab-vm", "rpi5", "pi-5", "matrix-ubuntu", "moon", "olares", "guava", "pve", "seattle-tailscale", "gl-mt3000", ] # Filesystem — restrict read/write to safe root paths FS_ALLOWED_ROOTS = [ Path("/home/homelab"), Path("/tmp"), ] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _portainer(method: str, path: str, **kwargs) -> dict | list: """Make a Portainer API request. Raises on HTTP error.""" with httpx.Client(verify=False, timeout=30) as client: r = client.request( method, f"{PORTAINER_URL}/api{path}", headers={"X-API-Key": PORTAINER_TOKEN}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _resolve_endpoint(endpoint: str) -> int: """Resolve endpoint name or numeric string to an endpoint ID.""" if endpoint.isdigit(): return int(endpoint) ep = endpoint.lower() if ep not in ENDPOINTS: raise ValueError( f"Unknown endpoint '{endpoint}'. Known: {', '.join(ENDPOINTS)}" ) return ENDPOINTS[ep] def _gitea(method: str, path: str, **kwargs) -> dict | list: """Make a Gitea API request.""" with httpx.Client(timeout=20) as client: r = client.request( method, f"{GITEA_URL}/api/v1{path}", headers={"Authorization": f"token {GITEA_TOKEN}"}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _arr( base_url: str, api_key: str, path: str, params: dict | None = None ) -> dict | list: """Make a Sonarr/Radarr API request.""" with httpx.Client(timeout=20) as client: r = client.get( f"{base_url}/api/v3{path}", headers={"X-Api-Key": api_key}, params=params or {}, ) r.raise_for_status() return r.json() def _sabnzbd(mode: str, extra: dict | None = None) -> dict: """Make a SABnzbd API request.""" params = {"apikey": SABNZBD_API_KEY, "output": "json", "mode": mode} if extra: params.update(extra) with httpx.Client(timeout=20) as client: r = client.get(f"{SABNZBD_URL}/api", params=params) r.raise_for_status() return r.json() def _fs_safe(path: str) -> Path: """Resolve a path and verify it's under an allowed root.""" p = Path(path).expanduser().resolve() for root in FS_ALLOWED_ROOTS: try: p.relative_to(root) return p except ValueError: continue allowed = ", ".join(str(r) for r in FS_ALLOWED_ROOTS) raise PermissionError(f"Path '{path}' is outside allowed roots: {allowed}") # --------------------------------------------------------------------------- # MCP Server # --------------------------------------------------------------------------- mcp = FastMCP( "Homelab", instructions=( "Tools for managing a homelab running Docker services across multiple hosts.\n\n" "PORTAINER — Docker orchestration across 5 endpoints:\n" " Endpoints: atlantis (main NAS, media stack), calypso (secondary NAS), " "nuc (mini PC), homelab (VM at 192.168.0.210), rpi5 (Raspberry Pi).\n" " Tools: list_endpoints, list_stacks, get_stack, redeploy_stack, " "list_containers, get_container_logs, restart_container, start_container, " "stop_container, list_stack_containers, check_portainer.\n\n" "GITEA — Self-hosted git at git.vish.gg, org=vish. Repo names can be " "'vish/homelab' or just 'homelab'.\n" " Tools: gitea_list_repos, gitea_list_issues, gitea_create_issue, gitea_list_branches.\n\n" "ADGUARD — DNS rewrite management for split-horizon DNS (Calypso, 192.168.0.250:9080).\n" " The wildcard *.vish.gg → 100.85.21.51 (matrix-ubuntu) requires specific overrides\n" " for unproxied services accessed internally (pt.vish.gg, sso.vish.gg, git.vish.gg).\n" " Tools: adguard_list_rewrites, adguard_add_rewrite, adguard_delete_rewrite.\n\n" "NPM — Nginx Proxy Manager on matrix-ubuntu (192.168.0.154:81).\n" " Cert IDs: npm-6=mx.vish.gg, npm-7=livekit.mx.vish.gg, npm-8=*.vish.gg(CF),\n" " npm-11=pt.vish.gg, npm-12=sso.vish.gg. NEVER reuse an existing npm-N ID.\n" " Tools: npm_list_proxy_hosts, npm_list_certs, npm_get_proxy_host, npm_update_cert.\n\n" "HEADSCALE — Self-hosted Tailscale coordination server on Calypso.\n" " Access via SSH to calypso → docker exec headscale.\n" " Tools: headscale_list_nodes, headscale_create_preauth_key, headscale_delete_node, " "headscale_rename_node.\n\n" "AUTHENTIK — SSO identity provider at sso.vish.gg (Calypso).\n" " All proxy providers need cookie_domain=vish.gg to avoid redirect loops.\n" " Tools: authentik_list_applications, authentik_list_providers, authentik_list_users,\n" " authentik_update_app_launch_url, authentik_set_provider_cookie_domain.\n\n" "CLOUDFLARE — DNS management for vish.gg zone.\n" " Most *.vish.gg are proxied (orange cloud). Unproxied: mx.vish.gg, headscale.vish.gg,\n" " livekit.mx.vish.gg, pt.vish.gg, sso.vish.gg, derp*.vish.gg.\n" " Tools: cloudflare_list_dns_records, cloudflare_create_dns_record,\n" " cloudflare_delete_dns_record, cloudflare_update_dns_record.\n\n" "UPTIME KUMA — Monitoring on Pi-5 (100.77.151.40:3001). DB manipulation via SSH+SQLite.\n" " Always call kuma_restart after adding/modifying monitors.\n" " Tools: kuma_list_monitors, kuma_list_groups, kuma_add_monitor, kuma_set_parent, " "kuma_restart.\n\n" "PROMETHEUS — PromQL queries against homelab metrics (192.168.0.210:9090).\n" " Tools: prometheus_query, prometheus_targets.\n\n" "GRAFANA — Dashboard and alert inspection (192.168.0.210:3300).\n" " Tools: grafana_list_dashboards, grafana_list_alerts.\n\n" "SONARR/RADARR — Media library and download queue on Atlantis (ports 8989/7878).\n" " Tools: sonarr_list_series, sonarr_queue, radarr_list_movies, radarr_queue.\n\n" "SABNZBD — Usenet download queue on Atlantis (port 8080).\n" " Tools: sabnzbd_queue, sabnzbd_pause, sabnzbd_resume.\n\n" "SSH — Run commands on homelab hosts. Allowed: atlantis, calypso, setillo, " "setillo-root, nuc, homelab-vm, rpi5. Requires key auth in ~/.ssh/config.\n" " Tool: ssh_exec(host, command).\n\n" "FILESYSTEM — Read/write files on the local machine. " "Allowed roots: /home/homelab, /tmp.\n" " Tools: fs_read, fs_write, fs_list.\n\n" "REPO — Inspect compose files in the homelab git repo at " "/home/homelab/organized/repos/homelab.\n" " Tools: list_homelab_services, get_compose_file.\n\n" "UTILITIES — check_url (HTTP health check), send_notification (ntfy push)." ), ) # --------------------------------------------------------------------------- # Portainer — Endpoints # --------------------------------------------------------------------------- @mcp.tool() def list_endpoints() -> str: """List all Portainer environments (servers) with their connection status.""" data = _portainer("GET", "/endpoints") rows = [] for ep in data: rows.append( f" {ep['Name']} (id={ep['Id']}) — " f"status={'online' if ep.get('Status') == 1 else 'offline'} — " f"containers={ep.get('Snapshots', [{}])[0].get('RunningContainerCount', '?')} running" ) return "Endpoints:\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Portainer — Stacks # --------------------------------------------------------------------------- @mcp.tool() def list_stacks(endpoint: Optional[str] = None) -> str: """ List all Portainer stacks with their status. Args: endpoint: Optional filter by endpoint name (atlantis, calypso, nuc, homelab, rpi5) or numeric ID. """ data = _portainer("GET", "/stacks") if endpoint: ep_id = _resolve_endpoint(endpoint) data = [s for s in data if s.get("EndpointId") == ep_id] rows = [] for s in sorted(data, key=lambda x: x["Name"]): status = "active" if s.get("Status") == 1 else "inactive" ep_name = next( (k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")), str(s.get("EndpointId")), ) git = s.get("GitConfig", {}) git_info = f" [git: {git.get('ConfigFilePath', '')}]" if git else "" rows.append( f" [{s['Id']}] {s['Name']} — {status} — endpoint={ep_name}{git_info}" ) return f"Stacks ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def get_stack(stack_name_or_id: str) -> str: """ Get detailed info about a specific stack. Args: stack_name_or_id: Stack name (partial match) or numeric ID. """ all_stacks = _portainer("GET", "/stacks") if stack_name_or_id.isdigit(): matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)] else: term = stack_name_or_id.lower() matches = [s for s in all_stacks if term in s["Name"].lower()] if not matches: return f"No stack found matching '{stack_name_or_id}'." if len(matches) > 1: names = ", ".join(s["Name"] for s in matches) return f"Multiple matches: {names}. Be more specific." s = matches[0] git = s.get("GitConfig") or {} env = s.get("Env") or [] ep_name = next( (k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")), str(s.get("EndpointId")), ) lines = [ f"Stack: {s['Name']} (id={s['Id']})", f" Status: {'active' if s.get('Status') == 1 else 'inactive'}", f" Endpoint: {ep_name} (id={s.get('EndpointId')})", f" Created: {s.get('CreationDate', 'unknown')}", f" Updated: {s.get('UpdateDate', 'unknown')}", ] if git: lines += [ f" Git URL: {git.get('URL', '')}", f" Git Branch: {git.get('ReferenceName', '').replace('refs/heads/', '')}", f" Git File: {git.get('ConfigFilePath', '')}", ] if env: lines.append(f" Env vars: {len(env)} set") return "\n".join(lines) @mcp.tool() def redeploy_stack(stack_name_or_id: str) -> str: """ Trigger a GitOps redeploy of a stack (pull latest from Git and redeploy). Args: stack_name_or_id: Stack name (partial match) or numeric ID. """ all_stacks = _portainer("GET", "/stacks") if stack_name_or_id.isdigit(): matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)] else: term = stack_name_or_id.lower() matches = [s for s in all_stacks if term in s["Name"].lower()] if not matches: return f"No stack found matching '{stack_name_or_id}'." if len(matches) > 1: names = ", ".join(s["Name"] for s in matches) return f"Multiple matches: {names}. Be more specific." s = matches[0] if not s.get("GitConfig"): return f"Stack '{s['Name']}' is not a GitOps stack — cannot redeploy via git." ep_id = s["EndpointId"] stack_id = s["Id"] _portainer( "PUT", f"/stacks/{stack_id}/git/redeploy?endpointId={ep_id}", json={ "pullImage": True, "prune": False, "repositoryAuthentication": True, "repositoryUsername": "vish", "repositoryPassword": GITEA_TOKEN, }, ) return f"Redeploy triggered for stack '{s['Name']}' (id={stack_id}) on endpoint {ep_id}." # --------------------------------------------------------------------------- # Portainer — Containers # --------------------------------------------------------------------------- @mcp.tool() def list_containers( endpoint: str = "atlantis", all_containers: bool = False, filter_name: Optional[str] = None, ) -> str: """ List containers on a Portainer endpoint. Args: endpoint: Endpoint name (atlantis, calypso, nuc, homelab, rpi5) or ID. all_containers: If True, include stopped containers. Default False (running only). filter_name: Optional substring to filter container names. """ ep_id = _resolve_endpoint(endpoint) params = "?all=true" if all_containers else "" data = _portainer("GET", f"/endpoints/{ep_id}/docker/containers/json{params}") if filter_name: term = filter_name.lower() data = [c for c in data if any(term in n.lower() for n in c.get("Names", []))] rows = [] for c in sorted(data, key=lambda x: x.get("Names", [""])[0]): name = c.get("Names", ["?"])[0].lstrip("/") image = c.get("Image", "?").split(":")[0].split("/")[-1] state = c.get("State", "?") short_id = c.get("Id", "")[:12] rows.append(f" {short_id} {state:10s} {name:40s} {image}") header = f"Containers on {endpoint} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image" return header + "\n" + "\n".join(rows) @mcp.tool() def get_container_logs( container_id_or_name: str, endpoint: str = "atlantis", tail: int = 100, ) -> str: """ Get recent logs from a container. Args: container_id_or_name: Container ID (short or full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. tail: Number of log lines to return. Default: 100. """ ep_id = _resolve_endpoint(endpoint) # Resolve by name if not a hex ID if not all(c in "0123456789abcdefABCDEF" for c in container_id_or_name): containers = _portainer( "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true" ) term = container_id_or_name.lower() matches = [ c for c in containers if any(term in n.lower() for n in c.get("Names", [])) ] if not matches: return ( f"No container found matching '{container_id_or_name}' on {endpoint}." ) if len(matches) > 1: names = ", ".join(c["Names"][0].lstrip("/") for c in matches) return f"Multiple matches: {names}. Be more specific." container_id_or_name = matches[0]["Id"][:12] with httpx.Client(verify=False, timeout=30) as client: r = client.get( f"{PORTAINER_URL}/api/endpoints/{ep_id}/docker/containers/{container_id_or_name}/logs", headers={"X-API-Key": PORTAINER_TOKEN}, params={"stdout": 1, "stderr": 1, "tail": tail, "timestamps": 0}, ) r.raise_for_status() # Docker log stream has 8-byte header per line; strip it raw = r.content lines = [] i = 0 while i < len(raw): if i + 8 > len(raw): break size = int.from_bytes(raw[i + 4 : i + 8], "big") line = raw[i + 8 : i + 8 + size].decode("utf-8", errors="replace").rstrip() if line: lines.append(line) i += 8 + size if not lines: # fallback: treat as plain text lines = r.text.splitlines() return "\n".join(lines[-tail:]) @mcp.tool() def restart_container( container_id_or_name: str, endpoint: str = "atlantis", ) -> str: """ Restart a container. Args: container_id_or_name: Container ID (short/full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. """ ep_id = _resolve_endpoint(endpoint) cid = _resolve_container_id(container_id_or_name, ep_id) _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/restart") return f"Restarted container {cid} on {endpoint}." @mcp.tool() def stop_container( container_id_or_name: str, endpoint: str = "atlantis", ) -> str: """ Stop a running container. Args: container_id_or_name: Container ID (short/full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. """ ep_id = _resolve_endpoint(endpoint) cid = _resolve_container_id(container_id_or_name, ep_id) _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/stop") return f"Stopped container {cid} on {endpoint}." @mcp.tool() def start_container( container_id_or_name: str, endpoint: str = "atlantis", ) -> str: """ Start a stopped container. Args: container_id_or_name: Container ID (short/full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. """ ep_id = _resolve_endpoint(endpoint) cid = _resolve_container_id(container_id_or_name, ep_id) _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/start") return f"Started container {cid} on {endpoint}." def _resolve_container_id(name_or_id: str, ep_id: int) -> str: """Resolve a container name substring to a short container ID.""" if len(name_or_id) >= 12 and all(c in "0123456789abcdefABCDEF" for c in name_or_id): return name_or_id[:12] containers = _portainer( "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true" ) term = name_or_id.lower() matches = [ c for c in containers if any(term in n.lower() for n in c.get("Names", [])) ] if not matches: raise ValueError(f"No container found matching '{name_or_id}'.") if len(matches) > 1: names = ", ".join(c["Names"][0].lstrip("/") for c in matches) raise ValueError( f"Multiple containers match '{name_or_id}': {names}. Be more specific." ) return matches[0]["Id"][:12] # --------------------------------------------------------------------------- # Portainer — Stack containers (convenience) # --------------------------------------------------------------------------- @mcp.tool() def list_stack_containers(stack_name_or_id: str) -> str: """ List all containers belonging to a specific stack. Args: stack_name_or_id: Stack name (partial match) or numeric ID. """ all_stacks = _portainer("GET", "/stacks") if stack_name_or_id.isdigit(): matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)] else: term = stack_name_or_id.lower() matches = [s for s in all_stacks if term in s["Name"].lower()] if not matches: return f"No stack found matching '{stack_name_or_id}'." if len(matches) > 1: names = ", ".join(s["Name"] for s in matches) return f"Multiple matches: {names}. Be more specific." s = matches[0] ep_id = s["EndpointId"] stack_name = s["Name"] containers = _portainer( "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true" ) # Filter by compose project label (Portainer uses com.docker.compose.project) stack_containers = [ c for c in containers if c.get("Labels", {}).get("com.docker.compose.project", "").lower() == stack_name.lower() or any(stack_name.lower() in n.lower() for n in c.get("Names", [])) ] ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id)) rows = [] for c in sorted(stack_containers, key=lambda x: x.get("Names", [""])[0]): name = c.get("Names", ["?"])[0].lstrip("/") state = c.get("State", "?") short_id = c.get("Id", "")[:12] image = c.get("Image", "?").split(":")[0].split("/")[-1] rows.append(f" {short_id} {state:10s} {name:40s} {image}") header = f"Containers in stack '{stack_name}' on {ep_name} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image" return header + "\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Health checks # --------------------------------------------------------------------------- @mcp.tool() def check_url(url: str, expected_status: int = 200) -> str: """ Perform an HTTP health check on a URL. Args: url: The URL to check (e.g. http://192.168.0.200:9443/api/status). expected_status: Expected HTTP status code. Default: 200. """ try: with httpx.Client(verify=False, timeout=10, follow_redirects=True) as client: r = client.get(url) ok = r.status_code == expected_status return ( f"{'OK' if ok else 'FAIL'} {url}\n" f" Status: {r.status_code} (expected {expected_status})\n" f" Latency: {r.elapsed.total_seconds() * 1000:.0f}ms" ) except Exception as e: return f"ERROR {url}\n {type(e).__name__}: {e}" @mcp.tool() def check_portainer() -> str: """Quick health check of the Portainer API and summary of infrastructure.""" try: status = _portainer("GET", "/status") stacks = _portainer("GET", "/stacks") stacks_list = stacks if isinstance(stacks, list) else [] status_dict = status if isinstance(status, dict) else {} active = sum( 1 for s in stacks_list if isinstance(s, dict) and s.get("Status") == 1 ) return ( f"Portainer OK — version {status_dict.get('Version', '?')}\n" f" Stacks: {len(stacks_list)} total, {active} active" ) except Exception as e: return f"Portainer UNREACHABLE: {e}" # --------------------------------------------------------------------------- # Repo — service inspection # --------------------------------------------------------------------------- @mcp.tool() def list_homelab_services(host_filter: Optional[str] = None) -> str: """ List all services/stacks defined in the homelab repository. Args: host_filter: Optional substring to filter by host/path (e.g. 'atlantis', 'calypso', 'seattle'). """ compose_files = list(REPO_PATH.rglob("docker-compose.yml")) + list( REPO_PATH.rglob("docker-compose.yaml") ) # Exclude archive compose_files = [f for f in compose_files if "archive" not in f.parts] rows = [] for f in sorted(compose_files): rel = f.relative_to(REPO_PATH) if host_filter and host_filter.lower() not in str(rel).lower(): continue rows.append(f" {rel}") return f"Compose files ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def get_compose_file(service_path: str) -> str: """ Read a compose file from the homelab repo. Args: service_path: Relative path within the repo, e.g. 'hosts/synology/atlantis/arr-suite/docker-compose.yml' or a partial name like 'atlantis/arr-suite'. """ # Try exact relative path first candidate = REPO_PATH / service_path if candidate.is_file(): return candidate.read_text() # Try fuzzy: find compose files whose path contains the fragment # Searches docker-compose.yml/yaml AND standalone *.yaml/*.yml stack files term = service_path.lower().replace("\\", "/") all_compose = ( list(REPO_PATH.rglob("docker-compose.yml")) + list(REPO_PATH.rglob("docker-compose.yaml")) + list(REPO_PATH.rglob("*.yaml")) + list(REPO_PATH.rglob("*.yml")) ) hits = [ f for f in all_compose if term in str(f.relative_to(REPO_PATH)).lower() and "archive" not in f.parts and ".git" not in f.parts and "node_modules" not in f.parts ] # Prefer docker-compose files if both match compose_hits = [f for f in hits if "docker-compose" in f.name] if compose_hits: hits = compose_hits if not hits: return f"No compose file found matching '{service_path}'." if len(hits) > 1: paths = "\n".join(f" {f.relative_to(REPO_PATH)}" for f in hits) return f"Multiple matches:\n{paths}\nBe more specific." return hits[0].read_text() # --------------------------------------------------------------------------- # Notifications # --------------------------------------------------------------------------- @mcp.tool() def send_notification( message: str, title: str = "Homelab", topic: str = "homelab-alerts", priority: str = "default", tags: Optional[str] = None, ) -> str: """ Send a push notification via ntfy. Args: message: The notification body. title: Notification title. Default: 'Homelab'. topic: ntfy topic to publish to. Default: 'homelab-alerts'. priority: urgent, high, default, low, or min. Default: 'default'. tags: Comma-separated emoji tags e.g. 'warning,robot'. Optional. """ headers = { "Title": title, "Priority": priority, } if tags: headers["Tags"] = tags with httpx.Client(timeout=10) as client: r = client.post( f"{NTFY_BASE}/{topic}", content=message.encode(), headers=headers, ) r.raise_for_status() return f"Notification sent to {NTFY_BASE}/{topic} — '{title}: {message}'" # --------------------------------------------------------------------------- # Gitea # --------------------------------------------------------------------------- @mcp.tool() def gitea_list_repos(owner: Optional[str] = None, limit: int = 50) -> str: """ List Gitea repositories. Args: owner: User or org name. Defaults to the service account's accessible repos. limit: Max repos to return. Default: 50. """ if owner: data = _gitea("GET", f"/repos/search", params={"owner": owner, "limit": limit}) repos = data.get("data", []) if isinstance(data, dict) else data else: repos = _gitea( "GET", f"/repos/search", params={"limit": limit, "token": GITEA_TOKEN} ) repos = repos.get("data", []) if isinstance(repos, dict) else repos rows = [] for r in repos: archived = " [archived]" if r.get("archived") else "" private = " [private]" if r.get("private") else "" rows.append( f" {r['full_name']}{private}{archived} — " f"⭐{r.get('stars_count', 0)} " f"updated: {r.get('updated', '')[:10]}" ) return f"Repos ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def gitea_list_issues( repo: str, state: str = "open", limit: int = 20, ) -> str: """ List issues for a Gitea repository. Args: repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG). state: 'open', 'closed', or 'all'. Default: 'open'. limit: Max issues to return. Default: 20. """ if "/" not in repo: repo = f"{GITEA_ORG}/{repo}" data = _gitea( "GET", f"/repos/{repo}/issues", params={"state": state, "type": "issues", "limit": limit}, ) if not data: return f"No {state} issues in {repo}." rows = [] for issue in data: labels = ", ".join(l["name"] for l in issue.get("labels", [])) label_str = f" [{labels}]" if labels else "" rows.append( f" #{issue['number']} {issue['title']}{label_str} — @{issue['user']['login']}" ) return f"{state.capitalize()} issues in {repo} ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def gitea_create_issue(repo: str, title: str, body: str = "") -> str: """ Create a new issue in a Gitea repository. Args: repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG). title: Issue title. body: Issue body/description. Optional. """ if "/" not in repo: repo = f"{GITEA_ORG}/{repo}" data = _gitea("POST", f"/repos/{repo}/issues", json={"title": title, "body": body}) if isinstance(data, dict): return f"Created issue #{data.get('number')}: {data.get('title')}\n URL: {data.get('html_url')}" return f"Issue created: {data}" @mcp.tool() def gitea_list_branches(repo: str) -> str: """ List branches for a Gitea repository. Args: repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG). """ if "/" not in repo: repo = f"{GITEA_ORG}/{repo}" data = _gitea("GET", f"/repos/{repo}/branches") rows = [ f" {b['name']}" + (" [default]" if b.get("is_default") else "") for b in data ] return f"Branches in {repo} ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Prometheus # --------------------------------------------------------------------------- @mcp.tool() def prometheus_query(query: str) -> str: """ Run an instant PromQL query. Args: query: PromQL expression e.g. 'up', 'node_memory_MemAvailable_bytes{job="node"}'. """ with httpx.Client(timeout=20) as client: r = client.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query}) r.raise_for_status() data = r.json() if data.get("status") != "success": return f"Query failed: {data.get('error', 'unknown error')}" results = data["data"]["result"] if not results: return f"No results for: {query}" rows = [] for item in results[:50]: # cap output metric = item["metric"] value = item["value"][1] if item.get("value") else "?" label_str = ", ".join( f'{k}="{v}"' for k, v in metric.items() if k != "__name__" ) name = metric.get("__name__", query) rows.append(f" {name}{{{label_str}}} = {value}") return f"Results ({len(results)}):\n" + "\n".join(rows) @mcp.tool() def prometheus_targets() -> str: """List all Prometheus scrape targets and their health status.""" with httpx.Client(timeout=20) as client: r = client.get(f"{PROMETHEUS_URL}/api/v1/targets") r.raise_for_status() data = r.json() active = data["data"].get("activeTargets", []) rows = [] for t in sorted(active, key=lambda x: x.get("labels", {}).get("job", "")): job = t.get("labels", {}).get("job", "?") instance = t.get("labels", {}).get("instance", "?") health = t.get("health", "?") last_scrape = t.get("lastScrapeDuration", 0) rows.append( f" {'✓' if health == 'up' else '✗'} {job:30s} {instance:40s} {health}" ) return f"Prometheus targets ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Grafana # --------------------------------------------------------------------------- @mcp.tool() def grafana_list_dashboards() -> str: """List all Grafana dashboards.""" with httpx.Client(timeout=20) as client: r = client.get( f"{GRAFANA_URL}/api/search", params={"type": "dash-db"}, auth=(GRAFANA_USER, GRAFANA_PASS), ) r.raise_for_status() data = r.json() rows = [] for d in data: folder = d.get("folderTitle", "General") rows.append(f" [{d['uid']:20s}] {d['title']:50s} folder={folder}") return f"Dashboards ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def grafana_list_alerts() -> str: """List Grafana alert rules and their current state.""" with httpx.Client(timeout=20) as client: r = client.get( f"{GRAFANA_URL}/api/v1/provisioning/alert-rules", auth=(GRAFANA_USER, GRAFANA_PASS), ) r.raise_for_status() data = r.json() if not data: return "No alert rules configured." rows = [] for rule in data: rows.append(f" {rule.get('title', '?'):50s} uid={rule.get('uid', '?')}") return f"Alert rules ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Sonarr # --------------------------------------------------------------------------- @mcp.tool() def sonarr_list_series(filter_name: Optional[str] = None) -> str: """ List all series in Sonarr. Args: filter_name: Optional substring to filter by series title. """ data = _arr(SONARR_URL, SONARR_API_KEY, "/series") if filter_name: term = filter_name.lower() data = [s for s in data if term in s.get("title", "").lower()] rows = [] for s in sorted(data, key=lambda x: x.get("sortTitle", "")): status = s.get("status", "?") monitored = "✓" if s.get("monitored") else "✗" ep_count = s.get("episodeCount", 0) rows.append(f" {monitored} {s['title']:50s} {status:12s} {ep_count} eps") return f"Series ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def sonarr_queue() -> str: """Show the Sonarr download queue.""" data = _arr(SONARR_URL, SONARR_API_KEY, "/queue") records = data.get("records", []) if isinstance(data, dict) else data if not records: return "Sonarr queue is empty." rows = [] for item in records: title = item.get("title", "?")[:60] status = item.get("status", "?") size = item.get("size", 0) sizeleft = item.get("sizeleft", 0) pct = int((1 - sizeleft / size) * 100) if size else 0 rows.append(f" {status:12s} {pct:3d}% {title}") return f"Sonarr queue ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Radarr # --------------------------------------------------------------------------- @mcp.tool() def radarr_list_movies(filter_name: Optional[str] = None) -> str: """ List all movies in Radarr. Args: filter_name: Optional substring to filter by movie title. """ data = _arr(RADARR_URL, RADARR_API_KEY, "/movie") if filter_name: term = filter_name.lower() data = [m for m in data if term in m.get("title", "").lower()] rows = [] for m in sorted(data, key=lambda x: x.get("sortTitle", "")): monitored = "✓" if m.get("monitored") else "✗" downloaded = "↓" if m.get("hasFile") else " " year = m.get("year", "?") rows.append(f" {monitored}{downloaded} {m['title']:50s} {year}") return f"Movies ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() def radarr_queue() -> str: """Show the Radarr download queue.""" data = _arr(RADARR_URL, RADARR_API_KEY, "/queue") records = data.get("records", []) if isinstance(data, dict) else data if not records: return "Radarr queue is empty." rows = [] for item in records: title = item.get("title", "?")[:60] status = item.get("status", "?") size = item.get("size", 0) sizeleft = item.get("sizeleft", 0) pct = int((1 - sizeleft / size) * 100) if size else 0 rows.append(f" {status:12s} {pct:3d}% {title}") return f"Radarr queue ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # SABnzbd # --------------------------------------------------------------------------- @mcp.tool() def sabnzbd_queue() -> str: """Show the SABnzbd download queue.""" data = _sabnzbd("queue") queue = data.get("queue", {}) slots = queue.get("slots", []) if not slots: return f"SABnzbd queue empty. Status: {queue.get('status', '?')}" rows = [] for s in slots: name = s.get("filename", "?")[:60] status = s.get("status", "?") pct = s.get("percentage", "0") size = s.get("sizeleft", "?") rows.append(f" {status:12s} {pct:>4s}% {size:>8s} left {name}") speed = queue.get("speed", "0") eta = queue.get("timeleft", "?") return f"SABnzbd queue ({len(rows)}) — {speed} — ETA {eta}:\n" + "\n".join(rows) @mcp.tool() def sabnzbd_pause() -> str: """Pause the SABnzbd download queue.""" _sabnzbd("pause") return "SABnzbd queue paused." @mcp.tool() def sabnzbd_resume() -> str: """Resume the SABnzbd download queue.""" _sabnzbd("resume") return "SABnzbd queue resumed." # --------------------------------------------------------------------------- # SSH # --------------------------------------------------------------------------- @mcp.tool() def ssh_exec(host: str, command: str, timeout: int = 30) -> str: """ Run a command on a homelab host via SSH. Known hosts: atlantis, calypso, setillo, setillo-root, nuc, homelab-vm, rpi5. Requires the host to be in ~/.ssh/config or /etc/hosts. Args: host: SSH host alias (e.g. 'atlantis', 'calypso', 'setillo-root'). command: Shell command to execute remotely. timeout: Seconds before the command times out. Default: 30. """ if host not in SSH_KNOWN_HOSTS: return ( f"Host '{host}' not in allowed list.\n" f"Known hosts: {', '.join(SSH_KNOWN_HOSTS)}" ) try: result = subprocess.run( ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", host, command], capture_output=True, text=True, timeout=timeout, ) output = result.stdout if result.stderr: output += f"\n[stderr]\n{result.stderr}" if result.returncode != 0: output += f"\n[exit code {result.returncode}]" return output or "(no output)" except subprocess.TimeoutExpired: return f"Command timed out after {timeout}s." except Exception as e: return f"SSH error: {type(e).__name__}: {e}" # --------------------------------------------------------------------------- # Filesystem # --------------------------------------------------------------------------- @mcp.tool() def fs_read(path: str) -> str: """ Read a file from the local filesystem. Allowed roots: /home/homelab, /tmp. Args: path: Absolute or ~-relative path to the file. """ try: p = _fs_safe(path) if not p.exists(): return f"File not found: {p}" if p.is_dir(): return f"'{p}' is a directory. Use fs_list to list it." size = p.stat().st_size if size > 1_000_000: return f"File too large ({size:,} bytes). Read it in parts or use grep." return p.read_text(errors="replace") except PermissionError as e: return f"Permission denied: {e}" except Exception as e: return f"Error reading file: {e}" @mcp.tool() def fs_write(path: str, content: str) -> str: """ Write content to a file on the local filesystem. Allowed roots: /home/homelab, /tmp. Args: path: Absolute or ~-relative path to the file. content: Text content to write. """ try: p = _fs_safe(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content) return f"Written {len(content)} bytes to {p}" except PermissionError as e: return f"Permission denied: {e}" except Exception as e: return f"Error writing file: {e}" @mcp.tool() def fs_list(path: str = "/home/homelab") -> str: """ List the contents of a directory on the local filesystem. Allowed roots: /home/homelab, /tmp. Args: path: Directory path. Default: /home/homelab. """ try: p = _fs_safe(path) if not p.exists(): return f"Path not found: {p}" if not p.is_dir(): return f"'{p}' is a file, not a directory." entries = sorted(p.iterdir(), key=lambda x: (x.is_file(), x.name)) rows = [] for entry in entries: kind = "DIR " if entry.is_dir() else "FILE" size = entry.stat().st_size if entry.is_file() else "" size_str = f"{size:>10,}" if size != "" else f"{'':>10}" rows.append(f" {kind} {size_str} {entry.name}") return f"Contents of {p} ({len(rows)} entries):\n" + "\n".join(rows) except PermissionError as e: return f"Permission denied: {e}" except Exception as e: return f"Error listing directory: {e}" # --------------------------------------------------------------------------- # Helper functions for new tools # --------------------------------------------------------------------------- def _adguard_token() -> str: """Get AdGuard session token via login.""" with httpx.Client(timeout=10) as client: r = client.post( f"{ADGUARD_URL}/control/login", json={"name": ADGUARD_USER, "password": ADGUARD_PASS}, ) r.raise_for_status() # Cookie-based auth return r.cookies.get("agh_session") or "" def _adguard(method: str, path: str, **kwargs) -> dict | list: """Make an AdGuard API request with cookie auth.""" with httpx.Client(timeout=15) as client: # Login first to get session cookie login = client.post( f"{ADGUARD_URL}/control/login", json={"name": ADGUARD_USER, "password": ADGUARD_PASS}, ) login.raise_for_status() r = client.request(method, f"{ADGUARD_URL}/control{path}", **kwargs) r.raise_for_status() if r.content and r.headers.get("content-type", "").startswith( "application/json" ): return r.json() return {} def _npm_token() -> str: """Get NPM API token.""" with httpx.Client(timeout=10) as client: r = client.post( f"{NPM_URL}/api/tokens", json={"identity": NPM_USER, "secret": NPM_PASS}, ) r.raise_for_status() return r.json()["token"] def _npm(method: str, path: str, token: str | None = None, **kwargs) -> dict | list: """Make an NPM API request.""" if token is None: token = _npm_token() with httpx.Client(timeout=15) as client: r = client.request( method, f"{NPM_URL}/api{path}", headers={"Authorization": f"Bearer {token}"}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _authentik(method: str, path: str, **kwargs) -> dict | list: """Make an Authentik API request.""" with httpx.Client(timeout=15, verify=False) as client: r = client.request( method, f"{AUTHENTIK_URL}/api/v3{path}", headers={"Authorization": f"Bearer {AUTHENTIK_TOKEN}"}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _cloudflare(method: str, path: str, **kwargs) -> dict: """Make a Cloudflare API request.""" with httpx.Client(timeout=15) as client: r = client.request( method, f"https://api.cloudflare.com/client/v4{path}", headers={"Authorization": f"Bearer {CLOUDFLARE_TOKEN}"}, **kwargs, ) r.raise_for_status() return r.json() # --------------------------------------------------------------------------- # AdGuard tools # --------------------------------------------------------------------------- @mcp.tool() def adguard_list_rewrites() -> str: """ List all DNS rewrite rules in AdGuard Home (Calypso). Returns domain → answer mappings. Useful for checking split-horizon DNS overrides (e.g. which *.vish.gg services are pinned to specific IPs). """ try: data = _adguard("GET", "/rewrite/list") if not data: return "No rewrite rules found." lines = [f" {r['domain']:45s} → {r['answer']}" for r in data] # type: ignore return f"AdGuard DNS rewrites ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def adguard_add_rewrite(domain: str, answer: str) -> str: """ Add a DNS rewrite rule to AdGuard Home. Use this to add split-horizon DNS overrides so internal services resolve to LAN/Tailscale IPs instead of public ones. Args: domain: The domain to override (e.g. 'pt.vish.gg' or '*.example.com'). answer: The IP address to return (e.g. '192.168.0.154'). """ try: _adguard("POST", "/rewrite/add", json={"domain": domain, "answer": answer}) return f"Added rewrite: {domain} → {answer}" except Exception as e: return f"Error adding rewrite: {e}" @mcp.tool() def adguard_delete_rewrite(domain: str, answer: str) -> str: """ Delete a DNS rewrite rule from AdGuard Home. Args: domain: The domain of the rule to delete. answer: The answer IP of the rule to delete. """ try: _adguard("POST", "/rewrite/delete", json={"domain": domain, "answer": answer}) return f"Deleted rewrite: {domain} → {answer}" except Exception as e: return f"Error deleting rewrite: {e}" # --------------------------------------------------------------------------- # NPM (Nginx Proxy Manager) tools # --------------------------------------------------------------------------- @mcp.tool() def npm_list_proxy_hosts(filter_domain: Optional[str] = None) -> str: """ List all proxy hosts in Nginx Proxy Manager (matrix-ubuntu). Args: filter_domain: Optional substring to filter by domain name. """ try: token = _npm_token() hosts = _npm("GET", "/nginx/proxy-hosts", token=token) results = [] for h in hosts: # type: ignore domains = ", ".join(h.get("domain_names", [])) if filter_domain and filter_domain.lower() not in domains.lower(): continue fwd = f"{h.get('forward_scheme')}://{h.get('forward_host')}:{h.get('forward_port')}" cert = h.get("certificate_id", 0) enabled = "✓" if h.get("enabled") else "✗" results.append( f" [{enabled}] ID:{h['id']:3d} {domains:45s} → {fwd:35s} cert:{cert}" ) if not results: return "No proxy hosts found." return f"NPM proxy hosts ({len(results)}):\n" + "\n".join(results) except Exception as e: return f"Error: {e}" @mcp.tool() def npm_list_certs() -> str: """ List all SSL certificates in Nginx Proxy Manager with their domains and expiry. """ try: token = _npm_token() certs = _npm("GET", "/nginx/certificates", token=token) lines = [] for c in certs: # type: ignore domains = ", ".join(c.get("domain_names", [])) provider = c.get("provider", "?") expires = (c.get("expires_on") or "?")[:10] nice = c.get("nice_name", "") lines.append( f" ID:{c['id']:3d} [{provider:12s}] expires:{expires} {nice or domains}" ) return f"NPM certificates ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def npm_get_proxy_host(host_id: int) -> str: """ Get details of a specific NPM proxy host including advanced config and cert. Args: host_id: The proxy host ID (from npm_list_proxy_hosts). """ try: token = _npm_token() h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token) lines = [ f"ID: {h['id']}", # type: ignore f"Domains: {', '.join(h['domain_names'])}", # type: ignore f"Forward: {h['forward_scheme']}://{h['forward_host']}:{h['forward_port']}", # type: ignore f"SSL forced: {h.get('ssl_forced')}", # type: ignore f"Certificate ID: {h.get('certificate_id')}", # type: ignore f"Websocket: {h.get('allow_websocket_upgrade')}", # type: ignore f"Enabled: {h.get('enabled')}", # type: ignore ] adv = h.get("advanced_config", "").strip() # type: ignore if adv: lines.append(f"Advanced config:\n{adv}") meta = h.get("meta", {}) # type: ignore if isinstance(meta, dict) and meta.get("nginx_err"): lines.append(f"nginx_err: {meta['nginx_err']}") return "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def npm_update_cert(host_id: int, certificate_id: int) -> str: """ Update the SSL certificate used by an NPM proxy host. Args: host_id: The proxy host ID. certificate_id: The certificate ID to assign (from npm_list_certs). """ try: token = _npm_token() h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token) h_dict = h if isinstance(h, dict) else {} payload = { k: h_dict.get(k) for k in [ "domain_names", "forward_scheme", "forward_host", "forward_port", "access_list_id", "ssl_forced", "caching_enabled", "block_exploits", "advanced_config", "meta", "allow_websocket_upgrade", "http2_support", "hsts_enabled", "hsts_subdomains", "locations", ] } payload["certificate_id"] = certificate_id result = _npm("PUT", f"/nginx/proxy-hosts/{host_id}", token=token, json=payload) return f"Updated host {host_id} ({', '.join(result.get('domain_names', []))}) to cert {certificate_id}" # type: ignore except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Headscale tools # --------------------------------------------------------------------------- @mcp.tool() def headscale_list_nodes() -> str: """ List all nodes registered in the Headscale tailnet. Shows node ID, hostname, Tailscale IP, online status, and expiry. """ try: result = subprocess.run( [ "ssh", HEADSCALE_CALYPSO_SSH, "sudo /usr/local/bin/docker exec headscale headscale nodes list --output json", ], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" nodes = json.loads(result.stdout) lines = [] for n in nodes: ips = ", ".join(n.get("ip_addresses", [])) online = "🟢" if n.get("online") else "🔴" name = n.get("given_name") or n.get("name", "?") lines.append(f" {online} ID:{str(n['id']):3s} {name:25s} {ips}") return f"Headscale nodes ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def headscale_create_preauth_key( expiration: str = "24h", reusable: bool = False, ephemeral: bool = False, ) -> str: """ Create a Headscale pre-authentication key for registering a new node. Args: expiration: Key expiry duration e.g. '24h', '7d'. Default: '24h'. reusable: Allow multiple nodes to use this key. Default: False. ephemeral: Node is removed when it goes offline. Default: False. """ try: flags = f"--expiration {expiration}" if reusable: flags += " --reusable" if ephemeral: flags += " --ephemeral" result = subprocess.run( [ "ssh", HEADSCALE_CALYPSO_SSH, f"sudo /usr/local/bin/docker exec headscale headscale preauthkeys create --user admin {flags} --output json", ], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" data = json.loads(result.stdout) key = data.get("key", "?") exp = data.get("expiration", "?") return ( f"Pre-auth key created:\n" f" Key: {key}\n" f" Expires: {exp}\n" f" Reusable: {reusable}, Ephemeral: {ephemeral}\n\n" f"Use on new node:\n" f" tailscale up --login-server=https://headscale.vish.gg:8443 --authkey={key} --accept-routes=false" ) except Exception as e: return f"Error: {e}" @mcp.tool() def headscale_delete_node(node_id: str) -> str: """ Delete a node from the Headscale tailnet. Args: node_id: The numeric node ID (from headscale_list_nodes). """ try: result = subprocess.run( [ "ssh", HEADSCALE_CALYPSO_SSH, f"sudo /usr/local/bin/docker exec headscale headscale nodes delete --identifier {node_id} --output json", ], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" return f"Node {node_id} deleted successfully." except Exception as e: return f"Error: {e}" @mcp.tool() def headscale_rename_node(node_id: str, new_name: str) -> str: """ Rename a node in the Headscale tailnet. Args: node_id: The numeric node ID. new_name: The new hostname/givenName. """ try: result = subprocess.run( [ "ssh", HEADSCALE_CALYPSO_SSH, f"sudo /usr/local/bin/docker exec headscale headscale nodes rename --identifier {node_id} {new_name}", ], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" return f"Node {node_id} renamed to '{new_name}'." except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Authentik tools # --------------------------------------------------------------------------- @mcp.tool() def authentik_list_applications() -> str: """ List all applications configured in Authentik SSO. """ try: data = _authentik("GET", "/core/applications/?page_size=100") apps = data.get("results", []) # type: ignore lines = [] for a in apps: slug = a.get("slug", "?") name = a.get("name", "?") provider = a.get("provider") launch = a.get("meta_launch_url") or "" lines.append(f" {slug:30s} {name:35s} provider:{provider} {launch}") return f"Authentik applications ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def authentik_list_providers() -> str: """ List all OAuth2/OIDC/proxy providers in Authentik. """ try: data = _authentik("GET", "/providers/all/?page_size=100") providers = data.get("results", []) # type: ignore lines = [] for p in providers: pk = p.get("pk") name = p.get("name", "?") component = p.get("component", "?").replace("ak-provider-", "") lines.append(f" PK:{pk:4} [{component:20s}] {name}") return f"Authentik providers ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def authentik_list_users() -> str: """ List all users in Authentik. """ try: data = _authentik("GET", "/core/users/?page_size=100") users = data.get("results", []) # type: ignore lines = [] for u in users: pk = u.get("pk") username = u.get("username", "?") email = u.get("email", "?") active = "✓" if u.get("is_active") else "✗" lines.append(f" [{active}] PK:{pk:4} {username:20s} {email}") return f"Authentik users ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def authentik_update_app_launch_url(slug: str, launch_url: str) -> str: """ Update the launch URL of an Authentik application (the link shown on the dashboard). Args: slug: The application slug (from authentik_list_applications). launch_url: The new launch URL e.g. 'https://pt.vish.gg'. """ try: result = _authentik( "PATCH", f"/core/applications/{slug}/", json={"meta_launch_url": launch_url} ) return f"Updated '{slug}' launch URL to: {result.get('meta_launch_url')}" # type: ignore except Exception as e: return f"Error: {e}" @mcp.tool() def authentik_set_provider_cookie_domain(provider_pk: int, cookie_domain: str) -> str: """ Set the cookie domain on an Authentik proxy provider. Required to prevent redirect loops with Forward Auth. Args: provider_pk: The provider PK (from authentik_list_providers). cookie_domain: Cookie domain e.g. 'vish.gg'. """ try: provider = _authentik("GET", f"/providers/proxy/{provider_pk}/") provider["cookie_domain"] = cookie_domain # type: ignore result = _authentik("PUT", f"/providers/proxy/{provider_pk}/", json=provider) return f"Provider {provider_pk} cookie_domain set to: {result.get('cookie_domain')}" # type: ignore except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Cloudflare tools # --------------------------------------------------------------------------- @mcp.tool() def cloudflare_list_dns_records(filter_name: Optional[str] = None) -> str: """ List DNS records for the vish.gg zone in Cloudflare. Args: filter_name: Optional substring to filter by record name. """ try: data = _cloudflare( "GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records?per_page=200" ) records = data.get("result", []) lines = [] for r in records: name = r.get("name", "?") if filter_name and filter_name.lower() not in name.lower(): continue rtype = r.get("type", "?") content = r.get("content", "?") proxied = "☁" if r.get("proxied") else "⚡" lines.append(f" {proxied} {rtype:6s} {name:45s} → {content}") return f"Cloudflare DNS records ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def cloudflare_create_dns_record( name: str, content: str, record_type: str = "A", proxied: bool = True, ttl: int = 1, ) -> str: """ Create a new DNS record in the vish.gg Cloudflare zone. Args: name: Record name e.g. 'pt' (becomes pt.vish.gg) or 'pt.vish.gg'. content: Record value e.g. '184.23.52.14' or 'calypso.vish.gg'. record_type: DNS type: 'A', 'CNAME', 'TXT', etc. Default: 'A'. proxied: Route through Cloudflare proxy. Default: True. ttl: TTL in seconds (1 = auto). Default: 1. """ try: data = _cloudflare( "POST", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records", json={ "type": record_type, "name": name, "content": content, "proxied": proxied, "ttl": ttl, }, ) r = data.get("result", {}) return f"Created: {r.get('type')} {r.get('name')} → {r.get('content')} proxied:{r.get('proxied')}" except Exception as e: return f"Error: {e}" @mcp.tool() def cloudflare_delete_dns_record(record_id: str) -> str: """ Delete a DNS record from the vish.gg Cloudflare zone. Args: record_id: The record ID (from cloudflare_list_dns_records — use the ID shown). """ try: _cloudflare("DELETE", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}") return f"Deleted DNS record {record_id}." except Exception as e: return f"Error: {e}" @mcp.tool() def cloudflare_update_dns_record( record_id: str, content: str, proxied: Optional[bool] = None, ) -> str: """ Update an existing DNS record in Cloudflare. Args: record_id: The record ID to update. content: New record value (IP or hostname). proxied: Update proxied status. None = keep existing. """ try: existing_data = _cloudflare( "GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}" ) existing = existing_data.get("result", {}) payload = { "type": existing.get("type"), "name": existing.get("name"), "content": content, "proxied": proxied if proxied is not None else existing.get("proxied"), "ttl": existing.get("ttl", 1), } data = _cloudflare( "PUT", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}", json=payload ) r = data.get("result", {}) return ( f"Updated: {r.get('name')} → {r.get('content')} proxied:{r.get('proxied')}" ) except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Uptime Kuma tools # --------------------------------------------------------------------------- def _kuma_sqlite(query: str) -> str: """Run a SQLite query against the Kuma DB via SSH.""" result = subprocess.run( [ "ssh", KUMA_HOST, f'docker exec {KUMA_CONTAINER} sqlite3 /app/data/kuma.db "{query}"', ], capture_output=True, text=True, timeout=20, ) if result.returncode != 0: raise RuntimeError(result.stderr) return result.stdout.strip() @mcp.tool() def kuma_list_monitors(filter_name: Optional[str] = None) -> str: """ List all monitors in Uptime Kuma with their status and type. Args: filter_name: Optional substring to filter by monitor name. """ try: rows = _kuma_sqlite( "SELECT id, name, type, active, url, hostname, port, parent FROM monitor ORDER BY parent, name;" ) if not rows: return "No monitors found." lines = [] for row in rows.splitlines(): parts = row.split("|") if len(parts) < 8: continue mid, name, mtype, active, url, hostname, port, parent = parts[:8] if filter_name and filter_name.lower() not in name.lower(): continue status = "✓" if active == "1" else "✗" target = url or (f"{hostname}:{port}" if hostname else "") indent = " └─ " if parent and parent != "" else "" lines.append( f" [{status}] ID:{mid:4s} {indent}{name:30s} [{mtype:8s}] {target}" ) return f"Kuma monitors ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def kuma_list_groups() -> str: """ List all monitor groups in Uptime Kuma (for use as parent IDs). """ try: rows = _kuma_sqlite( "SELECT id, name, parent FROM monitor WHERE type='group' ORDER BY name;" ) if not rows: return "No groups found." lines = [] for row in rows.splitlines(): parts = row.split("|") mid, name = parts[0], parts[1] parent = parts[2] if len(parts) > 2 else "" lines.append(f" ID:{mid:4s} {name:30s} parent:{parent or 'none'}") return f"Kuma groups:\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() def kuma_add_monitor( name: str, monitor_type: str, url: Optional[str] = None, hostname: Optional[str] = None, port: Optional[int] = None, parent_id: Optional[int] = None, interval: int = 60, ignore_tls: bool = False, ) -> str: """ Add a new monitor to Uptime Kuma. Requires a Kuma restart to take effect (call kuma_restart after adding monitors). Args: name: Monitor display name. monitor_type: 'http', 'port', 'ping', 'group'. url: URL for http monitors e.g. 'https://pt.vish.gg/'. hostname: Hostname/IP for port/ping monitors. port: Port number for port monitors. parent_id: Parent group monitor ID (from kuma_list_groups). interval: Check interval in seconds. Default: 60. ignore_tls: Ignore TLS cert errors. Default: False. """ try: url_val = url or "https://" host_val = hostname or "" port_val = port or 0 parent_val = parent_id or "NULL" ignore_tls_val = 1 if ignore_tls else 0 query = ( f"INSERT INTO monitor " f"(name, active, user_id, interval, url, type, weight, hostname, port, " f"maxretries, ignore_tls, upside_down, maxredirects, accepted_statuscodes_json, parent) " f"VALUES ('{name}', 1, 1, {interval}, '{url_val}', '{monitor_type}', 2000, " f"'{host_val}', {port_val}, 3, {ignore_tls_val}, 0, 10, '[\"200-299\"]', {parent_val});" f"SELECT last_insert_rowid();" ) result = _kuma_sqlite(query) monitor_id = result.strip().split("\n")[-1] return ( f"Monitor '{name}' added (ID: {monitor_id}).\n" f"Call kuma_restart to activate." ) except Exception as e: return f"Error: {e}" @mcp.tool() def kuma_set_parent(monitor_id: int, parent_id: Optional[int] = None) -> str: """ Set or clear the parent group of a Kuma monitor. Args: monitor_id: Monitor ID to update. parent_id: Parent group ID, or None to remove from group. """ try: parent_val = str(parent_id) if parent_id is not None else "NULL" _kuma_sqlite(f"UPDATE monitor SET parent={parent_val} WHERE id={monitor_id};") return f"Monitor {monitor_id} parent set to {parent_val}. Call kuma_restart to apply." except Exception as e: return f"Error: {e}" @mcp.tool() def kuma_restart() -> str: """ Restart the Uptime Kuma container to apply DB changes. Required after any kuma_add_monitor or kuma_set_parent operations. """ try: result = subprocess.run( ["ssh", KUMA_HOST, f"docker restart {KUMA_CONTAINER}"], capture_output=True, text=True, timeout=30, ) if result.returncode != 0: return f"Error restarting Kuma: {result.stderr}" return "Uptime Kuma restarted successfully." except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) mcp.run()