#!/usr/bin/env python3 """ Homelab MCP Server Provides MCP tools for managing homelab infrastructure: - Portainer: stack/container management across all endpoints - Gitea: repo, issue, and branch management - Prometheus: PromQL queries and target inspection - Grafana: dashboard and alert listing - Sonarr/Radarr: media library and download queue - SABnzbd: download queue management - SSH: remote command execution on homelab hosts - Filesystem: read/write files on the local machine """ import functools import json import logging import os import shlex import socket import subprocess import traceback from pathlib import Path from typing import Optional import httpx from fastmcp import FastMCP logger = logging.getLogger("homelab-mcp") def _safe(func): """Wrap MCP tool functions so unhandled exceptions return error strings instead of crashing the server process.""" @functools.wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: logger.error("Tool %s failed: %s\n%s", func.__name__, e, traceback.format_exc()) return f"Error in {func.__name__}: {type(e).__name__}: {e}" return wrapper # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- PORTAINER_URL = "http://100.83.230.112:10000" PORTAINER_TOKEN = "REDACTED_TOKEN" GITEA_TOKEN = "REDACTED_TOKEN" NTFY_BASE = "https://ntfy.vish.gg" REPO_PATH = Path("/home/homelab/organized/repos/homelab") ENDPOINTS: dict[str, int] = { "atlantis": 2, "calypso": 443397, "nuc": 443398, "homelab": 443399, "rpi5": 443395, # Portainer endpoint is literally named "rpi5" "pi-5": 443395, # alias — matches the SSH alias, same physical host } # Gitea — now on matrix-ubuntu via NPM GITEA_URL = "https://git.vish.gg" GITEA_ORG = "vish" # Monitoring PROMETHEUS_URL = "http://192.168.0.210:9090" GRAFANA_URL = "http://192.168.0.210:3300" GRAFANA_USER = "admin" GRAFANA_PASS = "REDACTED_PASSWORD" # Media SONARR_URL = "http://192.168.0.200:8989" SONARR_API_KEY = "REDACTED_API_KEY" RADARR_URL = "http://192.168.0.200:7878" RADARR_API_KEY = "REDACTED_API_KEY" SABNZBD_URL = "http://192.168.0.200:8080" SABNZBD_API_KEY = "REDACTED_API_KEY" # AdGuard — primary on Calypso, secondary on NUC ADGUARD_URL = "http://192.168.0.250:9080" ADGUARD_USER = "vish" ADGUARD_PASS = "REDACTED_PASSWORD" # NPM — now on matrix-ubuntu NPM_URL = "http://192.168.0.154:81" NPM_USER = "your-email@example.com" NPM_PASS = "REDACTED_PASSWORD" # Headscale — on Calypso HEADSCALE_URL = "https://headscale.vish.gg:8443" HEADSCALE_CONTAINER = "headscale" HEADSCALE_CALYPSO_SSH = "calypso" # Authentik — on Calypso AUTHENTIK_URL = "https://sso.vish.gg" AUTHENTIK_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret # Cloudflare CLOUDFLARE_TOKEN = ( "FGXlHM7doB8Z4vxv84_ntzhG_Cx15RXs66zoouZU" # pragma: allowlist secret ) CLOUDFLARE_ZONE_ID = "4dbd15d096d71101b7c0c6362b307a66" # Ollama — on Olares (LAN NodePort, bypasses Olares auth proxy) OLLAMA_URL = "http://192.168.0.145:31434" OLLAMA_MODEL = "qwen3-coder:latest" # Jellyfin — on Olares (via kubectl exec to bypass envoy auth sidecar) JELLYFIN_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret JELLYFIN_USER_ID = "308e0dab19ce4a2180a2933d73694514" JELLYFIN_URL = "http://192.168.0.145:30096" # LAN NodePort, used for display only # Authentik — constants for provider/app creation AUTHENTIK_OUTPOST_PK = "9dcb1d53-a023-4222-a320-d27f66f06eb9" AUTHENTIK_DEFAULT_AUTH_FLOW = "default-provider-authorization-implicit-consent" AUTHENTIK_DEFAULT_INVALIDATION_FLOW = "default-provider-invalidation-flow" # Uptime Kuma — on Pi-5 KUMA_DB_PATH = "/home/vish/docker/kuma/data/kuma.db" KUMA_CONTAINER = "uptime-kuma" KUMA_HOST = "pi-5" # SSH — hostnames must resolve via /etc/hosts or ~/.ssh/config SSH_KNOWN_HOSTS = [ "atlantis", "calypso", "setillo", "setillo-root", "nuc", "homelab-vm", "pi-5", "matrix-ubuntu", "moon", "olares", "guava", "pve", "seattle", "seattle-tailscale", "gl-mt3000", "gl-be3600", "jellyfish", "deck", ] # Friendly-name aliases that map to real SSH config entries. # Lets callers use the Portainer endpoint name (e.g. "rpi5") and have ssh_exec # still reach the right box without manual translation. SSH_HOST_ALIASES: dict[str, str] = { "rpi5": "pi-5", } # Hosts that refer to the machine this MCP server runs on — ssh_exec executes # them locally via subprocess instead of dialing SSH. Matches CLAUDE.md: # "homelab-vm IS localhost — never SSH into it, use local commands." LOCAL_HOSTS: set[str] = { "localhost", "127.0.0.1", "homelab-vm", socket.gethostname(), socket.gethostname().split(".")[0], } # Filesystem — restrict read/write to safe root paths FS_ALLOWED_ROOTS = [ Path("/home/homelab"), Path("/tmp"), ] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _portainer(method: str, path: str, **kwargs) -> dict | list: """Make a Portainer API request. Raises on HTTP error.""" with httpx.Client(verify=False, timeout=30) as client: r = client.request( method, f"{PORTAINER_URL}/api{path}", headers={"X-API-Key": PORTAINER_TOKEN}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _resolve_endpoint(endpoint: str) -> int: """Resolve endpoint name or numeric string to an endpoint ID.""" if endpoint.isdigit(): return int(endpoint) ep = endpoint.lower() if ep not in ENDPOINTS: raise ValueError( f"Unknown endpoint '{endpoint}'. Known: {', '.join(ENDPOINTS)}" ) return ENDPOINTS[ep] def _gitea(method: str, path: str, **kwargs) -> dict | list: """Make a Gitea API request.""" with httpx.Client(timeout=20) as client: r = client.request( method, f"{GITEA_URL}/api/v1{path}", headers={"Authorization": f"token {GITEA_TOKEN}"}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _arr( base_url: str, api_key: str, path: str, params: dict | None = None ) -> dict | list: """Make a Sonarr/Radarr API request.""" with httpx.Client(timeout=20) as client: r = client.get( f"{base_url}/api/v3{path}", headers={"X-Api-Key": api_key}, params=params or {}, ) r.raise_for_status() return r.json() def _sabnzbd(mode: str, extra: dict | None = None) -> dict: """Make a SABnzbd API request.""" params = {"apikey": SABNZBD_API_KEY, "output": "json", "mode": mode} if extra: params.update(extra) with httpx.Client(timeout=20) as client: r = client.get(f"{SABNZBD_URL}/api", params=params) r.raise_for_status() return r.json() def _fs_safe(path: str) -> Path: """Resolve a path and verify it's under an allowed root.""" p = Path(path).expanduser().resolve() for root in FS_ALLOWED_ROOTS: try: p.relative_to(root) return p except ValueError: continue allowed = ", ".join(str(r) for r in FS_ALLOWED_ROOTS) raise PermissionError(f"Path '{path}' is outside allowed roots: {allowed}") # --------------------------------------------------------------------------- # MCP Server # --------------------------------------------------------------------------- mcp = FastMCP( "Homelab", instructions=( "Tools for managing a homelab running Docker services across multiple hosts.\n\n" "PORTAINER — Docker orchestration across 5 endpoints:\n" " Endpoints: atlantis (main NAS, media stack), calypso (secondary NAS), " "nuc (mini PC), homelab (VM at 192.168.0.210), rpi5 (Raspberry Pi).\n" " Tools: list_endpoints, list_stacks, get_stack, redeploy_stack, " "list_containers, get_container_logs, restart_container, start_container, " "stop_container, list_stack_containers, check_portainer.\n\n" "GITEA — Self-hosted git at git.vish.gg, org=vish. Repo names can be " "'vish/homelab' or just 'homelab'.\n" " Tools: gitea_list_repos, gitea_list_issues, gitea_create_issue, gitea_list_branches.\n\n" "ADGUARD — DNS rewrite management for split-horizon DNS (Calypso, 192.168.0.250:9080).\n" " The wildcard *.vish.gg → 100.85.21.51 (matrix-ubuntu) requires specific overrides\n" " for unproxied services accessed internally (pt.vish.gg, sso.vish.gg, git.vish.gg).\n" " Tools: adguard_list_rewrites, adguard_add_rewrite, adguard_delete_rewrite.\n\n" "NPM — Nginx Proxy Manager on matrix-ubuntu (192.168.0.154:81).\n" " Cert IDs: npm-6=mx.vish.gg, npm-7=livekit.mx.vish.gg, npm-8=*.vish.gg(CF),\n" " npm-11=pt.vish.gg, npm-12=sso.vish.gg. NEVER reuse an existing npm-N ID.\n" " Tools: npm_list_proxy_hosts, npm_list_certs, npm_get_proxy_host, npm_update_cert.\n\n" "HEADSCALE — Self-hosted Tailscale coordination server on Calypso.\n" " Access via SSH to calypso → docker exec headscale.\n" " Tools: headscale_list_nodes, headscale_create_preauth_key, headscale_delete_node, " "headscale_rename_node, headscale_expire_node, headscale_list_routes, " "headscale_approve_routes, headscale_list_preauth_keys, headscale_tag_node, " "headscale_list_users, headscale_get_policy.\n\n" "AUTHENTIK — SSO identity provider at sso.vish.gg (Calypso).\n" " All proxy providers need cookie_domain=vish.gg to avoid redirect loops.\n" " Embedded outpost PK: 9dcb1d53-a023-4222-a320-d27f66f06eb9.\n" " To onboard a new service: create_proxy_provider → create_application (auto-binds to outpost).\n" " Tools: authentik_list_applications, authentik_list_providers, authentik_list_users,\n" " authentik_update_app_launch_url, authentik_set_provider_cookie_domain,\n" " authentik_create_proxy_provider, authentik_create_application,\n" " authentik_list_sessions, authentik_delete_session, authentik_get_events.\n\n" "CLOUDFLARE — DNS management for vish.gg zone.\n" " Most *.vish.gg are proxied (orange cloud). Unproxied: mx.vish.gg, headscale.vish.gg,\n" " livekit.mx.vish.gg, pt.vish.gg, sso.vish.gg, derp*.vish.gg.\n" " Tools: cloudflare_list_dns_records, cloudflare_create_dns_record,\n" " cloudflare_delete_dns_record, cloudflare_update_dns_record.\n\n" "UPTIME KUMA — Monitoring on Pi-5 (100.77.151.40:3001). DB manipulation via SSH+SQLite.\n" " Always call kuma_restart after adding/modifying monitors.\n" " Tools: kuma_list_monitors, kuma_list_groups, kuma_add_monitor, kuma_set_parent, " "kuma_restart.\n\n" "PROMETHEUS — PromQL queries against homelab metrics (192.168.0.210:9090).\n" " Tools: prometheus_query, prometheus_targets.\n\n" "GRAFANA — Dashboard and alert inspection (192.168.0.210:3300).\n" " Tools: grafana_list_dashboards, grafana_list_alerts.\n\n" "SONARR/RADARR — Media library and download queue on Atlantis (ports 8989/7878).\n" " Tools: sonarr_list_series, sonarr_queue, radarr_list_movies, radarr_queue.\n\n" "SABNZBD — Usenet download queue on Atlantis (port 8080).\n" " Tools: sabnzbd_queue, sabnzbd_pause, sabnzbd_resume.\n\n" "SSH — Run commands on homelab hosts. Allowed: atlantis, calypso, setillo, " "setillo-root, nuc, homelab-vm, pi-5 (alias: rpi5), matrix-ubuntu, moon, " "olares, guava, pve, seattle, gl-mt3000, gl-be3600, jellyfish. " "Requires key auth in ~/.ssh/config.\n" " NOTE: This MCP runs ON homelab-vm. Passing host='homelab-vm' (or " "'localhost') executes locally via subprocess — no SSH hop.\n" " Tool: ssh_exec(host, command).\n\n" "FILESYSTEM — Read/write files on the local machine. " "Allowed roots: /home/homelab, /tmp.\n" " Tools: fs_read, fs_write, fs_list.\n\n" "JELLYFIN — Media server on Olares (LAN 192.168.0.145:30096).\n" " Tools: jellyfin_libraries, jellyfin_sessions, jellyfin_system_info.\n\n" "OLARES — Kubernetes (K3s) cluster on Olares (192.168.0.145).\n" " RTX 5090 GPU, runs Jellyfin, Ollama, vLLM. Access via SSH+kubectl.\n" " Tools: olares_pods, olares_gpu, olares_pod_logs, olares_restart.\n\n" "OLLAMA — Local LLM on Olares (LAN 192.168.0.145:31434).\n" " Model: qwen3-coder:latest. Bypasses Olares auth proxy.\n" " Tool: ollama_query(prompt, model, max_tokens, temperature).\n\n" "REPO — Inspect compose files in the homelab git repo at " "/home/homelab/organized/repos/homelab.\n" " Tools: list_homelab_services, get_compose_file.\n\n" "UTILITIES — check_url (HTTP health check), send_notification (ntfy push)." ), ) # --------------------------------------------------------------------------- # Portainer — Endpoints # --------------------------------------------------------------------------- @mcp.tool() @_safe def list_endpoints() -> str: """List all Portainer environments (servers) with their connection status.""" data = _portainer("GET", "/endpoints") rows = [] for ep in data: rows.append( f" {ep['Name']} (id={ep['Id']}) — " f"status={'online' if ep.get('Status') == 1 else 'offline'} — " f"containers={ep.get('Snapshots', [{}])[0].get('RunningContainerCount', '?')} running" ) return "Endpoints:\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Portainer — Stacks # --------------------------------------------------------------------------- @mcp.tool() @_safe def list_stacks(endpoint: Optional[str] = None) -> str: """ List all Portainer stacks with their status. Args: endpoint: Optional filter by endpoint name (atlantis, calypso, nuc, homelab, rpi5) or numeric ID. """ data = _portainer("GET", "/stacks") if endpoint: ep_id = _resolve_endpoint(endpoint) data = [s for s in data if s.get("EndpointId") == ep_id] rows = [] for s in sorted(data, key=lambda x: x["Name"]): status = "active" if s.get("Status") == 1 else "inactive" ep_name = next( (k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")), str(s.get("EndpointId")), ) git = s.get("GitConfig", {}) git_info = f" [git: {git.get('ConfigFilePath', '')}]" if git else "" rows.append( f" [{s['Id']}] {s['Name']} — {status} — endpoint={ep_name}{git_info}" ) return f"Stacks ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def get_stack(stack_name_or_id: str) -> str: """ Get detailed info about a specific stack. Args: stack_name_or_id: Stack name (partial match) or numeric ID. """ all_stacks = _portainer("GET", "/stacks") if stack_name_or_id.isdigit(): matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)] else: term = stack_name_or_id.lower() matches = [s for s in all_stacks if term in s["Name"].lower()] if not matches: return f"No stack found matching '{stack_name_or_id}'." if len(matches) > 1: names = ", ".join(s["Name"] for s in matches) return f"Multiple matches: {names}. Be more specific." s = matches[0] git = s.get("GitConfig") or {} env = s.get("Env") or [] ep_name = next( (k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")), str(s.get("EndpointId")), ) lines = [ f"Stack: {s['Name']} (id={s['Id']})", f" Status: {'active' if s.get('Status') == 1 else 'inactive'}", f" Endpoint: {ep_name} (id={s.get('EndpointId')})", f" Created: {s.get('CreationDate', 'unknown')}", f" Updated: {s.get('UpdateDate', 'unknown')}", ] if git: lines += [ f" Git URL: {git.get('URL', '')}", f" Git Branch: {git.get('ReferenceName', '').replace('refs/heads/', '')}", f" Git File: {git.get('ConfigFilePath', '')}", ] if env: lines.append(f" Env vars: {len(env)} set") return "\n".join(lines) @mcp.tool() @_safe def redeploy_stack(stack_name_or_id: str) -> str: """ Trigger a GitOps redeploy of a stack (pull latest from Git and redeploy). Args: stack_name_or_id: Stack name (partial match) or numeric ID. """ all_stacks = _portainer("GET", "/stacks") if stack_name_or_id.isdigit(): matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)] else: term = stack_name_or_id.lower() matches = [s for s in all_stacks if term in s["Name"].lower()] if not matches: return f"No stack found matching '{stack_name_or_id}'." if len(matches) > 1: names = ", ".join(s["Name"] for s in matches) return f"Multiple matches: {names}. Be more specific." s = matches[0] if not s.get("GitConfig"): return f"Stack '{s['Name']}' is not a GitOps stack — cannot redeploy via git." ep_id = s["EndpointId"] stack_id = s["Id"] _portainer( "PUT", f"/stacks/{stack_id}/git/redeploy?endpointId={ep_id}", json={ "pullImage": True, "prune": False, "repositoryAuthentication": True, "repositoryUsername": "vish", "repositoryPassword": GITEA_TOKEN, }, ) return f"Redeploy triggered for stack '{s['Name']}' (id={stack_id}) on endpoint {ep_id}." # --------------------------------------------------------------------------- # Portainer — Containers # --------------------------------------------------------------------------- @mcp.tool() @_safe def list_containers( endpoint: str = "atlantis", all_containers: bool = False, filter_name: Optional[str] = None, ) -> str: """ List containers on a Portainer endpoint. Args: endpoint: Endpoint name (atlantis, calypso, nuc, homelab, rpi5) or ID. all_containers: If True, include stopped containers. Default False (running only). filter_name: Optional substring to filter container names. """ ep_id = _resolve_endpoint(endpoint) params = "?all=true" if all_containers else "" data = _portainer("GET", f"/endpoints/{ep_id}/docker/containers/json{params}") if filter_name: term = filter_name.lower() data = [c for c in data if any(term in n.lower() for n in c.get("Names", []))] rows = [] for c in sorted(data, key=lambda x: x.get("Names", [""])[0]): name = c.get("Names", ["?"])[0].lstrip("/") image = c.get("Image", "?").split(":")[0].split("/")[-1] state = c.get("State", "?") short_id = c.get("Id", "")[:12] rows.append(f" {short_id} {state:10s} {name:40s} {image}") header = f"Containers on {endpoint} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image" return header + "\n" + "\n".join(rows) @mcp.tool() @_safe def get_container_logs( container_id_or_name: str, endpoint: str = "atlantis", tail: int = 100, ) -> str: """ Get recent logs from a container. Args: container_id_or_name: Container ID (short or full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. tail: Number of log lines to return. Default: 100. """ ep_id = _resolve_endpoint(endpoint) # Resolve by name if not a hex ID if not all(c in "0123456789abcdefABCDEF" for c in container_id_or_name): containers = _portainer( "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true" ) term = container_id_or_name.lower() matches = [ c for c in containers if any(term in n.lower() for n in c.get("Names", [])) ] if not matches: return ( f"No container found matching '{container_id_or_name}' on {endpoint}." ) if len(matches) > 1: names = ", ".join(c["Names"][0].lstrip("/") for c in matches) return f"Multiple matches: {names}. Be more specific." container_id_or_name = matches[0]["Id"][:12] with httpx.Client(verify=False, timeout=30) as client: r = client.get( f"{PORTAINER_URL}/api/endpoints/{ep_id}/docker/containers/{container_id_or_name}/logs", headers={"X-API-Key": PORTAINER_TOKEN}, params={"stdout": 1, "stderr": 1, "tail": tail, "timestamps": 0}, ) r.raise_for_status() # Docker log stream has 8-byte header per line; strip it raw = r.content lines = [] i = 0 while i < len(raw): if i + 8 > len(raw): break size = int.from_bytes(raw[i + 4 : i + 8], "big") line = raw[i + 8 : i + 8 + size].decode("utf-8", errors="replace").rstrip() if line: lines.append(line) i += 8 + size if not lines: # fallback: treat as plain text lines = r.text.splitlines() return "\n".join(lines[-tail:]) @mcp.tool() @_safe def restart_container( container_id_or_name: str, endpoint: str = "atlantis", ) -> str: """ Restart a container. Args: container_id_or_name: Container ID (short/full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. """ ep_id = _resolve_endpoint(endpoint) cid = _resolve_container_id(container_id_or_name, ep_id) _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/restart") return f"Restarted container {cid} on {endpoint}." @mcp.tool() @_safe def stop_container( container_id_or_name: str, endpoint: str = "atlantis", ) -> str: """ Stop a running container. Args: container_id_or_name: Container ID (short/full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. """ ep_id = _resolve_endpoint(endpoint) cid = _resolve_container_id(container_id_or_name, ep_id) _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/stop") return f"Stopped container {cid} on {endpoint}." @mcp.tool() @_safe def start_container( container_id_or_name: str, endpoint: str = "atlantis", ) -> str: """ Start a stopped container. Args: container_id_or_name: Container ID (short/full) or name substring. endpoint: Endpoint name or ID. Default: atlantis. """ ep_id = _resolve_endpoint(endpoint) cid = _resolve_container_id(container_id_or_name, ep_id) _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/start") return f"Started container {cid} on {endpoint}." def _resolve_container_id(name_or_id: str, ep_id: int) -> str: """Resolve a container name substring to a short container ID.""" if len(name_or_id) >= 12 and all(c in "0123456789abcdefABCDEF" for c in name_or_id): return name_or_id[:12] containers = _portainer( "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true" ) term = name_or_id.lower() matches = [ c for c in containers if any(term in n.lower() for n in c.get("Names", [])) ] if not matches: raise ValueError(f"No container found matching '{name_or_id}'.") if len(matches) > 1: names = ", ".join(c["Names"][0].lstrip("/") for c in matches) raise ValueError( f"Multiple containers match '{name_or_id}': {names}. Be more specific." ) return matches[0]["Id"][:12] # --------------------------------------------------------------------------- # Portainer — Stack containers (convenience) # --------------------------------------------------------------------------- @mcp.tool() @_safe def list_stack_containers(stack_name_or_id: str) -> str: """ List all containers belonging to a specific stack. Args: stack_name_or_id: Stack name (partial match) or numeric ID. """ all_stacks = _portainer("GET", "/stacks") if stack_name_or_id.isdigit(): matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)] else: term = stack_name_or_id.lower() matches = [s for s in all_stacks if term in s["Name"].lower()] if not matches: return f"No stack found matching '{stack_name_or_id}'." if len(matches) > 1: names = ", ".join(s["Name"] for s in matches) return f"Multiple matches: {names}. Be more specific." s = matches[0] ep_id = s["EndpointId"] stack_name = s["Name"] containers = _portainer( "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true" ) # Filter by compose project label (Portainer uses com.docker.compose.project) stack_containers = [ c for c in containers if c.get("Labels", {}).get("com.docker.compose.project", "").lower() == stack_name.lower() or any(stack_name.lower() in n.lower() for n in c.get("Names", [])) ] ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id)) rows = [] for c in sorted(stack_containers, key=lambda x: x.get("Names", [""])[0]): name = c.get("Names", ["?"])[0].lstrip("/") state = c.get("State", "?") short_id = c.get("Id", "")[:12] image = c.get("Image", "?").split(":")[0].split("/")[-1] rows.append(f" {short_id} {state:10s} {name:40s} {image}") header = f"Containers in stack '{stack_name}' on {ep_name} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image" return header + "\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Health checks # --------------------------------------------------------------------------- @mcp.tool() @_safe def check_url(url: str, expected_status: int = 200) -> str: """ Perform an HTTP health check on a URL. Args: url: The URL to check (e.g. http://192.168.0.200:9443/api/status). expected_status: Expected HTTP status code. Default: 200. """ try: with httpx.Client(verify=False, timeout=10, follow_redirects=True) as client: r = client.get(url) ok = r.status_code == expected_status return ( f"{'OK' if ok else 'FAIL'} {url}\n" f" Status: {r.status_code} (expected {expected_status})\n" f" Latency: {r.elapsed.total_seconds() * 1000:.0f}ms" ) except Exception as e: return f"ERROR {url}\n {type(e).__name__}: {e}" @mcp.tool() @_safe def check_portainer() -> str: """Quick health check of the Portainer API and summary of infrastructure.""" try: status = _portainer("GET", "/status") stacks = _portainer("GET", "/stacks") stacks_list = stacks if isinstance(stacks, list) else [] status_dict = status if isinstance(status, dict) else {} active = sum( 1 for s in stacks_list if isinstance(s, dict) and s.get("Status") == 1 ) return ( f"Portainer OK — version {status_dict.get('Version', '?')}\n" f" Stacks: {len(stacks_list)} total, {active} active" ) except Exception as e: return f"Portainer UNREACHABLE: {e}" # --------------------------------------------------------------------------- # Repo — service inspection # --------------------------------------------------------------------------- @mcp.tool() @_safe def list_compose_files(host_filter: Optional[str] = None) -> str: """ List all docker-compose files found in the homelab repository. Use list_homelab_services for a service URL directory instead. Args: host_filter: Optional substring to filter by host/path (e.g. 'atlantis', 'calypso', 'seattle'). """ compose_files = list(REPO_PATH.rglob("docker-compose.yml")) + list( REPO_PATH.rglob("docker-compose.yaml") ) # Exclude archive compose_files = [f for f in compose_files if "archive" not in f.parts] rows = [] for f in sorted(compose_files): rel = f.relative_to(REPO_PATH) if host_filter and host_filter.lower() not in str(rel).lower(): continue rows.append(f" {rel}") return f"Compose files ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def get_compose_file(service_path: str) -> str: """ Read a compose file from the homelab repo. Args: service_path: Relative path within the repo, e.g. 'hosts/synology/atlantis/arr-suite/docker-compose.yml' or a partial name like 'atlantis/arr-suite'. """ # Try exact relative path first candidate = REPO_PATH / service_path if candidate.is_file(): return candidate.read_text() # Try fuzzy: find compose files whose path contains the fragment # Searches docker-compose.yml/yaml AND standalone *.yaml/*.yml stack files term = service_path.lower().replace("\\", "/") all_compose = ( list(REPO_PATH.rglob("docker-compose.yml")) + list(REPO_PATH.rglob("docker-compose.yaml")) + list(REPO_PATH.rglob("*.yaml")) + list(REPO_PATH.rglob("*.yml")) ) hits = [ f for f in all_compose if term in str(f.relative_to(REPO_PATH)).lower() and "archive" not in f.parts and ".git" not in f.parts and "node_modules" not in f.parts ] # Prefer docker-compose files if both match compose_hits = [f for f in hits if "docker-compose" in f.name] if compose_hits: hits = compose_hits if not hits: return f"No compose file found matching '{service_path}'." if len(hits) > 1: paths = "\n".join(f" {f.relative_to(REPO_PATH)}" for f in hits) return f"Multiple matches:\n{paths}\nBe more specific." return hits[0].read_text() # --------------------------------------------------------------------------- # Notifications # --------------------------------------------------------------------------- @mcp.tool() @_safe def send_notification( message: str, title: str = "Homelab", topic: str = "homelab-alerts", priority: str = "default", tags: Optional[str] = None, ) -> str: """ Send a push notification via ntfy. Args: message: The notification body. title: Notification title. Default: 'Homelab'. topic: ntfy topic to publish to. Default: 'homelab-alerts'. priority: urgent, high, default, low, or min. Default: 'default'. tags: Comma-separated emoji tags e.g. 'warning,robot'. Optional. """ headers = { "Title": title, "Priority": priority, } if tags: headers["Tags"] = tags with httpx.Client(timeout=10) as client: r = client.post( f"{NTFY_BASE}/{topic}", content=message.encode(), headers=headers, ) r.raise_for_status() return f"Notification sent to {NTFY_BASE}/{topic} — '{title}: {message}'" # --------------------------------------------------------------------------- # Gitea # --------------------------------------------------------------------------- @mcp.tool() @_safe def gitea_list_repos(owner: Optional[str] = None, limit: int = 50) -> str: """ List Gitea repositories. Args: owner: User or org name. Defaults to the service account's accessible repos. limit: Max repos to return. Default: 50. """ if owner: data = _gitea("GET", f"/repos/search", params={"owner": owner, "limit": limit}) repos = data.get("data", []) if isinstance(data, dict) else data else: repos = _gitea( "GET", f"/repos/search", params={"limit": limit, "token": GITEA_TOKEN} ) repos = repos.get("data", []) if isinstance(repos, dict) else repos rows = [] for r in repos: archived = " [archived]" if r.get("archived") else "" private = " [private]" if r.get("private") else "" rows.append( f" {r['full_name']}{private}{archived} — " f"⭐{r.get('stars_count', 0)} " f"updated: {r.get('updated', '')[:10]}" ) return f"Repos ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def gitea_list_issues( repo: str, state: str = "open", limit: int = 20, ) -> str: """ List issues for a Gitea repository. Args: repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG). state: 'open', 'closed', or 'all'. Default: 'open'. limit: Max issues to return. Default: 20. """ if "/" not in repo: repo = f"{GITEA_ORG}/{repo}" data = _gitea( "GET", f"/repos/{repo}/issues", params={"state": state, "type": "issues", "limit": limit}, ) if not data: return f"No {state} issues in {repo}." rows = [] for issue in data: labels = ", ".join(l["name"] for l in issue.get("labels", [])) label_str = f" [{labels}]" if labels else "" rows.append( f" #{issue['number']} {issue['title']}{label_str} — @{issue['user']['login']}" ) return f"{state.capitalize()} issues in {repo} ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def gitea_create_issue(repo: str, title: str, body: str = "") -> str: """ Create a new issue in a Gitea repository. Args: repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG). title: Issue title. body: Issue body/description. Optional. """ if "/" not in repo: repo = f"{GITEA_ORG}/{repo}" data = _gitea("POST", f"/repos/{repo}/issues", json={"title": title, "body": body}) if isinstance(data, dict): return f"Created issue #{data.get('number')}: {data.get('title')}\n URL: {data.get('html_url')}" return f"Issue created: {data}" @mcp.tool() @_safe def gitea_list_branches(repo: str) -> str: """ List branches for a Gitea repository. Args: repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG). """ if "/" not in repo: repo = f"{GITEA_ORG}/{repo}" data = _gitea("GET", f"/repos/{repo}/branches") rows = [ f" {b['name']}" + (" [default]" if b.get("is_default") else "") for b in data ] return f"Branches in {repo} ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Prometheus # --------------------------------------------------------------------------- @mcp.tool() @_safe def prometheus_query(query: str) -> str: """ Run an instant PromQL query. Args: query: PromQL expression e.g. 'up', 'node_memory_MemAvailable_bytes{job="node"}'. """ with httpx.Client(timeout=20) as client: r = client.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query}) r.raise_for_status() data = r.json() if data.get("status") != "success": return f"Query failed: {data.get('error', 'unknown error')}" results = data["data"]["result"] if not results: return f"No results for: {query}" rows = [] for item in results[:50]: # cap output metric = item["metric"] value = item["value"][1] if item.get("value") else "?" label_str = ", ".join( f'{k}="{v}"' for k, v in metric.items() if k != "__name__" ) name = metric.get("__name__", query) rows.append(f" {name}{{{label_str}}} = {value}") return f"Results ({len(results)}):\n" + "\n".join(rows) @mcp.tool() @_safe def prometheus_targets() -> str: """List all Prometheus scrape targets and their health status.""" with httpx.Client(timeout=20) as client: r = client.get(f"{PROMETHEUS_URL}/api/v1/targets") r.raise_for_status() data = r.json() active = data["data"].get("activeTargets", []) rows = [] for t in sorted(active, key=lambda x: x.get("labels", {}).get("job", "")): job = t.get("labels", {}).get("job", "?") instance = t.get("labels", {}).get("instance", "?") health = t.get("health", "?") last_scrape = t.get("lastScrapeDuration", 0) rows.append( f" {'✓' if health == 'up' else '✗'} {job:30s} {instance:40s} {health}" ) return f"Prometheus targets ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Grafana # --------------------------------------------------------------------------- @mcp.tool() @_safe def grafana_list_dashboards() -> str: """List all Grafana dashboards.""" with httpx.Client(timeout=20) as client: r = client.get( f"{GRAFANA_URL}/api/search", params={"type": "dash-db"}, auth=(GRAFANA_USER, GRAFANA_PASS), ) r.raise_for_status() data = r.json() rows = [] for d in data: folder = d.get("folderTitle", "General") rows.append(f" [{d['uid']:20s}] {d['title']:50s} folder={folder}") return f"Dashboards ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def grafana_list_alerts() -> str: """List Grafana alert rules and their current state.""" with httpx.Client(timeout=20) as client: r = client.get( f"{GRAFANA_URL}/api/v1/provisioning/alert-rules", auth=(GRAFANA_USER, GRAFANA_PASS), ) r.raise_for_status() data = r.json() if not data: return "No alert rules configured." rows = [] for rule in data: rows.append(f" {rule.get('title', '?'):50s} uid={rule.get('uid', '?')}") return f"Alert rules ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Sonarr # --------------------------------------------------------------------------- @mcp.tool() @_safe def sonarr_list_series(filter_name: Optional[str] = None) -> str: """ List all series in Sonarr. Args: filter_name: Optional substring to filter by series title. """ data = _arr(SONARR_URL, SONARR_API_KEY, "/series") if filter_name: term = filter_name.lower() data = [s for s in data if term in s.get("title", "").lower()] rows = [] for s in sorted(data, key=lambda x: x.get("sortTitle", "")): status = s.get("status", "?") monitored = "✓" if s.get("monitored") else "✗" ep_count = s.get("episodeCount", 0) rows.append(f" {monitored} {s['title']:50s} {status:12s} {ep_count} eps") return f"Series ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def sonarr_queue() -> str: """Show the Sonarr download queue.""" data = _arr(SONARR_URL, SONARR_API_KEY, "/queue") records = data.get("records", []) if isinstance(data, dict) else data if not records: return "Sonarr queue is empty." rows = [] for item in records: title = item.get("title", "?")[:60] status = item.get("status", "?") size = item.get("size", 0) sizeleft = item.get("sizeleft", 0) pct = int((1 - sizeleft / size) * 100) if size else 0 rows.append(f" {status:12s} {pct:3d}% {title}") return f"Sonarr queue ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # Radarr # --------------------------------------------------------------------------- @mcp.tool() @_safe def radarr_list_movies(filter_name: Optional[str] = None) -> str: """ List all movies in Radarr. Args: filter_name: Optional substring to filter by movie title. """ data = _arr(RADARR_URL, RADARR_API_KEY, "/movie") if filter_name: term = filter_name.lower() data = [m for m in data if term in m.get("title", "").lower()] rows = [] for m in sorted(data, key=lambda x: x.get("sortTitle", "")): monitored = "✓" if m.get("monitored") else "✗" downloaded = "↓" if m.get("hasFile") else " " year = m.get("year", "?") rows.append(f" {monitored}{downloaded} {m['title']:50s} {year}") return f"Movies ({len(rows)}):\n" + "\n".join(rows) @mcp.tool() @_safe def radarr_queue() -> str: """Show the Radarr download queue.""" data = _arr(RADARR_URL, RADARR_API_KEY, "/queue") records = data.get("records", []) if isinstance(data, dict) else data if not records: return "Radarr queue is empty." rows = [] for item in records: title = item.get("title", "?")[:60] status = item.get("status", "?") size = item.get("size", 0) sizeleft = item.get("sizeleft", 0) pct = int((1 - sizeleft / size) * 100) if size else 0 rows.append(f" {status:12s} {pct:3d}% {title}") return f"Radarr queue ({len(rows)}):\n" + "\n".join(rows) # --------------------------------------------------------------------------- # SABnzbd # --------------------------------------------------------------------------- @mcp.tool() @_safe def sabnzbd_queue() -> str: """Show the SABnzbd download queue.""" data = _sabnzbd("queue") queue = data.get("queue", {}) slots = queue.get("slots", []) if not slots: return f"SABnzbd queue empty. Status: {queue.get('status', '?')}" rows = [] for s in slots: name = s.get("filename", "?")[:60] status = s.get("status", "?") pct = s.get("percentage", "0") size = s.get("sizeleft", "?") rows.append(f" {status:12s} {pct:>4s}% {size:>8s} left {name}") speed = queue.get("speed", "0") eta = queue.get("timeleft", "?") return f"SABnzbd queue ({len(rows)}) — {speed} — ETA {eta}:\n" + "\n".join(rows) @mcp.tool() @_safe def sabnzbd_pause() -> str: """Pause the SABnzbd download queue.""" _sabnzbd("pause") return "SABnzbd queue paused." @mcp.tool() @_safe def sabnzbd_resume() -> str: """Resume the SABnzbd download queue.""" _sabnzbd("resume") return "SABnzbd queue resumed." # --------------------------------------------------------------------------- # SSH # --------------------------------------------------------------------------- @mcp.tool() @_safe def ssh_exec(host: str, command: str, timeout: int = 30) -> str: """ Run a command on a homelab host via SSH. Known hosts: atlantis, calypso, setillo, setillo-root, nuc, homelab-vm, pi-5 (alias: rpi5), matrix-ubuntu, moon, olares, guava, pve, seattle, gl-mt3000, gl-be3600, jellyfish. Requires the host to be in ~/.ssh/config or /etc/hosts. Passing "homelab-vm" (or the local hostname / "localhost") runs the command locally via subprocess — this MCP server runs ON homelab-vm, so SSH-ing to it would either fail (name doesn't resolve) or hairpin. Args: host: SSH host alias (e.g. 'atlantis', 'calypso', 'setillo-root'). command: Shell command to execute remotely. timeout: Seconds before the command times out. Default: 30. """ host = SSH_HOST_ALIASES.get(host, host) is_local = host in LOCAL_HOSTS if not is_local and host not in SSH_KNOWN_HOSTS: return ( f"Host '{host}' not in allowed list.\n" f"Known hosts: {', '.join(SSH_KNOWN_HOSTS)}\n" f"Local aliases (run on this host): {', '.join(sorted(LOCAL_HOSTS))}" ) try: if is_local: cmd = ["bash", "-c", command] else: cmd = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", host, command] result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, ) output = result.stdout if result.stderr: output += f"\n[stderr]\n{result.stderr}" if result.returncode != 0: output += f"\n[exit code {result.returncode}]" return output or "(no output)" except subprocess.TimeoutExpired: return f"Command timed out after {timeout}s." except Exception as e: return f"SSH error: {type(e).__name__}: {e}" # --------------------------------------------------------------------------- # Filesystem # --------------------------------------------------------------------------- @mcp.tool() @_safe def fs_read(path: str) -> str: """ Read a file from the local filesystem. Allowed roots: /home/homelab, /tmp. Args: path: Absolute or ~-relative path to the file. """ try: p = _fs_safe(path) if not p.exists(): return f"File not found: {p}" if p.is_dir(): return f"'{p}' is a directory. Use fs_list to list it." size = p.stat().st_size if size > 1_000_000: return f"File too large ({size:,} bytes). Read it in parts or use grep." return p.read_text(errors="replace") except PermissionError as e: return f"Permission denied: {e}" except Exception as e: return f"Error reading file: {e}" @mcp.tool() @_safe def fs_write(path: str, content: str) -> str: """ Write content to a file on the local filesystem. Allowed roots: /home/homelab, /tmp. Args: path: Absolute or ~-relative path to the file. content: Text content to write. """ try: p = _fs_safe(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content) return f"Written {len(content)} bytes to {p}" except PermissionError as e: return f"Permission denied: {e}" except Exception as e: return f"Error writing file: {e}" @mcp.tool() @_safe def fs_list(path: str = "/home/homelab") -> str: """ List the contents of a directory on the local filesystem. Allowed roots: /home/homelab, /tmp. Args: path: Directory path. Default: /home/homelab. """ try: p = _fs_safe(path) if not p.exists(): return f"Path not found: {p}" if not p.is_dir(): return f"'{p}' is a file, not a directory." entries = sorted(p.iterdir(), key=lambda x: (x.is_file(), x.name)) rows = [] for entry in entries: kind = "DIR " if entry.is_dir() else "FILE" size = entry.stat().st_size if entry.is_file() else "" size_str = f"{size:>10,}" if size != "" else f"{'':>10}" rows.append(f" {kind} {size_str} {entry.name}") return f"Contents of {p} ({len(rows)} entries):\n" + "\n".join(rows) except PermissionError as e: return f"Permission denied: {e}" except Exception as e: return f"Error listing directory: {e}" # --------------------------------------------------------------------------- # Helper functions for new tools # --------------------------------------------------------------------------- _adguard_session_cache: dict = {"client": None} def _adguard(method: str, path: str, **kwargs) -> dict | list: """Make an AdGuard API request with cached session cookie.""" client = _adguard_session_cache.get("client") if client is None: client = httpx.Client(timeout=15) client.post( f"{ADGUARD_URL}/control/login", json={"name": ADGUARD_USER, "password": ADGUARD_PASS}, ).raise_for_status() _adguard_session_cache["client"] = client try: r = client.request(method, f"{ADGUARD_URL}/control{path}", **kwargs) if r.status_code == 403: # Session expired, re-login client.post( f"{ADGUARD_URL}/control/login", json={"name": ADGUARD_USER, "password": ADGUARD_PASS}, ).raise_for_status() r = client.request(method, f"{ADGUARD_URL}/control{path}", **kwargs) r.raise_for_status() if r.content and r.headers.get("content-type", "").startswith("application/json"): return r.json() return {} except httpx.HTTPError: # Reset session on error _adguard_session_cache["client"] = None raise _npm_token_cache: dict = {"token": None} def _npm_token() -> str: """Get NPM API token (cached).""" if _npm_token_cache["token"]: return _npm_token_cache["token"] with httpx.Client(timeout=10) as client: r = client.post( f"{NPM_URL}/api/tokens", json={"identity": NPM_USER, "secret": NPM_PASS}, ) r.raise_for_status() _npm_token_cache["token"] = r.json()["token"] return _npm_token_cache["token"] def _npm(method: str, path: str, **kwargs) -> dict | list: """Make an NPM API request with cached token.""" token = _npm_token() with httpx.Client(timeout=15) as client: r = client.request( method, f"{NPM_URL}/api{path}", headers={"Authorization": f"Bearer {token}"}, **kwargs, ) if r.status_code == 401: # Token expired, refresh _npm_token_cache["token"] = None token = _npm_token() r = client.request( method, f"{NPM_URL}/api{path}", headers={"Authorization": f"Bearer {token}"}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _authentik(method: str, path: str, **kwargs) -> dict | list: """Make an Authentik API request.""" with httpx.Client(timeout=15, verify=False) as client: r = client.request( method, f"{AUTHENTIK_URL}/api/v3{path}", headers={"Authorization": f"Bearer {AUTHENTIK_TOKEN}"}, **kwargs, ) r.raise_for_status() if r.content: return r.json() return {} def _cloudflare(method: str, path: str, **kwargs) -> dict: """Make a Cloudflare API request.""" with httpx.Client(timeout=15) as client: r = client.request( method, f"https://api.cloudflare.com/client/v4{path}", headers={"Authorization": f"Bearer {CLOUDFLARE_TOKEN}"}, **kwargs, ) r.raise_for_status() return r.json() # --------------------------------------------------------------------------- # AdGuard tools # --------------------------------------------------------------------------- @mcp.tool() @_safe def adguard_list_rewrites() -> str: """ List all DNS rewrite rules in AdGuard Home (Calypso). Returns domain → answer mappings. Useful for checking split-horizon DNS overrides (e.g. which *.vish.gg services are pinned to specific IPs). """ try: data = _adguard("GET", "/rewrite/list") if not data: return "No rewrite rules found." lines = [f" {r['domain']:45s} → {r['answer']}" for r in data] # type: ignore return f"AdGuard DNS rewrites ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def adguard_add_rewrite(domain: str, answer: str) -> str: """ Add a DNS rewrite rule to AdGuard Home. Use this to add split-horizon DNS overrides so internal services resolve to LAN/Tailscale IPs instead of public ones. Args: domain: The domain to override (e.g. 'pt.vish.gg' or '*.example.com'). answer: The IP address to return (e.g. '192.168.0.154'). """ try: _adguard("POST", "/rewrite/add", json={"domain": domain, "answer": answer}) return f"Added rewrite: {domain} → {answer}" except Exception as e: return f"Error adding rewrite: {e}" @mcp.tool() @_safe def adguard_delete_rewrite(domain: str, answer: str) -> str: """ Delete a DNS rewrite rule from AdGuard Home. Args: domain: The domain of the rule to delete. answer: The answer IP of the rule to delete. """ try: _adguard("POST", "/rewrite/delete", json={"domain": domain, "answer": answer}) return f"Deleted rewrite: {domain} → {answer}" except Exception as e: return f"Error deleting rewrite: {e}" # --------------------------------------------------------------------------- # NPM (Nginx Proxy Manager) tools # --------------------------------------------------------------------------- @mcp.tool() @_safe def npm_list_proxy_hosts(filter_domain: Optional[str] = None) -> str: """ List all proxy hosts in Nginx Proxy Manager (matrix-ubuntu). Args: filter_domain: Optional substring to filter by domain name. """ try: token = _npm_token() hosts = _npm("GET", "/nginx/proxy-hosts", token=token) results = [] for h in hosts: # type: ignore domains = ", ".join(h.get("domain_names", [])) if filter_domain and filter_domain.lower() not in domains.lower(): continue fwd = f"{h.get('forward_scheme')}://{h.get('forward_host')}:{h.get('forward_port')}" cert = h.get("certificate_id", 0) enabled = "✓" if h.get("enabled") else "✗" results.append( f" [{enabled}] ID:{h['id']:3d} {domains:45s} → {fwd:35s} cert:{cert}" ) if not results: return "No proxy hosts found." return f"NPM proxy hosts ({len(results)}):\n" + "\n".join(results) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def npm_list_certs() -> str: """ List all SSL certificates in Nginx Proxy Manager with their domains and expiry. """ try: token = _npm_token() certs = _npm("GET", "/nginx/certificates", token=token) lines = [] for c in certs: # type: ignore domains = ", ".join(c.get("domain_names", [])) provider = c.get("provider", "?") expires = (c.get("expires_on") or "?")[:10] nice = c.get("nice_name", "") lines.append( f" ID:{c['id']:3d} [{provider:12s}] expires:{expires} {nice or domains}" ) return f"NPM certificates ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def npm_get_proxy_host(host_id: int) -> str: """ Get details of a specific NPM proxy host including advanced config and cert. Args: host_id: The proxy host ID (from npm_list_proxy_hosts). """ try: token = _npm_token() h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token) lines = [ f"ID: {h['id']}", # type: ignore f"Domains: {', '.join(h['domain_names'])}", # type: ignore f"Forward: {h['forward_scheme']}://{h['forward_host']}:{h['forward_port']}", # type: ignore f"SSL forced: {h.get('ssl_forced')}", # type: ignore f"Certificate ID: {h.get('certificate_id')}", # type: ignore f"Websocket: {h.get('allow_websocket_upgrade')}", # type: ignore f"Enabled: {h.get('enabled')}", # type: ignore ] adv = h.get("advanced_config", "").strip() # type: ignore if adv: lines.append(f"Advanced config:\n{adv}") meta = h.get("meta", {}) # type: ignore if isinstance(meta, dict) and meta.get("nginx_err"): lines.append(f"nginx_err: {meta['nginx_err']}") return "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def npm_update_cert(host_id: int, certificate_id: int) -> str: """ Update the SSL certificate used by an NPM proxy host. Args: host_id: The proxy host ID. certificate_id: The certificate ID to assign (from npm_list_certs). """ try: token = _npm_token() h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token) h_dict = h if isinstance(h, dict) else {} payload = { k: h_dict.get(k) for k in [ "domain_names", "forward_scheme", "forward_host", "forward_port", "access_list_id", "ssl_forced", "caching_enabled", "block_exploits", "advanced_config", "meta", "allow_websocket_upgrade", "http2_support", "hsts_enabled", "hsts_subdomains", "locations", ] } payload["certificate_id"] = certificate_id result = _npm("PUT", f"/nginx/proxy-hosts/{host_id}", token=token, json=payload) return f"Updated host {host_id} ({', '.join(result.get('domain_names', []))}) to cert {certificate_id}" # type: ignore except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Headscale tools # --------------------------------------------------------------------------- def _headscale_argv(*argv: str) -> list[str]: """Build an `ssh HEADSCALE_CALYPSO_SSH ...` argv, shell-quoting every value. Every dynamic piece (node IDs, names, tags, CIDRs) goes through shlex.join so that a value like `"; rm -rf /"` cannot escape the remote command. """ remote = "sudo /usr/local/bin/docker exec headscale headscale " + shlex.join(argv) return ["ssh", HEADSCALE_CALYPSO_SSH, remote] @mcp.tool() @_safe def headscale_list_nodes() -> str: """ List all nodes registered in the Headscale tailnet. Shows node ID, hostname, Tailscale IP, online status, and expiry. """ try: result = subprocess.run( _headscale_argv("nodes", "list", "--output", "json"), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" nodes = json.loads(result.stdout) lines = [] for n in nodes: ips = ", ".join(n.get("ip_addresses", [])) online = "🟢" if n.get("online") else "🔴" name = n.get("given_name") or n.get("name", "?") lines.append(f" {online} ID:{str(n['id']):3s} {name:25s} {ips}") return f"Headscale nodes ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_create_preauth_key( expiration: str = "24h", reusable: bool = False, ephemeral: bool = False, ) -> str: """ Create a Headscale pre-authentication key for registering a new node. Args: expiration: Key expiry duration e.g. '24h', '7d'. Default: '24h'. reusable: Allow multiple nodes to use this key. Default: False. ephemeral: Node is removed when it goes offline. Default: False. """ try: argv = ["preauthkeys", "create", "--user", "1", "--expiration", expiration] if reusable: argv.append("--reusable") if ephemeral: argv.append("--ephemeral") argv += ["--output", "json"] result = subprocess.run( _headscale_argv(*argv), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" data = json.loads(result.stdout) key = data.get("key", "?") exp = data.get("expiration", "?") return ( f"Pre-auth key created:\n" f" Key: {key}\n" f" Expires: {exp}\n" f" Reusable: {reusable}, Ephemeral: {ephemeral}\n\n" f"Use on new node:\n" f" tailscale up --login-server=https://headscale.vish.gg:8443 --authkey={key} --accept-routes=false" ) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_delete_node(node_id: str) -> str: """ Delete a node from the Headscale tailnet. Args: node_id: The numeric node ID (from headscale_list_nodes). """ try: result = subprocess.run( _headscale_argv("nodes", "delete", "--identifier", node_id, "--output", "json"), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" return f"Node {node_id} deleted successfully." except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_rename_node(node_id: str, new_name: str) -> str: """ Rename a node in the Headscale tailnet. Args: node_id: The numeric node ID. new_name: The new hostname/givenName. """ try: result = subprocess.run( _headscale_argv("nodes", "rename", "--identifier", node_id, new_name), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" return f"Node {node_id} renamed to '{new_name}'." except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_expire_node(node_id: str) -> str: """ Force a node to re-authenticate by expiring it. Args: node_id: The numeric node ID (from headscale_list_nodes). """ try: result = subprocess.run( _headscale_argv("nodes", "expire", "--identifier", node_id), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" return f"Node {node_id} expired — it must re-authenticate to rejoin the tailnet." except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_list_routes(node_id: str) -> str: """ List routes advertised by a node. Args: node_id: The numeric node ID (from headscale_list_nodes). """ try: result = subprocess.run( _headscale_argv("nodes", "list-routes", "--identifier", node_id, "--output", "json"), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" routes = json.loads(result.stdout) if not routes: return f"Node {node_id} has no advertised routes." lines = [] for r in routes: prefix = r.get("prefix", "?") enabled = "✅" if r.get("enabled") else "❌" is_primary = " (primary)" if r.get("is_primary") else "" lines.append(f" {enabled} {prefix}{is_primary}") return f"Routes for node {node_id}:\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_approve_routes(node_id: str, routes: str) -> str: """ Approve or revoke routes advertised by a node. Args: node_id: The numeric node ID (from headscale_list_nodes). routes: Comma-separated CIDRs to approve (e.g. '192.168.1.0/24,10.0.0.0/8'). Pass empty string to revoke all. """ try: result = subprocess.run( _headscale_argv("nodes", "approve-routes", "--identifier", node_id, "--routes", routes or ""), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" action = "revoked all routes" if not routes else f"approved routes: {routes}" return f"Node {node_id}: {action}." except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_list_preauth_keys() -> str: """ List all pre-authentication keys in the Headscale tailnet. """ try: result = subprocess.run( _headscale_argv("preauthkeys", "list", "--user", "1", "--output", "json"), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" keys = json.loads(result.stdout) if not keys: return "No pre-auth keys found." lines = [] for k in keys: key = k.get("key", "?")[:12] + "..." exp = k.get("expiration", "?") reusable = "reusable" if k.get("reusable") else "single-use" ephemeral = ", ephemeral" if k.get("ephemeral") else "" used = "used" if k.get("used") else "unused" lines.append(f" {key} {reusable}{ephemeral} {used} expires: {exp}") return f"Pre-auth keys ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_tag_node(node_id: str, tags: str) -> str: """ Set ACL tags on a node. Args: node_id: The numeric node ID (from headscale_list_nodes). tags: Comma-separated tags (e.g. 'tag:server,tag:lan'). Pass empty string to clear all tags. """ try: result = subprocess.run( _headscale_argv("nodes", "tag", "--identifier", node_id, "--tags", tags or ""), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" action = "cleared all tags" if not tags else f"set tags: {tags}" return f"Node {node_id}: {action}." except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_list_users() -> str: """ List all users (namespaces) in the Headscale tailnet. """ try: result = subprocess.run( _headscale_argv("users", "list", "--output", "json"), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" users = json.loads(result.stdout) if not users: return "No users found." lines = [] for u in users: uid = u.get("id", "?") name = u.get("name", "?") created = u.get("created_at", "?") lines.append(f" ID:{uid} {name} created: {created}") return f"Users ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def headscale_get_policy() -> str: """ Get the current Headscale ACL policy. """ try: result = subprocess.run( _headscale_argv("policy", "get"), capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"Error: {result.stderr}" return f"Headscale ACL policy:\n{result.stdout}" except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Authentik tools # --------------------------------------------------------------------------- @mcp.tool() @_safe def authentik_list_applications() -> str: """ List all applications configured in Authentik SSO. """ try: data = _authentik("GET", "/core/applications/?page_size=100") apps = data.get("results", []) # type: ignore lines = [] for a in apps: slug = a.get("slug", "?") name = a.get("name", "?") provider = a.get("provider") launch = a.get("meta_launch_url") or "" lines.append(f" {slug:30s} {name:35s} provider:{provider} {launch}") return f"Authentik applications ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_list_providers() -> str: """ List all OAuth2/OIDC/proxy providers in Authentik. """ try: data = _authentik("GET", "/providers/all/?page_size=100") providers = data.get("results", []) # type: ignore lines = [] for p in providers: pk = p.get("pk") name = p.get("name", "?") component = p.get("component", "?").replace("ak-provider-", "") lines.append(f" PK:{pk:4} [{component:20s}] {name}") return f"Authentik providers ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_list_users() -> str: """ List all users in Authentik. """ try: data = _authentik("GET", "/core/users/?page_size=100") users = data.get("results", []) # type: ignore lines = [] for u in users: pk = u.get("pk") username = u.get("username", "?") email = u.get("email", "?") active = "✓" if u.get("is_active") else "✗" lines.append(f" [{active}] PK:{pk:4} {username:20s} {email}") return f"Authentik users ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_update_app_launch_url(slug: str, launch_url: str) -> str: """ Update the launch URL of an Authentik application (the link shown on the dashboard). Args: slug: The application slug (from authentik_list_applications). launch_url: The new launch URL e.g. 'https://pt.vish.gg'. """ try: result = _authentik( "PATCH", f"/core/applications/{slug}/", json={"meta_launch_url": launch_url} ) return f"Updated '{slug}' launch URL to: {result.get('meta_launch_url')}" # type: ignore except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_set_provider_cookie_domain(provider_pk: int, cookie_domain: str) -> str: """ Set the cookie domain on an Authentik proxy provider. Required to prevent redirect loops with Forward Auth. Args: provider_pk: The provider PK (from authentik_list_providers). cookie_domain: Cookie domain e.g. 'vish.gg'. """ try: provider = _authentik("GET", f"/providers/proxy/{provider_pk}/") provider["cookie_domain"] = cookie_domain # type: ignore result = _authentik("PUT", f"/providers/proxy/{provider_pk}/", json=provider) return f"Provider {provider_pk} cookie_domain set to: {result.get('cookie_domain')}" # type: ignore except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_create_proxy_provider( name: str, external_host: str, mode: str = "forward_single", cookie_domain: str = "vish.gg", ) -> str: """ Create a proxy provider in Authentik and bind it to the embedded outpost. Args: name: Provider name, e.g. 'Grafana Forward Auth'. external_host: External URL, e.g. 'https://grafana.vish.gg'. mode: Proxy mode — 'forward_single' (default, for NPM forward auth) or 'forward_domain'. cookie_domain: Cookie domain to prevent redirect loops. Default 'vish.gg'. """ try: # Resolve authorization + invalidation flow slugs to PKs auth_flows = _authentik("GET", f"/flows/instances/?slug={AUTHENTIK_DEFAULT_AUTH_FLOW}") auth_results = auth_flows.get("results", []) # type: ignore if not auth_results: return f"Error: authorization flow '{AUTHENTIK_DEFAULT_AUTH_FLOW}' not found" auth_flow_pk = auth_results[0]["pk"] inval_flows = _authentik("GET", f"/flows/instances/?slug={AUTHENTIK_DEFAULT_INVALIDATION_FLOW}") inval_results = inval_flows.get("results", []) # type: ignore if not inval_results: return f"Error: invalidation flow '{AUTHENTIK_DEFAULT_INVALIDATION_FLOW}' not found" inval_flow_pk = inval_results[0]["pk"] provider = _authentik("POST", "/providers/proxy/", json={ "name": name, "authorization_flow": auth_flow_pk, "invalidation_flow": inval_flow_pk, "external_host": external_host, "mode": mode, "cookie_domain": cookie_domain, }) pk = provider.get("pk") # type: ignore # Bind to embedded outpost outpost = _authentik("GET", f"/outposts/instances/{AUTHENTIK_OUTPOST_PK}/") providers = outpost.get("providers", []) # type: ignore if pk not in providers: providers.append(pk) _authentik("PATCH", f"/outposts/instances/{AUTHENTIK_OUTPOST_PK}/", json={ "providers": providers, }) return ( f"Created proxy provider '{name}' (PK:{pk})\n" f" external_host: {external_host}\n" f" mode: {mode}\n" f" cookie_domain: {cookie_domain}\n" f" Bound to embedded outpost" ) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_create_application( name: str, slug: str, provider_pk: int, launch_url: str = "", ) -> str: """ Create an application in Authentik linked to an existing provider. Args: name: Display name, e.g. 'Grafana'. slug: URL-safe slug, e.g. 'grafana'. provider_pk: PK of the provider to attach (from authentik_create_proxy_provider or authentik_list_providers). launch_url: Optional launch URL shown on the Authentik dashboard. """ try: app_data: dict = { "name": name, "slug": slug, "provider": provider_pk, } if launch_url: app_data["meta_launch_url"] = launch_url result = _authentik("POST", "/core/applications/", json=app_data) return ( f"Created application '{name}'\n" f" slug: {result.get('slug')}\n" # type: ignore f" provider: PK:{provider_pk}\n" f" launch_url: {result.get('meta_launch_url', '(none)')}" # type: ignore ) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_list_sessions() -> str: """ List active authenticated sessions in Authentik. Useful for debugging login issues. """ try: data = _authentik("GET", "/core/authenticated_sessions/?page_size=50") sessions = data.get("results", []) # type: ignore if not sessions: return "No active sessions." lines = [] for s in sessions: uuid = s.get("uuid", "?") user = s.get("user", {}) username = user.get("username", "?") if isinstance(user, dict) else f"uid:{user}" last_ip = s.get("last_ip", "?") last_used = (s.get("last_used") or "?")[:19] ua = s.get("user_agent", {}) browser = ua.get("user_agent", {}).get("family", "?") if isinstance(ua, dict) else "?" lines.append(f" {uuid[:8]} {username:15s} {last_ip:16s} {browser:10s} {last_used}") return f"Active sessions ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_delete_session(session_uuid: str) -> str: """ Delete (invalidate) an authenticated session in Authentik. Use authentik_list_sessions to find the UUID. Args: session_uuid: The session UUID to delete. """ try: _authentik("DELETE", f"/core/authenticated_sessions/{session_uuid}/") return f"Deleted session {session_uuid}" except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def authentik_get_events(action: str = "", limit: int = 20) -> str: """ Get recent Authentik audit log events. Useful for debugging auth failures, login issues, and policy denials. Args: action: Optional filter by action type, e.g. 'login', 'login_failed', 'authorize_application', 'model_created', 'model_updated', 'secret_rotate', 'policy_exception'. Leave empty for all. limit: Number of events to return (max 50, default 20). """ try: limit = min(limit, 50) params = f"page_size={limit}&ordering=-created" if action: params += f"&action={action}" data = _authentik("GET", f"/events/events/?{params}") events = data.get("results", []) # type: ignore if not events: return "No events found." lines = [] for e in events: ts = (e.get("created") or "?")[:19] act = e.get("action", "?") user = e.get("user", {}) username = user.get("username", "system") if isinstance(user, dict) else "system" app = e.get("app", "").replace("authentik.", "") ctx = e.get("context", {}) msg = ctx.get("message", "") if isinstance(ctx, dict) else "" detail = msg[:60] if msg else app lines.append(f" {ts} {act:30s} {username:15s} {detail}") total = data.get("pagination", {}).get("count", "?") # type: ignore return f"Events (showing {len(lines)} of {total}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Cloudflare tools # --------------------------------------------------------------------------- @mcp.tool() @_safe def cloudflare_list_dns_records(filter_name: Optional[str] = None) -> str: """ List DNS records for the vish.gg zone in Cloudflare. Args: filter_name: Optional substring to filter by record name. """ try: data = _cloudflare( "GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records?per_page=200" ) records = data.get("result", []) lines = [] for r in records: name = r.get("name", "?") if filter_name and filter_name.lower() not in name.lower(): continue rtype = r.get("type", "?") content = r.get("content", "?") proxied = "☁" if r.get("proxied") else "⚡" lines.append(f" {proxied} {rtype:6s} {name:45s} → {content}") return f"Cloudflare DNS records ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def cloudflare_create_dns_record( name: str, content: str, record_type: str = "A", proxied: bool = True, ttl: int = 1, ) -> str: """ Create a new DNS record in the vish.gg Cloudflare zone. Args: name: Record name e.g. 'pt' (becomes pt.vish.gg) or 'pt.vish.gg'. content: Record value e.g. '184.23.52.14' or 'calypso.vish.gg'. record_type: DNS type: 'A', 'CNAME', 'TXT', etc. Default: 'A'. proxied: Route through Cloudflare proxy. Default: True. ttl: TTL in seconds (1 = auto). Default: 1. """ try: data = _cloudflare( "POST", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records", json={ "type": record_type, "name": name, "content": content, "proxied": proxied, "ttl": ttl, }, ) r = data.get("result", {}) return f"Created: {r.get('type')} {r.get('name')} → {r.get('content')} proxied:{r.get('proxied')}" except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def cloudflare_delete_dns_record(record_id: str) -> str: """ Delete a DNS record from the vish.gg Cloudflare zone. Args: record_id: The record ID (from cloudflare_list_dns_records — use the ID shown). """ try: _cloudflare("DELETE", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}") return f"Deleted DNS record {record_id}." except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def cloudflare_update_dns_record( record_id: str, content: str, proxied: Optional[bool] = None, ) -> str: """ Update an existing DNS record in Cloudflare. Args: record_id: The record ID to update. content: New record value (IP or hostname). proxied: Update proxied status. None = keep existing. """ try: existing_data = _cloudflare( "GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}" ) existing = existing_data.get("result", {}) payload = { "type": existing.get("type"), "name": existing.get("name"), "content": content, "proxied": proxied if proxied is not None else existing.get("proxied"), "ttl": existing.get("ttl", 1), } data = _cloudflare( "PUT", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}", json=payload ) r = data.get("result", {}) return ( f"Updated: {r.get('name')} → {r.get('content')} proxied:{r.get('proxied')}" ) except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Uptime Kuma tools # --------------------------------------------------------------------------- def _kuma_sqlite(query: str) -> str: """Run a SQLite query against the Kuma DB via SSH. The query is sent over stdin (docker exec -i | sqlite3) rather than interpolated into the shell command, so backticks, $, and quotes in query values cannot shell-inject. """ remote = f"docker exec -i {shlex.quote(KUMA_CONTAINER)} sqlite3 /app/data/kuma.db" result = subprocess.run( ["ssh", KUMA_HOST, remote], input=query, capture_output=True, text=True, timeout=20, ) if result.returncode != 0: raise RuntimeError(result.stderr) return result.stdout.strip() @mcp.tool() @_safe def kuma_list_monitors(filter_name: Optional[str] = None) -> str: """ List all monitors in Uptime Kuma with their status and type. Args: filter_name: Optional substring to filter by monitor name. """ try: rows = _kuma_sqlite( "SELECT id, name, type, active, url, hostname, port, parent FROM monitor ORDER BY parent, name;" ) if not rows: return "No monitors found." lines = [] for row in rows.splitlines(): parts = row.split("|") if len(parts) < 8: continue mid, name, mtype, active, url, hostname, port, parent = parts[:8] if filter_name and filter_name.lower() not in name.lower(): continue status = "✓" if active == "1" else "✗" target = url or (f"{hostname}:{port}" if hostname else "") indent = " └─ " if parent and parent != "" else "" lines.append( f" [{status}] ID:{mid:4s} {indent}{name:30s} [{mtype:8s}] {target}" ) return f"Kuma monitors ({len(lines)}):\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def kuma_list_groups() -> str: """ List all monitor groups in Uptime Kuma (for use as parent IDs). """ try: rows = _kuma_sqlite( "SELECT id, name, parent FROM monitor WHERE type='group' ORDER BY name;" ) if not rows: return "No groups found." lines = [] for row in rows.splitlines(): parts = row.split("|") mid, name = parts[0], parts[1] parent = parts[2] if len(parts) > 2 else "" lines.append(f" ID:{mid:4s} {name:30s} parent:{parent or 'none'}") return f"Kuma groups:\n" + "\n".join(lines) except Exception as e: return f"Error: {e}" @mcp.tool() @_safe def kuma_add_monitor( name: str, monitor_type: str, url: Optional[str] = None, hostname: Optional[str] = None, port: Optional[int] = None, parent_id: Optional[int] = None, interval: int = 60, ignore_tls: bool = False, ) -> str: """ Add a new monitor to Uptime Kuma. Requires a Kuma restart to take effect (call kuma_restart after adding monitors). Args: name: Monitor display name. monitor_type: 'http', 'port', 'ping', 'group'. url: URL for http monitors e.g. 'https://pt.vish.gg/'. hostname: Hostname/IP for port/ping monitors. port: Port number for port monitors. parent_id: Parent group monitor ID (from kuma_list_groups). interval: Check interval in seconds. Default: 60. ignore_tls: Ignore TLS cert errors. Default: False. """ url_val = (url or "https://").replace("'", "''") host_val = (hostname or "").replace("'", "''") port_val = int(port or 0) parent_val = int(parent_id) if parent_id is not None else "NULL" ignore_tls_val = 1 if ignore_tls else 0 safe_name = name.replace("'", "''") safe_type = monitor_type.replace("'", "''") query = ( f"INSERT INTO monitor " f"(name, active, user_id, interval, url, type, weight, hostname, port, " f"maxretries, ignore_tls, upside_down, maxredirects, accepted_statuscodes_json, parent) " f"VALUES ('{safe_name}', 1, 1, {int(interval)}, '{url_val}', '{safe_type}', 2000, " f"'{host_val}', {port_val}, 3, {ignore_tls_val}, 0, 10, '[\"200-299\"]', {parent_val});" f"SELECT last_insert_rowid();" ) result = _kuma_sqlite(query) monitor_id = result.strip().split("\n")[-1] return ( f"Monitor '{name}' added (ID: {monitor_id}).\n" f"Call kuma_restart to activate." ) @mcp.tool() @_safe def kuma_set_parent(monitor_id: int, parent_id: Optional[int] = None) -> str: """ Set or clear the parent group of a Kuma monitor. Args: monitor_id: Monitor ID to update. parent_id: Parent group ID, or None to remove from group. """ parent_val = int(parent_id) if parent_id is not None else "NULL" _kuma_sqlite(f"UPDATE monitor SET parent={parent_val} WHERE id={int(monitor_id)};") return f"Monitor {monitor_id} parent set to {parent_val}. Call kuma_restart to apply." @mcp.tool() @_safe def kuma_restart() -> str: """ Restart the Uptime Kuma container to apply DB changes. Required after any kuma_add_monitor or kuma_set_parent operations. """ try: result = subprocess.run( ["ssh", KUMA_HOST, f"docker restart {KUMA_CONTAINER}"], capture_output=True, text=True, timeout=30, ) if result.returncode != 0: return f"Error restarting Kuma: {result.stderr}" return "Uptime Kuma restarted successfully." except Exception as e: return f"Error: {e}" # --------------------------------------------------------------------------- # Ollama LLM # --------------------------------------------------------------------------- @mcp.tool() @_safe def ollama_query(prompt: str, model: str = "qwen3-coder:latest", max_tokens: int = 2000, temperature: float = 0.3) -> str: """Query the local Ollama LLM for homelab-specific analysis. Useful for: analyzing logs, explaining configs, generating commands, summarizing infrastructure state, or any homelab question. Args: prompt: The question or analysis prompt. model: Ollama model name (default: qwen3-coder:latest). max_tokens: Maximum response tokens. temperature: Sampling temperature (0.0-1.0, lower = more focused). """ import re as _re with httpx.Client(timeout=120) as c: resp = c.post( f"{OLLAMA_URL}/api/generate", json={ "model": model, "prompt": prompt, "stream": False, "options": {"temperature": temperature, "num_predict": max_tokens}, }, ) resp.raise_for_status() raw = resp.json().get("response", "").strip() return _re.sub(r".*?", "", raw, flags=_re.DOTALL).strip() # --------------------------------------------------------------------------- # Jellyfin tools # --------------------------------------------------------------------------- def _jellyfin(path: str) -> dict | list: """Make a Jellyfin API request via kubectl exec (bypasses envoy auth sidecar).""" sep = "&" if "?" in path else "?" url = f"http://localhost:8096{path}{sep}api_key={JELLYFIN_TOKEN}" result = subprocess.run( ["ssh", "olares", f"kubectl exec -n jellyfin-vishinator deploy/jellyfin -c jellyfin -- " f"curl -s '{url}'"], capture_output=True, text=True, timeout=20, ) if result.returncode != 0: raise RuntimeError(f"Jellyfin API error: {result.stderr.strip()}") import json as _json return _json.loads(result.stdout) @mcp.tool() @_safe def jellyfin_libraries() -> str: """List all Jellyfin media libraries with item counts.""" data = _jellyfin("/Library/VirtualFolders") if not data: return "No libraries found." lines = [] for lib in data: name = lib.get("Name", "?") ltype = lib.get("CollectionType", "unknown") paths = ", ".join(lib.get("Locations", [])) lines.append(f" {name} ({ltype}): {paths}") return f"Jellyfin Libraries ({len(data)}):\n" + "\n".join(lines) @mcp.tool() @_safe def jellyfin_sessions() -> str: """List active Jellyfin playback sessions.""" data = _jellyfin("/Sessions") if not data: return "No active sessions." lines = [] for s in data: user = s.get("UserName", "?") client_name = s.get("Client", "?") device = s.get("DeviceName", "?") now_playing = s.get("NowPlayingItem") if now_playing: title = now_playing.get("Name", "?") media_type = now_playing.get("Type", "?") lines.append(f" {user} on {device} ({client_name}): {title} ({media_type})") else: lines.append(f" {user} on {device} ({client_name}): idle") return f"Jellyfin Sessions ({len(lines)}):\n" + "\n".join(lines) @mcp.tool() @_safe def jellyfin_system_info() -> str: """Get Jellyfin server system info (version, OS, transcoding).""" data = _jellyfin("/System/Info/Public") return ( f"Jellyfin {data.get('Version', '?')}\n" f" OS: {data.get('OperatingSystem', '?')}\n" f" Architecture: {data.get('SystemArchitecture', '?')}\n" f" Server: {data.get('ServerName', '?')}\n" f" Local Address: {data.get('LocalAddress', '?')}" ) # --------------------------------------------------------------------------- # Olares / Kubernetes tools # --------------------------------------------------------------------------- def _kubectl(*argv: str, timeout: int = 15) -> str: """Run kubectl against the Olares K3s cluster. Every argv entry is shell-quoted before being sent over SSH, so namespace/pod/deployment names coming from the LLM cannot shell-inject. """ remote = "kubectl " + shlex.join(argv) result = subprocess.run( ["ssh", "olares", remote], capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0: raise RuntimeError(f"kubectl error: {result.stderr.strip()}") return result.stdout.strip() @mcp.tool() @_safe def olares_pods(namespace: Optional[str] = None) -> str: """List pods on the Olares K3s cluster. Args: namespace: Filter by namespace. If None, lists all namespaces. """ ns_args = ["-n", namespace] if namespace else ["-A"] return _kubectl("get", "pods", *ns_args, "-o", "wide") @mcp.tool() @_safe def olares_gpu() -> str: """Show NVIDIA GPU status on Olares (nvidia-smi).""" result = subprocess.run( ["ssh", "olares", "nvidia-smi"], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: return f"nvidia-smi failed: {result.stderr.strip()}" return result.stdout.strip() @mcp.tool() @_safe def olares_pod_logs(namespace: str, pod: str, container: Optional[str] = None, tail: int = 50) -> str: """Get pod logs from Olares K3s. Args: namespace: Kubernetes namespace. pod: Pod name or deployment name (use deploy/NAME). container: Container name (if pod has multiple). tail: Number of lines from the end. """ argv = ["logs", "-n", namespace, pod] if container: argv += ["-c", container] argv.append(f"--tail={int(tail)}") return _kubectl(*argv, timeout=20) @mcp.tool() @_safe def olares_restart(namespace: str, deployment: str) -> str: """Restart a deployment on Olares K3s. Args: namespace: Kubernetes namespace. deployment: Deployment name. """ return _kubectl("rollout", "restart", f"deployment/{deployment}", "-n", namespace) # --------------------------------------------------------------------------- # Homelab services list # --------------------------------------------------------------------------- @mcp.tool() @_safe def list_homelab_services() -> str: """List all known homelab services with their URLs and hosts. Returns a quick reference of service endpoints across the homelab. """ services = [ ("Portainer", PORTAINER_URL, "atlantis (Tailscale)"), ("Gitea", GITEA_URL, "matrix-ubuntu via NPM"), ("Prometheus", PROMETHEUS_URL, "homelab-vm"), ("Grafana", GRAFANA_URL, "homelab-vm"), ("Sonarr", SONARR_URL, "atlantis"), ("Radarr", RADARR_URL, "atlantis"), ("SABnzbd", SABNZBD_URL, "atlantis"), ("AdGuard", ADGUARD_URL, "calypso"), ("NPM", NPM_URL, "matrix-ubuntu"), ("Authentik", AUTHENTIK_URL, "calypso"), ("Jellyfin", JELLYFIN_URL, "olares (NodePort 30096)"), ("Ollama", OLLAMA_URL, "olares (NodePort 31434)"), ("Kuma", f"ssh://{KUMA_HOST}", "pi-5 (Portainer: rpi5)"), ] lines = [f" {name:12} {url:45} ({host})" for name, url, host in services] return f"Homelab Services ({len(services)}):\n" + "\n".join(lines) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) mcp.run()