#!/usr/bin/env python3
"""
Homelab MCP Server

Provides MCP tools for managing homelab infrastructure:
  - Portainer: stack/container management across all endpoints
  - Gitea: repo, issue, and branch management
  - Prometheus: PromQL queries and target inspection
  - Grafana: dashboard and alert listing
  - Sonarr/Radarr: media library and download queue
  - SABnzbd: download queue management
  - SSH: remote command execution on homelab hosts
  - Filesystem: read/write files on the local machine

Secrets (tokens, passwords, API keys) default to the values written in this
file, but each one can be overridden without editing the source by exporting
an environment variable with the same name as the constant, e.g.
``PORTAINER_TOKEN``, ``GITEA_TOKEN``, ``GRAFANA_PASS``, ``SONARR_API_KEY``.
"""

import json
import os
import subprocess
from pathlib import Path
from typing import Optional

import httpx
from fastmcp import FastMCP

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Portainer (HTTPS, addressed by IP)
PORTAINER_URL = "https://192.168.0.200:9443"
PORTAINER_TOKEN = os.environ.get("PORTAINER_TOKEN", "REDACTED_TOKEN")

# Token used for the Gitea API and for Portainer's git redeploy authentication
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "REDACTED_TOKEN")

# ntfy push-notification server
NTFY_BASE = "https://ntfy.vish.gg"

# Local checkout of the homelab repository (compose files are read from here)
REPO_PATH = Path("/home/homelab/organized/repos/homelab")

# Portainer endpoint name -> numeric endpoint ID
ENDPOINTS: dict[str, int] = {
    "atlantis": 2,
    "calypso": 443397,
    "nuc": 443398,
    "homelab": 443399,
    "rpi5": 443395,
}

# Gitea
GITEA_URL = "http://192.168.0.250:3052"
GITEA_ORG = "vish"

# Monitoring
PROMETHEUS_URL = "http://192.168.0.210:9090"
GRAFANA_URL = "http://192.168.0.210:3300"
GRAFANA_USER = "admin"
GRAFANA_PASS = os.environ.get("GRAFANA_PASS", "REDACTED_PASSWORD")

# Media
SONARR_URL = "http://192.168.0.200:8989"
SONARR_API_KEY = os.environ.get("SONARR_API_KEY", "REDACTED_API_KEY")
RADARR_URL = "http://192.168.0.200:7878"
RADARR_API_KEY = os.environ.get("RADARR_API_KEY", "REDACTED_API_KEY")
SABNZBD_URL = "http://192.168.0.200:8080"
SABNZBD_API_KEY = os.environ.get("SABNZBD_API_KEY", "REDACTED_API_KEY")

# SSH — hostnames must resolve via /etc/hosts or ~/.ssh/config
SSH_KNOWN_HOSTS = [
    "atlantis",
    "calypso",
    "setillo",
    "setillo-root",
    "nuc",
    "homelab-vm",
    "rpi5",
    "pi-5",
    "matrix-ubuntu",
]

# Filesystem — restrict read/write to safe root paths
FS_ALLOWED_ROOTS = [
    Path("/home/homelab"),
    Path("/tmp"),
]

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _portainer(method: str, path: str, **kwargs) -> dict | list:
    """Make a Portainer API request and return the decoded JSON body.

    Args:
        method: HTTP verb, e.g. "GET", "POST", "PUT".
        path: Path appended to ``{PORTAINER_URL}/api`` (should start with "/").
        **kwargs: Forwarded to ``httpx.Client.request`` (``json=``, ``params=`` ...).

    Returns:
        The parsed JSON payload, or ``{}`` when the response has no body.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
    """
    # TLS verification is disabled for this client (verify=False).
    with httpx.Client(verify=False, timeout=30) as client:
        response = client.request(
            method,
            f"{PORTAINER_URL}/api{path}",
            headers={"X-API-Key": PORTAINER_TOKEN},
            **kwargs,
        )
        response.raise_for_status()
        return response.json() if response.content else {}


def _resolve_endpoint(endpoint: str) -> int:
    """Resolve an endpoint name or numeric string to a Portainer endpoint ID.

    Args:
        endpoint: A key of ``ENDPOINTS`` (case-insensitive) or a string of digits.

    Raises:
        ValueError: If the name is not a known endpoint.
    """
    if endpoint.isdigit():
        return int(endpoint)
    try:
        return ENDPOINTS[endpoint.lower()]
    except KeyError:
        raise ValueError(
            f"Unknown endpoint '{endpoint}'. Known: {', '.join(ENDPOINTS)}"
        ) from None


def _gitea(method: str, path: str, **kwargs) -> dict | list:
    """Make a Gitea API request (``{GITEA_URL}/api/v1{path}``) with token auth.

    Returns the parsed JSON payload, or ``{}`` when the response has no body.
    Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    with httpx.Client(timeout=20) as client:
        response = client.request(
            method,
            f"{GITEA_URL}/api/v1{path}",
            headers={"Authorization": f"token {GITEA_TOKEN}"},
            **kwargs,
        )
        response.raise_for_status()
        return response.json() if response.content else {}


def _arr(
    base_url: str, api_key: str, path: str, params: dict | None = None
) -> dict | list:
    """Make a Sonarr/Radarr GET request against ``{base_url}/api/v3{path}``.

    Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    with httpx.Client(timeout=20) as client:
        response = client.get(
            f"{base_url}/api/v3{path}",
            headers={"X-Api-Key": api_key},
            params=params or {},
        )
        response.raise_for_status()
        return response.json()


def _sabnzbd(mode: str, extra: dict | None = None) -> dict:
    """Make a SABnzbd API request.

    Args:
        mode: Value of the ``mode`` query parameter (e.g. "queue", "pause").
        extra: Optional additional query parameters; they override the defaults.

    Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    query = {"apikey": SABNZBD_API_KEY, "output": "json", "mode": mode}
    if extra:
        query.update(extra)
    with httpx.Client(timeout=20) as client:
        response = client.get(f"{SABNZBD_URL}/api", params=query)
        response.raise_for_status()
        return response.json()


def _fs_safe(path: str) -> Path:
    """Resolve a path and verify it lies under one of ``FS_ALLOWED_ROOTS``.

    Both the candidate and the roots are fully resolved before comparison, so
    ``..`` segments and symlinks cannot be used to escape, and a root that is
    itself a symlink still admits the files beneath it.

    Returns:
        The resolved absolute path.

    Raises:
        PermissionError: If the resolved path is outside every allowed root.
    """
    resolved = Path(path).expanduser().resolve()
    if any(resolved.is_relative_to(root.resolve()) for root in FS_ALLOWED_ROOTS):
        return resolved
    allowed = ", ".join(str(r) for r in FS_ALLOWED_ROOTS)
    raise PermissionError(f"Path '{path}' is outside allowed roots: {allowed}")


def _endpoint_label(ep_id) -> str:
    """Return the configured name for a Portainer endpoint ID, else the ID as text."""
    return next((name for name, value in ENDPOINTS.items() if value == ep_id), str(ep_id))


def _find_stack(stack_name_or_id: str) -> tuple[dict | None, str | None]:
    """Look up exactly one Portainer stack by numeric ID or by name.

    A string of digits is treated as a stack ID. Anything else is matched
    case-insensitively as a substring of the stack name; when one or more
    stacks carry exactly that name they win over longer names that merely
    contain it (so "arr" can be selected even if "arr-suite" exists).

    Returns:
        ``(stack, None)`` on a unique match, otherwise ``(None, message)``
        where ``message`` is a user-facing explanation.
    """
    all_stacks = _portainer("GET", "/stacks")
    if stack_name_or_id.isdigit():
        wanted_id = int(stack_name_or_id)
        matches = [s for s in all_stacks if s["Id"] == wanted_id]
    else:
        term = stack_name_or_id.lower()
        matches = [s for s in all_stacks if term in s["Name"].lower()]
        exact = [s for s in matches if s["Name"].lower() == term]
        if exact:
            matches = exact
    if not matches:
        return None, f"No stack found matching '{stack_name_or_id}'."
    if len(matches) > 1:
        names = ", ".join(s["Name"] for s in matches)
        return None, f"Multiple matches: {names}. Be more specific."
    return matches[0], None


# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------

mcp = FastMCP(
    "Homelab",
    instructions=(
        "Tools for managing a homelab running Docker services across multiple hosts.\n\n"
        "PORTAINER — Docker orchestration across 5 endpoints:\n"
        " Endpoints: atlantis (main NAS, media stack), calypso (secondary NAS), "
        "nuc (mini PC), homelab (VM at 192.168.0.210), rpi5 (Raspberry Pi).\n"
        " Tools: list_endpoints, list_stacks, get_stack, redeploy_stack, "
        "list_containers, get_container_logs, restart_container, start_container, "
        "stop_container, list_stack_containers, check_portainer.\n\n"
        "GITEA — Self-hosted git at 192.168.0.250:3052, org=vish. Repo names can be "
        "'vish/homelab' or just 'homelab'.\n"
        " Tools: gitea_list_repos, gitea_list_issues, gitea_create_issue, gitea_list_branches.\n\n"
        "PROMETHEUS — PromQL queries against homelab metrics (192.168.0.210:9090).\n"
        " Tools: prometheus_query, prometheus_targets.\n\n"
        "GRAFANA — Dashboard and alert inspection (192.168.0.210:3300).\n"
        " Tools: grafana_list_dashboards, grafana_list_alerts.\n\n"
        "SONARR/RADARR — Media library and download queue on Atlantis (ports 8989/7878).\n"
        " Tools: sonarr_list_series, sonarr_queue, radarr_list_movies, radarr_queue.\n\n"
        "SABNZBD — Usenet download queue on Atlantis (port 8080).\n"
        " Tools: sabnzbd_queue, sabnzbd_pause, sabnzbd_resume.\n\n"
        # Built from SSH_KNOWN_HOSTS so the advertised list cannot drift from
        # the allow-list that ssh_exec actually enforces.
        f"SSH — Run commands on homelab hosts. Allowed: {', '.join(SSH_KNOWN_HOSTS)}. "
        "Requires key auth in ~/.ssh/config.\n"
        " Tool: ssh_exec(host, command).\n\n"
        "FILESYSTEM — Read/write files on the local machine. "
        "Allowed roots: /home/homelab, /tmp.\n"
        " Tools: fs_read, fs_write, fs_list.\n\n"
        "REPO — Inspect compose files in the homelab git repo at "
        "/home/homelab/organized/repos/homelab.\n"
        " Tools: list_homelab_services, get_compose_file.\n\n"
        "UTILITIES — check_url (HTTP health check), send_notification (ntfy push)."
    ),
)


# ---------------------------------------------------------------------------
# Portainer — Endpoints
# ---------------------------------------------------------------------------


@mcp.tool()
def list_endpoints() -> str:
    """List all Portainer environments (servers) with their connection status."""
    rows = []
    for ep in _portainer("GET", "/endpoints"):
        # "Snapshots" may be missing, null, or an empty list for an endpoint
        # that has never been snapshotted; fall back to an empty snapshot.
        snapshots = ep.get("Snapshots") or [{}]
        running = snapshots[0].get("RunningContainerCount", "?")
        state = "online" if ep.get("Status") == 1 else "offline"
        rows.append(
            f" {ep['Name']} (id={ep['Id']}) — "
            f"status={state} — "
            f"containers={running} running"
        )
    return "Endpoints:\n" + "\n".join(rows)


# ---------------------------------------------------------------------------
# Portainer — Stacks
# ---------------------------------------------------------------------------


@mcp.tool()
def list_stacks(endpoint: Optional[str] = None) -> str:
    """
    List all Portainer stacks with their status.

    Args:
        endpoint: Optional filter by endpoint name (atlantis, calypso, nuc,
            homelab, rpi5) or numeric ID.
    """
    stacks = _portainer("GET", "/stacks")
    if endpoint:
        ep_id = _resolve_endpoint(endpoint)
        stacks = [s for s in stacks if s.get("EndpointId") == ep_id]

    rows = []
    for s in sorted(stacks, key=lambda x: x["Name"]):
        status = "active" if s.get("Status") == 1 else "inactive"
        ep_name = _endpoint_label(s.get("EndpointId"))
        # GitConfig can be present but null for non-GitOps stacks.
        git = s.get("GitConfig") or {}
        git_info = f" [git: {git.get('ConfigFilePath', '')}]" if git else ""
        rows.append(
            f" [{s['Id']}] {s['Name']} — {status} — endpoint={ep_name}{git_info}"
        )
    return f"Stacks ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def get_stack(stack_name_or_id: str) -> str:
    """
    Get detailed info about a specific stack.

    Args:
        stack_name_or_id: Stack name (exact name preferred, otherwise partial
            match) or numeric ID.
    """
    s, problem = _find_stack(stack_name_or_id)
    if s is None:
        return problem

    git = s.get("GitConfig") or {}
    env = s.get("Env") or []
    lines = [
        f"Stack: {s['Name']} (id={s['Id']})",
        f" Status: {'active' if s.get('Status') == 1 else 'inactive'}",
        f" Endpoint: {_endpoint_label(s.get('EndpointId'))} (id={s.get('EndpointId')})",
        f" Created: {s.get('CreationDate', 'unknown')}",
        f" Updated: {s.get('UpdateDate', 'unknown')}",
    ]
    if git:
        lines += [
            f" Git URL: {git.get('URL', '')}",
            f" Git Branch: {git.get('ReferenceName', '').replace('refs/heads/', '')}",
            f" Git File: {git.get('ConfigFilePath', '')}",
        ]
    if env:
        # Only the count is reported; values are not included in the output.
        lines.append(f" Env vars: {len(env)} set")
    return "\n".join(lines)


@mcp.tool()
def redeploy_stack(stack_name_or_id: str) -> str:
    """
    Trigger a GitOps redeploy of a stack (pull latest from Git and redeploy).

    Args:
        stack_name_or_id: Stack name (exact name preferred, otherwise partial
            match) or numeric ID.
    """
    s, problem = _find_stack(stack_name_or_id)
    if s is None:
        return problem
    if not s.get("GitConfig"):
        return f"Stack '{s['Name']}' is not a GitOps stack — cannot redeploy via git."

    ep_id = s["EndpointId"]
    stack_id = s["Id"]
    _portainer(
        "PUT",
        f"/stacks/{stack_id}/git/redeploy?endpointId={ep_id}",
        json={
            "pullImage": True,
            "prune": False,
            "repositoryAuthentication": True,
            "repositoryUsername": "vish",
            "repositoryPassword": GITEA_TOKEN,
        },
    )
    return f"Redeploy triggered for stack '{s['Name']}' (id={stack_id}) on endpoint {ep_id}."
# ---------------------------------------------------------------------------
# Portainer — Containers
# ---------------------------------------------------------------------------

_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def _container_name(container: dict) -> str:
    """Return a container's first name without the leading slash ("?" if unnamed)."""
    names = container.get("Names") or ["?"]
    return names[0].lstrip("/")


def _image_short_name(image: str) -> str:
    """Reduce an image reference to its bare repository name.

    The registry/namespace prefix is removed first and only then the digest
    or tag, so a registry port (``reg:5000/team/app:1.2``) is not mistaken
    for a tag separator.
    """
    leaf = image.rsplit("/", 1)[-1]
    return leaf.split("@", 1)[0].split(":", 1)[0]


def _container_rows(containers: list) -> list[str]:
    """Format container records as aligned table rows, sorted by name."""
    rows = []
    for c in sorted(containers, key=_container_name):
        short_id = c.get("Id", "")[:12]
        state = c.get("State", "?")
        image = _image_short_name(c.get("Image", "?"))
        rows.append(f" {short_id} {state:10s} {_container_name(c):40s} {image}")
    return rows


def _demux_docker_logs(raw: bytes) -> list[str]:
    """Convert a Docker ``/logs`` response body into text lines.

    The body is parsed as a sequence of frames, each with an 8-byte header:
    one stream-type byte (0, 1 or 2), three zero bytes, and a 4-byte
    big-endian payload length. If the body does not start with such a header
    (e.g. a raw, non-multiplexed stream) it is decoded as plain text instead
    of having its first bytes stripped as a bogus header.
    """
    lines: list[str] = []
    pos = 0
    while pos + 8 <= len(raw):
        header = raw[pos : pos + 8]
        if header[0] > 2 or header[1:4] != b"\x00\x00\x00":
            break  # not a frame header; stop demultiplexing
        size = int.from_bytes(header[4:8], "big")
        text = raw[pos + 8 : pos + 8 + size].decode("utf-8", errors="replace").rstrip()
        if text:
            lines.extend(text.splitlines())
        pos += 8 + size
    if not lines:
        # Nothing parsed as frames: treat the whole body as plain text.
        lines = raw.decode("utf-8", errors="replace").splitlines()
    return lines


def _resolve_container_id(name_or_id: str, ep_id: int) -> str:
    """Resolve a container ID or name to a 12-character container ID.

    Resolution order:
      1. 12+ hex characters are taken as an ID and truncated to 12.
      2. A container whose name equals the input exactly (case-insensitive).
      3. Containers whose name contains the input.
      4. If the input is hex and no name matched, containers whose ID starts
         with it (so IDs shorter than 12 characters work).

    Raises:
        ValueError: If nothing matches or the match is ambiguous.
    """
    is_hex = bool(name_or_id) and all(ch in _HEX_DIGITS for ch in name_or_id)
    if is_hex and len(name_or_id) >= 12:
        return name_or_id[:12]

    containers = _portainer(
        "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
    )
    term = name_or_id.lower()
    bare = term.lstrip("/")

    matches = [
        c
        for c in containers
        if any(n.lstrip("/").lower() == bare for n in c.get("Names") or [])
    ]
    if not matches:
        matches = [
            c
            for c in containers
            if any(term in n.lower() for n in c.get("Names") or [])
        ]
    if not matches and is_hex:
        matches = [c for c in containers if c.get("Id", "").lower().startswith(term)]

    if not matches:
        raise ValueError(f"No container found matching '{name_or_id}'.")
    if len(matches) > 1:
        names = ", ".join(_container_name(c) for c in matches)
        raise ValueError(
            f"Multiple containers match '{name_or_id}': {names}. Be more specific."
        )
    return matches[0]["Id"][:12]


@mcp.tool()
def list_containers(
    endpoint: str = "atlantis",
    all_containers: bool = False,
    filter_name: Optional[str] = None,
) -> str:
    """
    List containers on a Portainer endpoint.

    Args:
        endpoint: Endpoint name (atlantis, calypso, nuc, homelab, rpi5) or ID.
        all_containers: If True, include stopped containers. Default False (running only).
        filter_name: Optional substring to filter container names.
    """
    ep_id = _resolve_endpoint(endpoint)
    query = "?all=true" if all_containers else ""
    containers = _portainer("GET", f"/endpoints/{ep_id}/docker/containers/json{query}")
    if filter_name:
        term = filter_name.lower()
        containers = [
            c
            for c in containers
            if any(term in n.lower() for n in c.get("Names") or [])
        ]
    rows = _container_rows(containers)
    header = f"Containers on {endpoint} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image"
    return header + "\n" + "\n".join(rows)


@mcp.tool()
def get_container_logs(
    container_id_or_name: str,
    endpoint: str = "atlantis",
    tail: int = 100,
) -> str:
    """
    Get recent logs from a container.

    Args:
        container_id_or_name: Container ID (short or full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
        tail: Number of log lines to return. Default: 100.
    """
    ep_id = _resolve_endpoint(endpoint)
    # Same resolution rules as restart/stop/start, so an all-hex *name* such
    # as "cafe" is looked up by name instead of being sent as a container ID.
    try:
        cid = _resolve_container_id(container_id_or_name, ep_id)
    except ValueError as exc:
        return str(exc)

    with httpx.Client(verify=False, timeout=30) as client:
        r = client.get(
            f"{PORTAINER_URL}/api/endpoints/{ep_id}/docker/containers/{cid}/logs",
            headers={"X-API-Key": PORTAINER_TOKEN},
            params={"stdout": 1, "stderr": 1, "tail": tail, "timestamps": 0},
        )
        r.raise_for_status()

    lines = _demux_docker_logs(r.content)
    return "\n".join(lines[-tail:] if tail > 0 else lines)


@mcp.tool()
def restart_container(
    container_id_or_name: str,
    endpoint: str = "atlantis",
) -> str:
    """
    Restart a container.

    Args:
        container_id_or_name: Container ID (short/full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
    """
    ep_id = _resolve_endpoint(endpoint)
    cid = _resolve_container_id(container_id_or_name, ep_id)
    _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/restart")
    return f"Restarted container {cid} on {endpoint}."


@mcp.tool()
def stop_container(
    container_id_or_name: str,
    endpoint: str = "atlantis",
) -> str:
    """
    Stop a running container.

    Args:
        container_id_or_name: Container ID (short/full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
    """
    ep_id = _resolve_endpoint(endpoint)
    cid = _resolve_container_id(container_id_or_name, ep_id)
    _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/stop")
    return f"Stopped container {cid} on {endpoint}."


@mcp.tool()
def start_container(
    container_id_or_name: str,
    endpoint: str = "atlantis",
) -> str:
    """
    Start a stopped container.

    Args:
        container_id_or_name: Container ID (short/full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
    """
    ep_id = _resolve_endpoint(endpoint)
    cid = _resolve_container_id(container_id_or_name, ep_id)
    _portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/start")
    return f"Started container {cid} on {endpoint}."


# ---------------------------------------------------------------------------
# Portainer — Stack containers (convenience)
# ---------------------------------------------------------------------------


@mcp.tool()
def list_stack_containers(stack_name_or_id: str) -> str:
    """
    List all containers belonging to a specific stack.

    Args:
        stack_name_or_id: Stack name (exact name preferred, otherwise partial
            match) or numeric ID.
    """
    all_stacks = _portainer("GET", "/stacks")
    if stack_name_or_id.isdigit():
        wanted_id = int(stack_name_or_id)
        matches = [s for s in all_stacks if s["Id"] == wanted_id]
    else:
        term = stack_name_or_id.lower()
        matches = [s for s in all_stacks if term in s["Name"].lower()]
        # An exact name wins over longer names that merely contain it.
        exact = [s for s in matches if s["Name"].lower() == term]
        if exact:
            matches = exact
    if not matches:
        return f"No stack found matching '{stack_name_or_id}'."
    if len(matches) > 1:
        names = ", ".join(s["Name"] for s in matches)
        return f"Multiple matches: {names}. Be more specific."

    stack = matches[0]
    ep_id = stack["EndpointId"]
    stack_name = stack["Name"]
    wanted = stack_name.lower()

    containers = _portainer(
        "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
    )
    # Match on the com.docker.compose.project label, falling back to a
    # name-substring match. "Labels" may be null, hence the `or {}`.
    stack_containers = [
        c
        for c in containers
        if (c.get("Labels") or {}).get("com.docker.compose.project", "").lower() == wanted
        or any(wanted in n.lower() for n in c.get("Names") or [])
    ]

    ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id))
    rows = _container_rows(stack_containers)
    header = f"Containers in stack '{stack_name}' on {ep_name} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image"
    return header + "\n" + "\n".join(rows)


# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------


@mcp.tool()
def check_url(url: str, expected_status: int = 200) -> str:
    """
    Perform an HTTP health check on a URL.

    Redirects are followed and TLS certificates are not verified. Any failure
    is reported in the returned text rather than raised.

    Args:
        url: The URL to check (e.g. http://192.168.0.200:9443/api/status).
        expected_status: Expected HTTP status code. Default: 200.
    """
    try:
        with httpx.Client(verify=False, timeout=10, follow_redirects=True) as client:
            r = client.get(url)
    except Exception as e:  # tool boundary: report instead of raising
        return f"ERROR {url}\n {type(e).__name__}: {e}"
    verdict = "OK" if r.status_code == expected_status else "FAIL"
    return (
        f"{verdict} {url}\n"
        f" Status: {r.status_code} (expected {expected_status})\n"
        f" Latency: {r.elapsed.total_seconds() * 1000:.0f}ms"
    )


@mcp.tool()
def check_portainer() -> str:
    """Quick health check of the Portainer API and summary of infrastructure."""
    try:
        status = _portainer("GET", "/status")
        stacks = _portainer("GET", "/stacks")
    except Exception as e:  # tool boundary: report instead of raising
        return f"Portainer UNREACHABLE: {e}"
    active = sum(1 for s in stacks if s.get("Status") == 1)
    return (
        f"Portainer OK — version {status.get('Version', '?')}\n"
        f" Stacks: {len(stacks)} total, {active} active"
    )


# ---------------------------------------------------------------------------
# Repo — service inspection
# ---------------------------------------------------------------------------


@mcp.tool()
def list_homelab_services(host_filter: Optional[str] = None) -> str:
    """
    List all services/stacks defined in the homelab repository.

    Compose files under any directory named ``archive`` are skipped.

    Args:
        host_filter: Optional substring to filter by host/path (e.g. 'atlantis',
            'calypso', 'seattle').
    """
    found = list(REPO_PATH.rglob("docker-compose.yml")) + list(
        REPO_PATH.rglob("docker-compose.yaml")
    )
    needle = host_filter.lower() if host_filter else None
    rows = []
    for f in sorted(found):
        rel = f.relative_to(REPO_PATH)
        # Compare path *components* (Path.parts); a str has no .parts.
        if "archive" in rel.parts:
            continue
        if needle and needle not in str(rel).lower():
            continue
        rows.append(f" {rel}")
    return f"Compose files ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def get_compose_file(service_path: str) -> str:
    """
    Read a compose file from the homelab repo.

    Only files inside the repository are ever read: an exact path that
    resolves outside ``REPO_PATH`` (absolute path, ``..`` segments) is ignored
    and the lookup falls through to the fuzzy search.

    Args:
        service_path: Relative path within the repo, e.g.
            'hosts/synology/atlantis/arr-suite/docker-compose.yml'
            or a partial name like 'atlantis/arr-suite'.
    """
    repo_root = REPO_PATH.resolve()

    # Try exact relative path first, confined to the repository.
    candidate = (REPO_PATH / service_path).resolve()
    if candidate.is_relative_to(repo_root) and candidate.is_file():
        return candidate.read_text()

    # Try fuzzy: find compose files whose path contains the fragment
    term = service_path.lower().replace("\\", "/")
    hits = []
    for f in sorted(REPO_PATH.rglob("docker-compose.y*ml")):
        rel = str(f.relative_to(REPO_PATH)).lower()
        if term in rel and "archive" not in rel:
            hits.append(f)
    if not hits:
        return f"No compose file found matching '{service_path}'."
    if len(hits) > 1:
        paths = "\n".join(f" {f.relative_to(REPO_PATH)}" for f in hits)
        return f"Multiple matches:\n{paths}\nBe more specific."
    return hits[0].read_text()


# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------


@mcp.tool()
def send_notification(
    message: str,
    title: str = "Homelab",
    topic: str = "homelab-alerts",
    priority: str = "default",
    tags: Optional[str] = None,
) -> str:
    """
    Send a push notification via ntfy.

    Args:
        message: The notification body.
        title: Notification title. Default: 'Homelab'.
        topic: ntfy topic to publish to. Default: 'homelab-alerts'.
        priority: urgent, high, default, low, or min. Default: 'default'.
        tags: Comma-separated emoji tags e.g. 'warning,robot'. Optional.
    """
    headers = {"Title": title, "Priority": priority}
    if tags:
        headers["Tags"] = tags

    with httpx.Client(timeout=10) as client:
        r = client.post(
            f"{NTFY_BASE}/{topic}",
            content=message.encode(),
            headers=headers,
        )
        r.raise_for_status()
    return f"Notification sent to {NTFY_BASE}/{topic} — '{title}: {message}'"


# ---------------------------------------------------------------------------
# Gitea
# ---------------------------------------------------------------------------


@mcp.tool()
def gitea_list_repos(owner: Optional[str] = None, limit: int = 50) -> str:
    """
    List Gitea repositories.

    Args:
        owner: User or org name. Defaults to the service account's accessible repos.
        limit: Max repos to return. Default: 50.
    """
    # Authentication is carried by the Authorization header that _gitea()
    # sets; the token is deliberately NOT repeated as a query parameter.
    params: dict = {"limit": limit}
    if owner:
        params["owner"] = owner
    data = _gitea("GET", "/repos/search", params=params)
    repos = data.get("data", []) if isinstance(data, dict) else data

    if owner:
        # NOTE(review): the search endpoint may not honour an "owner" query
        # parameter — confirm against the Gitea API docs. Filtering here is a
        # no-op if it does; repos without owner info are kept.
        wanted = owner.lower()
        repos = [
            r
            for r in repos
            if (r.get("owner") or {}).get("login", wanted).lower() == wanted
        ]

    rows = []
    for r in repos:
        archived = " [archived]" if r.get("archived") else ""
        private = " [private]" if r.get("private") else ""
        # Prefer "updated_at"; fall back to the "updated" key read originally.
        updated = r.get("updated_at") or r.get("updated") or ""
        rows.append(
            f" {r['full_name']}{private}{archived} — "
            f"⭐{r.get('stars_count', 0)} "
            f"updated: {updated[:10]}"
        )
    return f"Repos ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def gitea_list_issues(
    repo: str,
    state: str = "open",
    limit: int = 20,
) -> str:
    """
    List issues for a Gitea repository.

    Args:
        repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
        state: 'open', 'closed', or 'all'. Default: 'open'.
        limit: Max issues to return. Default: 20.
    """
    if "/" not in repo:
        repo = f"{GITEA_ORG}/{repo}"
    issues = _gitea(
        "GET",
        f"/repos/{repo}/issues",
        params={"state": state, "type": "issues", "limit": limit},
    )
    if not issues:
        return f"No {state} issues in {repo}."
    rows = []
    for issue in issues:
        labels = ", ".join(label["name"] for label in issue.get("labels") or [])
        label_str = f" [{labels}]" if labels else ""
        rows.append(
            f" #{issue['number']} {issue['title']}{label_str} — @{issue['user']['login']}"
        )
    return f"{state.capitalize()} issues in {repo} ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def gitea_create_issue(repo: str, title: str, body: str = "") -> str:
    """
    Create a new issue in a Gitea repository.

    Args:
        repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
        title: Issue title.
        body: Issue body/description. Optional.
    """
    if "/" not in repo:
        repo = f"{GITEA_ORG}/{repo}"
    created = _gitea(
        "POST", f"/repos/{repo}/issues", json={"title": title, "body": body}
    )
    return (
        f"Created issue #{created['number']}: {created['title']}\n URL: {created['html_url']}"
    )


@mcp.tool()
def gitea_list_branches(repo: str) -> str:
    """
    List branches for a Gitea repository.

    Args:
        repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
    """
    if "/" not in repo:
        repo = f"{GITEA_ORG}/{repo}"
    branches = _gitea("GET", f"/repos/{repo}/branches")
    rows = [
        f" {b['name']}" + (" [default]" if b.get("is_default") else "")
        for b in branches
    ]
    return f"Branches in {repo} ({len(rows)}):\n" + "\n".join(rows)


# ---------------------------------------------------------------------------
# Prometheus
# ---------------------------------------------------------------------------


@mcp.tool()
def prometheus_query(query: str) -> str:
    """
    Run an instant PromQL query.

    Query errors reported by Prometheus (e.g. a parse error) are returned as
    text rather than raised. At most 50 series are listed.

    Args:
        query: PromQL expression e.g. 'up', 'node_memory_MemAvailable_bytes{job="node"}'.
    """
    with httpx.Client(timeout=20) as client:
        r = client.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})

    # Decode the body *before* checking the HTTP status: an error response
    # that carries a JSON body can then be reported via its "error" field
    # instead of being lost to raise_for_status().
    try:
        data = r.json()
    except ValueError:
        data = None
    if not isinstance(data, dict):
        r.raise_for_status()
        return f"Query failed: unexpected non-JSON response (HTTP {r.status_code})"
    if data.get("status") != "success":
        return f"Query failed: {data.get('error', 'unknown error')}"

    payload = data.get("data") or {}
    results = payload.get("result") or []
    result_type = payload.get("resultType")
    if result_type in ("scalar", "string"):
        # For these result types the code expects a single [timestamp, value]
        # pair rather than a list of series.
        value = results[1] if len(results) > 1 else "?"
        return f"Result ({result_type}): {value}"
    if not results:
        return f"No results for: {query}"

    shown = results[:50]  # cap output
    rows = []
    for item in shown:
        metric = item.get("metric") or {}
        value = item["value"][1] if item.get("value") else "?"
        label_str = ", ".join(
            f'{k}="{v}"' for k, v in metric.items() if k != "__name__"
        )
        name = metric.get("__name__", query)
        rows.append(f" {name}{{{label_str}}} = {value}")
    if len(shown) < len(results):
        header = f"Results ({len(results)}, showing first {len(shown)}):\n"
    else:
        header = f"Results ({len(results)}):\n"
    return header + "\n".join(rows)


@mcp.tool()
def prometheus_targets() -> str:
    """List all Prometheus scrape targets and their health status."""
    with httpx.Client(timeout=20) as client:
        r = client.get(f"{PROMETHEUS_URL}/api/v1/targets")
        r.raise_for_status()
        data = r.json()

    active = data["data"].get("activeTargets", [])
    rows = []
    for t in sorted(active, key=lambda x: x.get("labels", {}).get("job", "")):
        labels = t.get("labels", {})
        job = labels.get("job", "?")
        instance = labels.get("instance", "?")
        health = t.get("health", "?")
        mark = "✓" if health == "up" else "✗"
        rows.append(f" {mark} {job:30s} {instance:40s} {health}")
    return f"Prometheus targets ({len(rows)}):\n" + "\n".join(rows)


# ---------------------------------------------------------------------------
# Grafana
# ---------------------------------------------------------------------------


@mcp.tool()
def grafana_list_dashboards() -> str:
    """List all Grafana dashboards."""
    with httpx.Client(timeout=20) as client:
        r = client.get(
            f"{GRAFANA_URL}/api/search",
            params={"type": "dash-db"},
            auth=(GRAFANA_USER, GRAFANA_PASS),
        )
        r.raise_for_status()
        dashboards = r.json()

    rows = [
        f" [{d['uid']:20s}] {d['title']:50s} folder={d.get('folderTitle', 'General')}"
        for d in dashboards
    ]
    return f"Dashboards ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def grafana_list_alerts() -> str:
    """List Grafana alert rules (title and uid) from the provisioning API."""
    with httpx.Client(timeout=20) as client:
        r = client.get(
            f"{GRAFANA_URL}/api/v1/provisioning/alert-rules",
            auth=(GRAFANA_USER, GRAFANA_PASS),
        )
        r.raise_for_status()
        rules = r.json()

    if not rules:
        return "No alert rules configured."
    rows = [
        f" {rule.get('title', '?'):50s} uid={rule.get('uid', '?')}" for rule in rules
    ]
    return f"Alert rules ({len(rows)}):\n" + "\n".join(rows)


# ---------------------------------------------------------------------------
# Sonarr / Radarr shared formatting
# ---------------------------------------------------------------------------


def _format_arr_queue(app_name: str, data: dict | list) -> str:
    """Render a Sonarr/Radarr ``/queue`` response as a progress table.

    Accepts either a dict with a ``records`` list or a bare list of records.
    """
    records = data.get("records", []) if isinstance(data, dict) else data
    if not records:
        return f"{app_name} queue is empty."
    rows = []
    for item in records:
        title = (item.get("title") or "?")[:60]
        status = item.get("status") or "?"
        size = item.get("size", 0)
        sizeleft = item.get("sizeleft", 0)
        # Guard against size == 0 (division by zero).
        pct = int((1 - sizeleft / size) * 100) if size else 0
        rows.append(f" {status:12s} {pct:3d}% {title}")
    return f"{app_name} queue ({len(rows)}):\n" + "\n".join(rows)


# ---------------------------------------------------------------------------
# Sonarr
# ---------------------------------------------------------------------------


@mcp.tool()
def sonarr_list_series(filter_name: Optional[str] = None) -> str:
    """
    List all series in Sonarr.

    Args:
        filter_name: Optional substring to filter by series title.
    """
    series = _arr(SONARR_URL, SONARR_API_KEY, "/series")
    if filter_name:
        term = filter_name.lower()
        series = [s for s in series if term in s.get("title", "").lower()]

    rows = []
    for s in sorted(series, key=lambda x: x.get("sortTitle", "")):
        status = s.get("status", "?")
        monitored = "✓" if s.get("monitored") else "✗"
        # NOTE(review): the v3 API appears to nest episodeCount under
        # "statistics" — confirm; the top-level key is kept as a fallback.
        stats = s.get("statistics") or {}
        ep_count = stats.get("episodeCount", s.get("episodeCount", 0))
        rows.append(f" {monitored} {s['title']:50s} {status:12s} {ep_count} eps")
    return f"Series ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def sonarr_queue() -> str:
    """Show the Sonarr download queue."""
    return _format_arr_queue("Sonarr", _arr(SONARR_URL, SONARR_API_KEY, "/queue"))


# ---------------------------------------------------------------------------
# Radarr
# ---------------------------------------------------------------------------


@mcp.tool()
def radarr_list_movies(filter_name: Optional[str] = None) -> str:
    """
    List all movies in Radarr.

    Args:
        filter_name: Optional substring to filter by movie title.
    """
    movies = _arr(RADARR_URL, RADARR_API_KEY, "/movie")
    if filter_name:
        term = filter_name.lower()
        movies = [m for m in movies if term in m.get("title", "").lower()]

    rows = []
    for m in sorted(movies, key=lambda x: x.get("sortTitle", "")):
        monitored = "✓" if m.get("monitored") else "✗"
        downloaded = "↓" if m.get("hasFile") else " "
        year = m.get("year", "?")
        rows.append(f" {monitored}{downloaded} {m['title']:50s} {year}")
    return f"Movies ({len(rows)}):\n" + "\n".join(rows)


@mcp.tool()
def radarr_queue() -> str:
    """Show the Radarr download queue."""
    return _format_arr_queue("Radarr", _arr(RADARR_URL, RADARR_API_KEY, "/queue"))


# ---------------------------------------------------------------------------
# SABnzbd
# ---------------------------------------------------------------------------


@mcp.tool()
def sabnzbd_queue() -> str:
    """Show the SABnzbd download queue."""
    queue = _sabnzbd("queue").get("queue", {})
    slots = queue.get("slots", [])
    if not slots:
        return f"SABnzbd queue empty. Status: {queue.get('status', '?')}"

    rows = []
    for s in slots:
        # str() coercion: the "s" format spec raises on non-string values.
        name = str(s.get("filename", "?"))[:60]
        status = str(s.get("status", "?"))
        pct = str(s.get("percentage", "0"))
        size = str(s.get("sizeleft", "?"))
        rows.append(f" {status:12s} {pct:>4s}% {size:>8s} left {name}")
    speed = queue.get("speed", "0")
    eta = queue.get("timeleft", "?")
    return f"SABnzbd queue ({len(rows)}) — {speed} — ETA {eta}:\n" + "\n".join(rows)


@mcp.tool()
def sabnzbd_pause() -> str:
    """Pause the SABnzbd download queue."""
    _sabnzbd("pause")
    return "SABnzbd queue paused."
@mcp.tool()
def sabnzbd_resume() -> str:
    """Resume the SABnzbd download queue."""
    _sabnzbd("resume")
    return "SABnzbd queue resumed."


# ---------------------------------------------------------------------------
# SSH
# ---------------------------------------------------------------------------


@mcp.tool()
def ssh_exec(host: str, command: str, timeout: int = 30) -> str:
    """
    Run a command on a homelab host via SSH.

    Known hosts: atlantis, calypso, setillo, setillo-root, nuc, homelab-vm,
    rpi5, pi-5, matrix-ubuntu (the authoritative list is SSH_KNOWN_HOSTS).
    Requires the host to be in ~/.ssh/config or /etc/hosts.

    The result combines stdout, a "[stderr]" section when stderr is non-empty,
    and an "[exit code N]" trailer when the command exits non-zero. Failures
    (timeout, ssh missing, ...) are reported as text rather than raised.

    Args:
        host: SSH host alias (e.g. 'atlantis', 'calypso', 'setillo-root').
        command: Shell command to execute remotely.
        timeout: Seconds before the command times out. Default: 30.
    """
    if host not in SSH_KNOWN_HOSTS:
        return (
            f"Host '{host}' not in allowed list.\n"
            f"Known hosts: {', '.join(SSH_KNOWN_HOSTS)}"
        )
    # BatchMode=yes makes ssh fail instead of prompting for input. The argv
    # list form means no local shell ever parses `command`.
    argv = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", host, command]
    try:
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Command timed out after {timeout}s."
    except Exception as e:  # tool boundary: report instead of raising
        return f"SSH error: {type(e).__name__}: {e}"

    output = result.stdout
    if result.stderr:
        output += f"\n[stderr]\n{result.stderr}"
    if result.returncode != 0:
        output += f"\n[exit code {result.returncode}]"
    return output or "(no output)"


# ---------------------------------------------------------------------------
# Filesystem
# ---------------------------------------------------------------------------


@mcp.tool()
def fs_read(path: str) -> str:
    """
    Read a text file from the local filesystem.

    Allowed roots: /home/homelab, /tmp. Files larger than 1,000,000 bytes are
    refused. Content is decoded as UTF-8 with undecodable bytes replaced.

    Args:
        path: Absolute or ~-relative path to the file.
    """
    try:
        p = _fs_safe(path)
        if not p.exists():
            return f"File not found: {p}"
        if p.is_dir():
            return f"'{p}' is a directory. Use fs_list to list it."
        size = p.stat().st_size
        if size > 1_000_000:
            return f"File too large ({size:,} bytes). Read it in parts or use grep."
        # Explicit encoding so the result does not depend on the locale.
        return p.read_text(encoding="utf-8", errors="replace")
    except PermissionError as e:
        return f"Permission denied: {e}"
    except Exception as e:  # tool boundary: report instead of raising
        return f"Error reading file: {e}"


@mcp.tool()
def fs_write(path: str, content: str) -> str:
    """
    Write content to a file on the local filesystem.

    Allowed roots: /home/homelab, /tmp. Missing parent directories are
    created. The file is written as UTF-8 and any existing file is replaced.

    Args:
        path: Absolute or ~-relative path to the file.
        content: Text content to write.
    """
    try:
        p = _fs_safe(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        # Encode once so the reported size is the real on-disk byte count
        # (len(content) counts characters, which differs for non-ASCII text).
        payload = content.encode("utf-8")
        p.write_bytes(payload)
        return f"Written {len(payload)} bytes to {p}"
    except PermissionError as e:
        return f"Permission denied: {e}"
    except Exception as e:  # tool boundary: report instead of raising
        return f"Error writing file: {e}"


@mcp.tool()
def fs_list(path: str = "/home/homelab") -> str:
    """
    List the contents of a directory on the local filesystem.

    Allowed roots: /home/homelab, /tmp. Directories are listed first, then
    files, each group sorted by name; file sizes are shown in bytes.

    Args:
        path: Directory path. Default: /home/homelab.
    """
    try:
        p = _fs_safe(path)
        if not p.exists():
            return f"Path not found: {p}"
        if not p.is_dir():
            return f"'{p}' is a file, not a directory."

        rows = []
        for entry in sorted(p.iterdir(), key=lambda x: (x.is_file(), x.name)):
            kind = "DIR " if entry.is_dir() else "FILE"
            size_str = f"{'':>10}"
            if entry.is_file():
                try:
                    size_str = f"{entry.stat().st_size:>10,}"
                except OSError:
                    # stat() failed for this entry; keep listing the rest.
                    size_str = f"{'?':>10}"
            rows.append(f" {kind} {size_str} {entry.name}")
        return f"Contents of {p} ({len(rows)} entries):\n" + "\n".join(rows)
    except PermissionError as e:
        return f"Permission denied: {e}"
    except Exception as e:  # tool boundary: report instead of raising
        return f"Error listing directory: {e}"


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # This file does not otherwise import urllib3, so a missing package must
    # not stop the server from starting: silence its InsecureRequestWarning
    # only when the import succeeds.
    try:
        import urllib3
    except ImportError:
        urllib3 = None
    if urllib3 is not None:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    mcp.run()