Files
homelab-optimized/scripts/homelab-mcp/server.py
Gitea Mirror Bot 09f2569fb5
Some checks failed
Documentation / Build Docusaurus (push) Failing after 2m58s
Documentation / Deploy to GitHub Pages (push) Has been skipped
Sanitized mirror from private repository - 2026-03-30 09:47:37 UTC
2026-03-30 09:47:37 +00:00

2254 lines
76 KiB
Python

#!/usr/bin/env python3
"""
Homelab MCP Server
Provides MCP tools for managing homelab infrastructure:
- Portainer: stack/container management across all endpoints
- Gitea: repo, issue, and branch management
- Prometheus: PromQL queries and target inspection
- Grafana: dashboard and alert listing
- Sonarr/Radarr: media library and download queue
- SABnzbd: download queue management
- SSH: remote command execution on homelab hosts
- Filesystem: read/write files on the local machine
"""
import json
import os
import subprocess
from pathlib import Path
from typing import Optional
import httpx
from fastmcp import FastMCP
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
PORTAINER_URL = "http://100.83.230.112:10000"
PORTAINER_TOKEN = "REDACTED_TOKEN"
GITEA_TOKEN = "REDACTED_TOKEN"
NTFY_BASE = "https://ntfy.vish.gg"
REPO_PATH = Path("/home/homelab/organized/repos/homelab")
ENDPOINTS: dict[str, int] = {
"atlantis": 2,
"calypso": 443397,
"nuc": 443398,
"homelab": 443399,
"rpi5": 443395,
}
# Gitea — now on matrix-ubuntu via NPM
GITEA_URL = "https://git.vish.gg"
GITEA_ORG = "vish"
# Monitoring
PROMETHEUS_URL = "http://192.168.0.210:9090"
GRAFANA_URL = "http://192.168.0.210:3300"
GRAFANA_USER = "admin"
GRAFANA_PASS = "REDACTED_PASSWORD"
# Media
SONARR_URL = "http://192.168.0.200:8989"
SONARR_API_KEY = "REDACTED_API_KEY"
RADARR_URL = "http://192.168.0.200:7878"
RADARR_API_KEY = "REDACTED_API_KEY"
SABNZBD_URL = "http://192.168.0.200:8080"
SABNZBD_API_KEY = "REDACTED_API_KEY"
# AdGuard — primary on Calypso, secondary on NUC
ADGUARD_URL = "http://192.168.0.250:9080"
ADGUARD_USER = "vish"
ADGUARD_PASS = "REDACTED_PASSWORD"
# NPM — now on matrix-ubuntu
NPM_URL = "http://192.168.0.154:81"
NPM_USER = "your-email@example.com"
NPM_PASS = "REDACTED_PASSWORD"
# Headscale — on Calypso
HEADSCALE_URL = "https://headscale.vish.gg:8443"
HEADSCALE_CONTAINER = "headscale"
HEADSCALE_CALYPSO_SSH = "calypso"
# Authentik — on Calypso
AUTHENTIK_URL = "https://sso.vish.gg"
AUTHENTIK_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret
# Cloudflare
CLOUDFLARE_TOKEN = (
"FGXlHM7doB8Z4vxv84_ntzhG_Cx15RXs66zoouZU" # pragma: allowlist secret
)
CLOUDFLARE_ZONE_ID = "4dbd15d096d71101b7c0c6362b307a66"
# Uptime Kuma — on Pi-5
KUMA_DB_PATH = "/home/vish/docker/kuma/data/kuma.db"
KUMA_CONTAINER = "uptime-kuma"
KUMA_HOST = "pi-5"
# SSH — hostnames must resolve via /etc/hosts or ~/.ssh/config
SSH_KNOWN_HOSTS = [
"atlantis",
"calypso",
"setillo",
"setillo-root",
"nuc",
"homelab-vm",
"rpi5",
"pi-5",
"matrix-ubuntu",
"moon",
"olares",
"guava",
"pve",
"seattle",
"seattle-tailscale",
"gl-mt3000",
"gl-be3600",
"jellyfish",
]
# Filesystem — restrict read/write to safe root paths
FS_ALLOWED_ROOTS = [
Path("/home/homelab"),
Path("/tmp"),
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _portainer(method: str, path: str, **kwargs) -> dict | list:
"""Make a Portainer API request. Raises on HTTP error."""
with httpx.Client(verify=False, timeout=30) as client:
r = client.request(
method,
f"{PORTAINER_URL}/api{path}",
headers={"X-API-Key": PORTAINER_TOKEN},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _resolve_endpoint(endpoint: str) -> int:
"""Resolve endpoint name or numeric string to an endpoint ID."""
if endpoint.isdigit():
return int(endpoint)
ep = endpoint.lower()
if ep not in ENDPOINTS:
raise ValueError(
f"Unknown endpoint '{endpoint}'. Known: {', '.join(ENDPOINTS)}"
)
return ENDPOINTS[ep]
def _gitea(method: str, path: str, **kwargs) -> dict | list:
"""Make a Gitea API request."""
with httpx.Client(timeout=20) as client:
r = client.request(
method,
f"{GITEA_URL}/api/v1{path}",
headers={"Authorization": f"token {GITEA_TOKEN}"},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _arr(
base_url: str, api_key: str, path: str, params: dict | None = None
) -> dict | list:
"""Make a Sonarr/Radarr API request."""
with httpx.Client(timeout=20) as client:
r = client.get(
f"{base_url}/api/v3{path}",
headers={"X-Api-Key": api_key},
params=params or {},
)
r.raise_for_status()
return r.json()
def _sabnzbd(mode: str, extra: dict | None = None) -> dict:
"""Make a SABnzbd API request."""
params = {"apikey": SABNZBD_API_KEY, "output": "json", "mode": mode}
if extra:
params.update(extra)
with httpx.Client(timeout=20) as client:
r = client.get(f"{SABNZBD_URL}/api", params=params)
r.raise_for_status()
return r.json()
def _fs_safe(path: str) -> Path:
"""Resolve a path and verify it's under an allowed root."""
p = Path(path).expanduser().resolve()
for root in FS_ALLOWED_ROOTS:
try:
p.relative_to(root)
return p
except ValueError:
continue
allowed = ", ".join(str(r) for r in FS_ALLOWED_ROOTS)
raise PermissionError(f"Path '{path}' is outside allowed roots: {allowed}")
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
mcp = FastMCP(
"Homelab",
instructions=(
"Tools for managing a homelab running Docker services across multiple hosts.\n\n"
"PORTAINER — Docker orchestration across 5 endpoints:\n"
" Endpoints: atlantis (main NAS, media stack), calypso (secondary NAS), "
"nuc (mini PC), homelab (VM at 192.168.0.210), rpi5 (Raspberry Pi).\n"
" Tools: list_endpoints, list_stacks, get_stack, redeploy_stack, "
"list_containers, get_container_logs, restart_container, start_container, "
"stop_container, list_stack_containers, check_portainer.\n\n"
"GITEA — Self-hosted git at git.vish.gg, org=vish. Repo names can be "
"'vish/homelab' or just 'homelab'.\n"
" Tools: gitea_list_repos, gitea_list_issues, gitea_create_issue, gitea_list_branches.\n\n"
"ADGUARD — DNS rewrite management for split-horizon DNS (Calypso, 192.168.0.250:9080).\n"
" The wildcard *.vish.gg → 100.85.21.51 (matrix-ubuntu) requires specific overrides\n"
" for unproxied services accessed internally (pt.vish.gg, sso.vish.gg, git.vish.gg).\n"
" Tools: adguard_list_rewrites, adguard_add_rewrite, adguard_delete_rewrite.\n\n"
"NPM — Nginx Proxy Manager on matrix-ubuntu (192.168.0.154:81).\n"
" Cert IDs: npm-6=mx.vish.gg, npm-7=livekit.mx.vish.gg, npm-8=*.vish.gg(CF),\n"
" npm-11=pt.vish.gg, npm-12=sso.vish.gg. NEVER reuse an existing npm-N ID.\n"
" Tools: npm_list_proxy_hosts, npm_list_certs, npm_get_proxy_host, npm_update_cert.\n\n"
"HEADSCALE — Self-hosted Tailscale coordination server on Calypso.\n"
" Access via SSH to calypso → docker exec headscale.\n"
" Tools: headscale_list_nodes, headscale_create_preauth_key, headscale_delete_node, "
"headscale_rename_node.\n\n"
"AUTHENTIK — SSO identity provider at sso.vish.gg (Calypso).\n"
" All proxy providers need cookie_domain=vish.gg to avoid redirect loops.\n"
" Embedded outpost PK: 9dcb1d53-a023-4222-a320-d27f66f06eb9.\n"
" To onboard a new service: create_proxy_provider → create_application (auto-binds to outpost).\n"
" Tools: authentik_list_applications, authentik_list_providers, authentik_list_users,\n"
" authentik_update_app_launch_url, authentik_set_provider_cookie_domain,\n"
" authentik_create_proxy_provider, authentik_create_application,\n"
" authentik_list_sessions, authentik_delete_session, authentik_get_events.\n\n"
"CLOUDFLARE — DNS management for vish.gg zone.\n"
" Most *.vish.gg are proxied (orange cloud). Unproxied: mx.vish.gg, headscale.vish.gg,\n"
" livekit.mx.vish.gg, pt.vish.gg, sso.vish.gg, derp*.vish.gg.\n"
" Tools: cloudflare_list_dns_records, cloudflare_create_dns_record,\n"
" cloudflare_delete_dns_record, cloudflare_update_dns_record.\n\n"
"UPTIME KUMA — Monitoring on Pi-5 (100.77.151.40:3001). DB manipulation via SSH+SQLite.\n"
" Always call kuma_restart after adding/modifying monitors.\n"
" Tools: kuma_list_monitors, kuma_list_groups, kuma_add_monitor, kuma_set_parent, "
"kuma_restart.\n\n"
"PROMETHEUS — PromQL queries against homelab metrics (192.168.0.210:9090).\n"
" Tools: prometheus_query, prometheus_targets.\n\n"
"GRAFANA — Dashboard and alert inspection (192.168.0.210:3300).\n"
" Tools: grafana_list_dashboards, grafana_list_alerts.\n\n"
"SONARR/RADARR — Media library and download queue on Atlantis (ports 8989/7878).\n"
" Tools: sonarr_list_series, sonarr_queue, radarr_list_movies, radarr_queue.\n\n"
"SABNZBD — Usenet download queue on Atlantis (port 8080).\n"
" Tools: sabnzbd_queue, sabnzbd_pause, sabnzbd_resume.\n\n"
"SSH — Run commands on homelab hosts. Allowed: atlantis, calypso, setillo, "
"setillo-root, nuc, homelab-vm, rpi5. Requires key auth in ~/.ssh/config.\n"
" Tool: ssh_exec(host, command).\n\n"
"FILESYSTEM — Read/write files on the local machine. "
"Allowed roots: /home/homelab, /tmp.\n"
" Tools: fs_read, fs_write, fs_list.\n\n"
"REPO — Inspect compose files in the homelab git repo at "
"/home/homelab/organized/repos/homelab.\n"
" Tools: list_homelab_services, get_compose_file.\n\n"
"UTILITIES — check_url (HTTP health check), send_notification (ntfy push)."
),
)
# ---------------------------------------------------------------------------
# Portainer — Endpoints
# ---------------------------------------------------------------------------
@mcp.tool()
def list_endpoints() -> str:
"""List all Portainer environments (servers) with their connection status."""
data = _portainer("GET", "/endpoints")
rows = []
for ep in data:
rows.append(
f" {ep['Name']} (id={ep['Id']}) — "
f"status={'online' if ep.get('Status') == 1 else 'offline'}"
f"containers={ep.get('Snapshots', [{}])[0].get('RunningContainerCount', '?')} running"
)
return "Endpoints:\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Portainer — Stacks
# ---------------------------------------------------------------------------
@mcp.tool()
def list_stacks(endpoint: Optional[str] = None) -> str:
"""
List all Portainer stacks with their status.
Args:
endpoint: Optional filter by endpoint name (atlantis, calypso, nuc,
homelab, rpi5) or numeric ID.
"""
data = _portainer("GET", "/stacks")
if endpoint:
ep_id = _resolve_endpoint(endpoint)
data = [s for s in data if s.get("EndpointId") == ep_id]
rows = []
for s in sorted(data, key=lambda x: x["Name"]):
status = "active" if s.get("Status") == 1 else "inactive"
ep_name = next(
(k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")),
str(s.get("EndpointId")),
)
git = s.get("GitConfig", {})
git_info = f" [git: {git.get('ConfigFilePath', '')}]" if git else ""
rows.append(
f" [{s['Id']}] {s['Name']}{status} — endpoint={ep_name}{git_info}"
)
return f"Stacks ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def get_stack(stack_name_or_id: str) -> str:
"""
Get detailed info about a specific stack.
Args:
stack_name_or_id: Stack name (partial match) or numeric ID.
"""
all_stacks = _portainer("GET", "/stacks")
if stack_name_or_id.isdigit():
matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)]
else:
term = stack_name_or_id.lower()
matches = [s for s in all_stacks if term in s["Name"].lower()]
if not matches:
return f"No stack found matching '{stack_name_or_id}'."
if len(matches) > 1:
names = ", ".join(s["Name"] for s in matches)
return f"Multiple matches: {names}. Be more specific."
s = matches[0]
git = s.get("GitConfig") or {}
env = s.get("Env") or []
ep_name = next(
(k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")),
str(s.get("EndpointId")),
)
lines = [
f"Stack: {s['Name']} (id={s['Id']})",
f" Status: {'active' if s.get('Status') == 1 else 'inactive'}",
f" Endpoint: {ep_name} (id={s.get('EndpointId')})",
f" Created: {s.get('CreationDate', 'unknown')}",
f" Updated: {s.get('UpdateDate', 'unknown')}",
]
if git:
lines += [
f" Git URL: {git.get('URL', '')}",
f" Git Branch: {git.get('ReferenceName', '').replace('refs/heads/', '')}",
f" Git File: {git.get('ConfigFilePath', '')}",
]
if env:
lines.append(f" Env vars: {len(env)} set")
return "\n".join(lines)
@mcp.tool()
def redeploy_stack(stack_name_or_id: str) -> str:
"""
Trigger a GitOps redeploy of a stack (pull latest from Git and redeploy).
Args:
stack_name_or_id: Stack name (partial match) or numeric ID.
"""
all_stacks = _portainer("GET", "/stacks")
if stack_name_or_id.isdigit():
matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)]
else:
term = stack_name_or_id.lower()
matches = [s for s in all_stacks if term in s["Name"].lower()]
if not matches:
return f"No stack found matching '{stack_name_or_id}'."
if len(matches) > 1:
names = ", ".join(s["Name"] for s in matches)
return f"Multiple matches: {names}. Be more specific."
s = matches[0]
if not s.get("GitConfig"):
return f"Stack '{s['Name']}' is not a GitOps stack — cannot redeploy via git."
ep_id = s["EndpointId"]
stack_id = s["Id"]
_portainer(
"PUT",
f"/stacks/{stack_id}/git/redeploy?endpointId={ep_id}",
json={
"pullImage": True,
"prune": False,
"repositoryAuthentication": True,
"repositoryUsername": "vish",
"repositoryPassword": GITEA_TOKEN,
},
)
return f"Redeploy triggered for stack '{s['Name']}' (id={stack_id}) on endpoint {ep_id}."
# ---------------------------------------------------------------------------
# Portainer — Containers
# ---------------------------------------------------------------------------
@mcp.tool()
def list_containers(
endpoint: str = "atlantis",
all_containers: bool = False,
filter_name: Optional[str] = None,
) -> str:
"""
List containers on a Portainer endpoint.
Args:
endpoint: Endpoint name (atlantis, calypso, nuc, homelab, rpi5) or ID.
all_containers: If True, include stopped containers. Default False (running only).
filter_name: Optional substring to filter container names.
"""
ep_id = _resolve_endpoint(endpoint)
params = "?all=true" if all_containers else ""
data = _portainer("GET", f"/endpoints/{ep_id}/docker/containers/json{params}")
if filter_name:
term = filter_name.lower()
data = [c for c in data if any(term in n.lower() for n in c.get("Names", []))]
rows = []
for c in sorted(data, key=lambda x: x.get("Names", [""])[0]):
name = c.get("Names", ["?"])[0].lstrip("/")
image = c.get("Image", "?").split(":")[0].split("/")[-1]
state = c.get("State", "?")
short_id = c.get("Id", "")[:12]
rows.append(f" {short_id} {state:10s} {name:40s} {image}")
header = f"Containers on {endpoint} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image"
return header + "\n" + "\n".join(rows)
@mcp.tool()
def get_container_logs(
container_id_or_name: str,
endpoint: str = "atlantis",
tail: int = 100,
) -> str:
"""
Get recent logs from a container.
Args:
container_id_or_name: Container ID (short or full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
tail: Number of log lines to return. Default: 100.
"""
ep_id = _resolve_endpoint(endpoint)
# Resolve by name if not a hex ID
if not all(c in "0123456789abcdefABCDEF" for c in container_id_or_name):
containers = _portainer(
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
)
term = container_id_or_name.lower()
matches = [
c for c in containers if any(term in n.lower() for n in c.get("Names", []))
]
if not matches:
return (
f"No container found matching '{container_id_or_name}' on {endpoint}."
)
if len(matches) > 1:
names = ", ".join(c["Names"][0].lstrip("/") for c in matches)
return f"Multiple matches: {names}. Be more specific."
container_id_or_name = matches[0]["Id"][:12]
with httpx.Client(verify=False, timeout=30) as client:
r = client.get(
f"{PORTAINER_URL}/api/endpoints/{ep_id}/docker/containers/{container_id_or_name}/logs",
headers={"X-API-Key": PORTAINER_TOKEN},
params={"stdout": 1, "stderr": 1, "tail": tail, "timestamps": 0},
)
r.raise_for_status()
# Docker log stream has 8-byte header per line; strip it
raw = r.content
lines = []
i = 0
while i < len(raw):
if i + 8 > len(raw):
break
size = int.from_bytes(raw[i + 4 : i + 8], "big")
line = raw[i + 8 : i + 8 + size].decode("utf-8", errors="replace").rstrip()
if line:
lines.append(line)
i += 8 + size
if not lines:
# fallback: treat as plain text
lines = r.text.splitlines()
return "\n".join(lines[-tail:])
@mcp.tool()
def restart_container(
container_id_or_name: str,
endpoint: str = "atlantis",
) -> str:
"""
Restart a container.
Args:
container_id_or_name: Container ID (short/full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
"""
ep_id = _resolve_endpoint(endpoint)
cid = _resolve_container_id(container_id_or_name, ep_id)
_portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/restart")
return f"Restarted container {cid} on {endpoint}."
@mcp.tool()
def stop_container(
container_id_or_name: str,
endpoint: str = "atlantis",
) -> str:
"""
Stop a running container.
Args:
container_id_or_name: Container ID (short/full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
"""
ep_id = _resolve_endpoint(endpoint)
cid = _resolve_container_id(container_id_or_name, ep_id)
_portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/stop")
return f"Stopped container {cid} on {endpoint}."
@mcp.tool()
def start_container(
container_id_or_name: str,
endpoint: str = "atlantis",
) -> str:
"""
Start a stopped container.
Args:
container_id_or_name: Container ID (short/full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
"""
ep_id = _resolve_endpoint(endpoint)
cid = _resolve_container_id(container_id_or_name, ep_id)
_portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/start")
return f"Started container {cid} on {endpoint}."
def _resolve_container_id(name_or_id: str, ep_id: int) -> str:
"""Resolve a container name substring to a short container ID."""
if len(name_or_id) >= 12 and all(c in "0123456789abcdefABCDEF" for c in name_or_id):
return name_or_id[:12]
containers = _portainer(
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
)
term = name_or_id.lower()
matches = [
c for c in containers if any(term in n.lower() for n in c.get("Names", []))
]
if not matches:
raise ValueError(f"No container found matching '{name_or_id}'.")
if len(matches) > 1:
names = ", ".join(c["Names"][0].lstrip("/") for c in matches)
raise ValueError(
f"Multiple containers match '{name_or_id}': {names}. Be more specific."
)
return matches[0]["Id"][:12]
# ---------------------------------------------------------------------------
# Portainer — Stack containers (convenience)
# ---------------------------------------------------------------------------
@mcp.tool()
def list_stack_containers(stack_name_or_id: str) -> str:
"""
List all containers belonging to a specific stack.
Args:
stack_name_or_id: Stack name (partial match) or numeric ID.
"""
all_stacks = _portainer("GET", "/stacks")
if stack_name_or_id.isdigit():
matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)]
else:
term = stack_name_or_id.lower()
matches = [s for s in all_stacks if term in s["Name"].lower()]
if not matches:
return f"No stack found matching '{stack_name_or_id}'."
if len(matches) > 1:
names = ", ".join(s["Name"] for s in matches)
return f"Multiple matches: {names}. Be more specific."
s = matches[0]
ep_id = s["EndpointId"]
stack_name = s["Name"]
containers = _portainer(
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
)
# Filter by compose project label (Portainer uses com.docker.compose.project)
stack_containers = [
c
for c in containers
if c.get("Labels", {}).get("com.docker.compose.project", "").lower()
== stack_name.lower()
or any(stack_name.lower() in n.lower() for n in c.get("Names", []))
]
ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id))
rows = []
for c in sorted(stack_containers, key=lambda x: x.get("Names", [""])[0]):
name = c.get("Names", ["?"])[0].lstrip("/")
state = c.get("State", "?")
short_id = c.get("Id", "")[:12]
image = c.get("Image", "?").split(":")[0].split("/")[-1]
rows.append(f" {short_id} {state:10s} {name:40s} {image}")
header = f"Containers in stack '{stack_name}' on {ep_name} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image"
return header + "\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------
@mcp.tool()
def check_url(url: str, expected_status: int = 200) -> str:
"""
Perform an HTTP health check on a URL.
Args:
url: The URL to check (e.g. http://192.168.0.200:9443/api/status).
expected_status: Expected HTTP status code. Default: 200.
"""
try:
with httpx.Client(verify=False, timeout=10, follow_redirects=True) as client:
r = client.get(url)
ok = r.status_code == expected_status
return (
f"{'OK' if ok else 'FAIL'} {url}\n"
f" Status: {r.status_code} (expected {expected_status})\n"
f" Latency: {r.elapsed.total_seconds() * 1000:.0f}ms"
)
except Exception as e:
return f"ERROR {url}\n {type(e).__name__}: {e}"
@mcp.tool()
def check_portainer() -> str:
"""Quick health check of the Portainer API and summary of infrastructure."""
try:
status = _portainer("GET", "/status")
stacks = _portainer("GET", "/stacks")
stacks_list = stacks if isinstance(stacks, list) else []
status_dict = status if isinstance(status, dict) else {}
active = sum(
1 for s in stacks_list if isinstance(s, dict) and s.get("Status") == 1
)
return (
f"Portainer OK — version {status_dict.get('Version', '?')}\n"
f" Stacks: {len(stacks_list)} total, {active} active"
)
except Exception as e:
return f"Portainer UNREACHABLE: {e}"
# ---------------------------------------------------------------------------
# Repo — service inspection
# ---------------------------------------------------------------------------
@mcp.tool()
def list_homelab_services(host_filter: Optional[str] = None) -> str:
"""
List all services/stacks defined in the homelab repository.
Args:
host_filter: Optional substring to filter by host/path (e.g. 'atlantis', 'calypso', 'seattle').
"""
compose_files = list(REPO_PATH.rglob("docker-compose.yml")) + list(
REPO_PATH.rglob("docker-compose.yaml")
)
# Exclude archive
compose_files = [f for f in compose_files if "archive" not in f.parts]
rows = []
for f in sorted(compose_files):
rel = f.relative_to(REPO_PATH)
if host_filter and host_filter.lower() not in str(rel).lower():
continue
rows.append(f" {rel}")
return f"Compose files ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def get_compose_file(service_path: str) -> str:
"""
Read a compose file from the homelab repo.
Args:
service_path: Relative path within the repo, e.g.
'hosts/synology/atlantis/arr-suite/docker-compose.yml'
or a partial name like 'atlantis/arr-suite'.
"""
# Try exact relative path first
candidate = REPO_PATH / service_path
if candidate.is_file():
return candidate.read_text()
# Try fuzzy: find compose files whose path contains the fragment
# Searches docker-compose.yml/yaml AND standalone *.yaml/*.yml stack files
term = service_path.lower().replace("\\", "/")
all_compose = (
list(REPO_PATH.rglob("docker-compose.yml"))
+ list(REPO_PATH.rglob("docker-compose.yaml"))
+ list(REPO_PATH.rglob("*.yaml"))
+ list(REPO_PATH.rglob("*.yml"))
)
hits = [
f
for f in all_compose
if term in str(f.relative_to(REPO_PATH)).lower()
and "archive" not in f.parts
and ".git" not in f.parts
and "node_modules" not in f.parts
]
# Prefer docker-compose files if both match
compose_hits = [f for f in hits if "docker-compose" in f.name]
if compose_hits:
hits = compose_hits
if not hits:
return f"No compose file found matching '{service_path}'."
if len(hits) > 1:
paths = "\n".join(f" {f.relative_to(REPO_PATH)}" for f in hits)
return f"Multiple matches:\n{paths}\nBe more specific."
return hits[0].read_text()
# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------
@mcp.tool()
def send_notification(
message: str,
title: str = "Homelab",
topic: str = "homelab-alerts",
priority: str = "default",
tags: Optional[str] = None,
) -> str:
"""
Send a push notification via ntfy.
Args:
message: The notification body.
title: Notification title. Default: 'Homelab'.
topic: ntfy topic to publish to. Default: 'homelab-alerts'.
priority: urgent, high, default, low, or min. Default: 'default'.
tags: Comma-separated emoji tags e.g. 'warning,robot'. Optional.
"""
headers = {
"Title": title,
"Priority": priority,
}
if tags:
headers["Tags"] = tags
with httpx.Client(timeout=10) as client:
r = client.post(
f"{NTFY_BASE}/{topic}",
content=message.encode(),
headers=headers,
)
r.raise_for_status()
return f"Notification sent to {NTFY_BASE}/{topic}'{title}: {message}'"
# ---------------------------------------------------------------------------
# Gitea
# ---------------------------------------------------------------------------
@mcp.tool()
def gitea_list_repos(owner: Optional[str] = None, limit: int = 50) -> str:
"""
List Gitea repositories.
Args:
owner: User or org name. Defaults to the service account's accessible repos.
limit: Max repos to return. Default: 50.
"""
if owner:
data = _gitea("GET", f"/repos/search", params={"owner": owner, "limit": limit})
repos = data.get("data", []) if isinstance(data, dict) else data
else:
repos = _gitea(
"GET", f"/repos/search", params={"limit": limit, "token": GITEA_TOKEN}
)
repos = repos.get("data", []) if isinstance(repos, dict) else repos
rows = []
for r in repos:
archived = " [archived]" if r.get("archived") else ""
private = " [private]" if r.get("private") else ""
rows.append(
f" {r['full_name']}{private}{archived}"
f"{r.get('stars_count', 0)} "
f"updated: {r.get('updated', '')[:10]}"
)
return f"Repos ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def gitea_list_issues(
repo: str,
state: str = "open",
limit: int = 20,
) -> str:
"""
List issues for a Gitea repository.
Args:
repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
state: 'open', 'closed', or 'all'. Default: 'open'.
limit: Max issues to return. Default: 20.
"""
if "/" not in repo:
repo = f"{GITEA_ORG}/{repo}"
data = _gitea(
"GET",
f"/repos/{repo}/issues",
params={"state": state, "type": "issues", "limit": limit},
)
if not data:
return f"No {state} issues in {repo}."
rows = []
for issue in data:
labels = ", ".join(l["name"] for l in issue.get("labels", []))
label_str = f" [{labels}]" if labels else ""
rows.append(
f" #{issue['number']} {issue['title']}{label_str} — @{issue['user']['login']}"
)
return f"{state.capitalize()} issues in {repo} ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def gitea_create_issue(repo: str, title: str, body: str = "") -> str:
"""
Create a new issue in a Gitea repository.
Args:
repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
title: Issue title.
body: Issue body/description. Optional.
"""
if "/" not in repo:
repo = f"{GITEA_ORG}/{repo}"
data = _gitea("POST", f"/repos/{repo}/issues", json={"title": title, "body": body})
if isinstance(data, dict):
return f"Created issue #{data.get('number')}: {data.get('title')}\n URL: {data.get('html_url')}"
return f"Issue created: {data}"
@mcp.tool()
def gitea_list_branches(repo: str) -> str:
"""
List branches for a Gitea repository.
Args:
repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
"""
if "/" not in repo:
repo = f"{GITEA_ORG}/{repo}"
data = _gitea("GET", f"/repos/{repo}/branches")
rows = [
f" {b['name']}" + (" [default]" if b.get("is_default") else "") for b in data
]
return f"Branches in {repo} ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Prometheus
# ---------------------------------------------------------------------------
@mcp.tool()
def prometheus_query(query: str) -> str:
"""
Run an instant PromQL query.
Args:
query: PromQL expression e.g. 'up', 'node_memory_MemAvailable_bytes{job="node"}'.
"""
with httpx.Client(timeout=20) as client:
r = client.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})
r.raise_for_status()
data = r.json()
if data.get("status") != "success":
return f"Query failed: {data.get('error', 'unknown error')}"
results = data["data"]["result"]
if not results:
return f"No results for: {query}"
rows = []
for item in results[:50]: # cap output
metric = item["metric"]
value = item["value"][1] if item.get("value") else "?"
label_str = ", ".join(
f'{k}="{v}"' for k, v in metric.items() if k != "__name__"
)
name = metric.get("__name__", query)
rows.append(f" {name}{{{label_str}}} = {value}")
return f"Results ({len(results)}):\n" + "\n".join(rows)
@mcp.tool()
def prometheus_targets() -> str:
"""List all Prometheus scrape targets and their health status."""
with httpx.Client(timeout=20) as client:
r = client.get(f"{PROMETHEUS_URL}/api/v1/targets")
r.raise_for_status()
data = r.json()
active = data["data"].get("activeTargets", [])
rows = []
for t in sorted(active, key=lambda x: x.get("labels", {}).get("job", "")):
job = t.get("labels", {}).get("job", "?")
instance = t.get("labels", {}).get("instance", "?")
health = t.get("health", "?")
last_scrape = t.get("lastScrapeDuration", 0)
rows.append(
f" {'' if health == 'up' else ''} {job:30s} {instance:40s} {health}"
)
return f"Prometheus targets ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Grafana
# ---------------------------------------------------------------------------
@mcp.tool()
def grafana_list_dashboards() -> str:
"""List all Grafana dashboards."""
with httpx.Client(timeout=20) as client:
r = client.get(
f"{GRAFANA_URL}/api/search",
params={"type": "dash-db"},
auth=(GRAFANA_USER, GRAFANA_PASS),
)
r.raise_for_status()
data = r.json()
rows = []
for d in data:
folder = d.get("folderTitle", "General")
rows.append(f" [{d['uid']:20s}] {d['title']:50s} folder={folder}")
return f"Dashboards ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def grafana_list_alerts() -> str:
"""List Grafana alert rules and their current state."""
with httpx.Client(timeout=20) as client:
r = client.get(
f"{GRAFANA_URL}/api/v1/provisioning/alert-rules",
auth=(GRAFANA_USER, GRAFANA_PASS),
)
r.raise_for_status()
data = r.json()
if not data:
return "No alert rules configured."
rows = []
for rule in data:
rows.append(f" {rule.get('title', '?'):50s} uid={rule.get('uid', '?')}")
return f"Alert rules ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Sonarr
# ---------------------------------------------------------------------------
@mcp.tool()
def sonarr_list_series(filter_name: Optional[str] = None) -> str:
"""
List all series in Sonarr.
Args:
filter_name: Optional substring to filter by series title.
"""
data = _arr(SONARR_URL, SONARR_API_KEY, "/series")
if filter_name:
term = filter_name.lower()
data = [s for s in data if term in s.get("title", "").lower()]
rows = []
for s in sorted(data, key=lambda x: x.get("sortTitle", "")):
status = s.get("status", "?")
monitored = "" if s.get("monitored") else ""
ep_count = s.get("episodeCount", 0)
rows.append(f" {monitored} {s['title']:50s} {status:12s} {ep_count} eps")
return f"Series ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def sonarr_queue() -> str:
"""Show the Sonarr download queue."""
data = _arr(SONARR_URL, SONARR_API_KEY, "/queue")
records = data.get("records", []) if isinstance(data, dict) else data
if not records:
return "Sonarr queue is empty."
rows = []
for item in records:
title = item.get("title", "?")[:60]
status = item.get("status", "?")
size = item.get("size", 0)
sizeleft = item.get("sizeleft", 0)
pct = int((1 - sizeleft / size) * 100) if size else 0
rows.append(f" {status:12s} {pct:3d}% {title}")
return f"Sonarr queue ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Radarr
# ---------------------------------------------------------------------------
@mcp.tool()
def radarr_list_movies(filter_name: Optional[str] = None) -> str:
"""
List all movies in Radarr.
Args:
filter_name: Optional substring to filter by movie title.
"""
data = _arr(RADARR_URL, RADARR_API_KEY, "/movie")
if filter_name:
term = filter_name.lower()
data = [m for m in data if term in m.get("title", "").lower()]
rows = []
for m in sorted(data, key=lambda x: x.get("sortTitle", "")):
monitored = "" if m.get("monitored") else ""
downloaded = "" if m.get("hasFile") else " "
year = m.get("year", "?")
rows.append(f" {monitored}{downloaded} {m['title']:50s} {year}")
return f"Movies ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
def radarr_queue() -> str:
"""Show the Radarr download queue."""
data = _arr(RADARR_URL, RADARR_API_KEY, "/queue")
records = data.get("records", []) if isinstance(data, dict) else data
if not records:
return "Radarr queue is empty."
rows = []
for item in records:
title = item.get("title", "?")[:60]
status = item.get("status", "?")
size = item.get("size", 0)
sizeleft = item.get("sizeleft", 0)
pct = int((1 - sizeleft / size) * 100) if size else 0
rows.append(f" {status:12s} {pct:3d}% {title}")
return f"Radarr queue ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# SABnzbd
# ---------------------------------------------------------------------------
@mcp.tool()
def sabnzbd_queue() -> str:
"""Show the SABnzbd download queue."""
data = _sabnzbd("queue")
queue = data.get("queue", {})
slots = queue.get("slots", [])
if not slots:
return f"SABnzbd queue empty. Status: {queue.get('status', '?')}"
rows = []
for s in slots:
name = s.get("filename", "?")[:60]
status = s.get("status", "?")
pct = s.get("percentage", "0")
size = s.get("sizeleft", "?")
rows.append(f" {status:12s} {pct:>4s}% {size:>8s} left {name}")
speed = queue.get("speed", "0")
eta = queue.get("timeleft", "?")
return f"SABnzbd queue ({len(rows)}) — {speed} — ETA {eta}:\n" + "\n".join(rows)
@mcp.tool()
def sabnzbd_pause() -> str:
"""Pause the SABnzbd download queue."""
_sabnzbd("pause")
return "SABnzbd queue paused."
@mcp.tool()
def sabnzbd_resume() -> str:
"""Resume the SABnzbd download queue."""
_sabnzbd("resume")
return "SABnzbd queue resumed."
# ---------------------------------------------------------------------------
# SSH
# ---------------------------------------------------------------------------
@mcp.tool()
def ssh_exec(host: str, command: str, timeout: int = 30) -> str:
"""
Run a command on a homelab host via SSH.
Known hosts: atlantis, calypso, setillo, setillo-root, nuc, homelab-vm, rpi5.
Requires the host to be in ~/.ssh/config or /etc/hosts.
Args:
host: SSH host alias (e.g. 'atlantis', 'calypso', 'setillo-root').
command: Shell command to execute remotely.
timeout: Seconds before the command times out. Default: 30.
"""
if host not in SSH_KNOWN_HOSTS:
return (
f"Host '{host}' not in allowed list.\n"
f"Known hosts: {', '.join(SSH_KNOWN_HOSTS)}"
)
try:
result = subprocess.run(
["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", host, command],
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout
if result.stderr:
output += f"\n[stderr]\n{result.stderr}"
if result.returncode != 0:
output += f"\n[exit code {result.returncode}]"
return output or "(no output)"
except subprocess.TimeoutExpired:
return f"Command timed out after {timeout}s."
except Exception as e:
return f"SSH error: {type(e).__name__}: {e}"
# ---------------------------------------------------------------------------
# Filesystem
# ---------------------------------------------------------------------------
@mcp.tool()
def fs_read(path: str) -> str:
"""
Read a file from the local filesystem.
Allowed roots: /home/homelab, /tmp.
Args:
path: Absolute or ~-relative path to the file.
"""
try:
p = _fs_safe(path)
if not p.exists():
return f"File not found: {p}"
if p.is_dir():
return f"'{p}' is a directory. Use fs_list to list it."
size = p.stat().st_size
if size > 1_000_000:
return f"File too large ({size:,} bytes). Read it in parts or use grep."
return p.read_text(errors="replace")
except PermissionError as e:
return f"Permission denied: {e}"
except Exception as e:
return f"Error reading file: {e}"
@mcp.tool()
def fs_write(path: str, content: str) -> str:
"""
Write content to a file on the local filesystem.
Allowed roots: /home/homelab, /tmp.
Args:
path: Absolute or ~-relative path to the file.
content: Text content to write.
"""
try:
p = _fs_safe(path)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
return f"Written {len(content)} bytes to {p}"
except PermissionError as e:
return f"Permission denied: {e}"
except Exception as e:
return f"Error writing file: {e}"
@mcp.tool()
def fs_list(path: str = "/home/homelab") -> str:
"""
List the contents of a directory on the local filesystem.
Allowed roots: /home/homelab, /tmp.
Args:
path: Directory path. Default: /home/homelab.
"""
try:
p = _fs_safe(path)
if not p.exists():
return f"Path not found: {p}"
if not p.is_dir():
return f"'{p}' is a file, not a directory."
entries = sorted(p.iterdir(), key=lambda x: (x.is_file(), x.name))
rows = []
for entry in entries:
kind = "DIR " if entry.is_dir() else "FILE"
size = entry.stat().st_size if entry.is_file() else ""
size_str = f"{size:>10,}" if size != "" else f"{'':>10}"
rows.append(f" {kind} {size_str} {entry.name}")
return f"Contents of {p} ({len(rows)} entries):\n" + "\n".join(rows)
except PermissionError as e:
return f"Permission denied: {e}"
except Exception as e:
return f"Error listing directory: {e}"
# ---------------------------------------------------------------------------
# Helper functions for new tools
# ---------------------------------------------------------------------------
def _adguard_token() -> str:
"""Get AdGuard session token via login."""
with httpx.Client(timeout=10) as client:
r = client.post(
f"{ADGUARD_URL}/control/login",
json={"name": ADGUARD_USER, "password": ADGUARD_PASS},
)
r.raise_for_status()
# Cookie-based auth
return r.cookies.get("agh_session") or ""
def _adguard(method: str, path: str, **kwargs) -> dict | list:
"""Make an AdGuard API request with cookie auth."""
with httpx.Client(timeout=15) as client:
# Login first to get session cookie
login = client.post(
f"{ADGUARD_URL}/control/login",
json={"name": ADGUARD_USER, "password": ADGUARD_PASS},
)
login.raise_for_status()
r = client.request(method, f"{ADGUARD_URL}/control{path}", **kwargs)
r.raise_for_status()
if r.content and r.headers.get("content-type", "").startswith(
"application/json"
):
return r.json()
return {}
def _npm_token() -> str:
"""Get NPM API token."""
with httpx.Client(timeout=10) as client:
r = client.post(
f"{NPM_URL}/api/tokens",
json={"identity": NPM_USER, "secret": NPM_PASS},
)
r.raise_for_status()
return r.json()["token"]
def _npm(method: str, path: str, token: str | None = None, **kwargs) -> dict | list:
"""Make an NPM API request."""
if token is None:
token = _npm_token()
with httpx.Client(timeout=15) as client:
r = client.request(
method,
f"{NPM_URL}/api{path}",
headers={"Authorization": f"Bearer {token}"},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _authentik(method: str, path: str, **kwargs) -> dict | list:
"""Make an Authentik API request."""
with httpx.Client(timeout=15, verify=False) as client:
r = client.request(
method,
f"{AUTHENTIK_URL}/api/v3{path}",
headers={"Authorization": f"Bearer {AUTHENTIK_TOKEN}"},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _cloudflare(method: str, path: str, **kwargs) -> dict:
"""Make a Cloudflare API request."""
with httpx.Client(timeout=15) as client:
r = client.request(
method,
f"https://api.cloudflare.com/client/v4{path}",
headers={"Authorization": f"Bearer {CLOUDFLARE_TOKEN}"},
**kwargs,
)
r.raise_for_status()
return r.json()
# ---------------------------------------------------------------------------
# AdGuard tools
# ---------------------------------------------------------------------------
@mcp.tool()
def adguard_list_rewrites() -> str:
"""
List all DNS rewrite rules in AdGuard Home (Calypso).
Returns domain → answer mappings. Useful for checking split-horizon DNS
overrides (e.g. which *.vish.gg services are pinned to specific IPs).
"""
try:
data = _adguard("GET", "/rewrite/list")
if not data:
return "No rewrite rules found."
lines = [f" {r['domain']:45s}{r['answer']}" for r in data] # type: ignore
return f"AdGuard DNS rewrites ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def adguard_add_rewrite(domain: str, answer: str) -> str:
"""
Add a DNS rewrite rule to AdGuard Home.
Use this to add split-horizon DNS overrides so internal services
resolve to LAN/Tailscale IPs instead of public ones.
Args:
domain: The domain to override (e.g. 'pt.vish.gg' or '*.example.com').
answer: The IP address to return (e.g. '192.168.0.154').
"""
try:
_adguard("POST", "/rewrite/add", json={"domain": domain, "answer": answer})
return f"Added rewrite: {domain}{answer}"
except Exception as e:
return f"Error adding rewrite: {e}"
@mcp.tool()
def adguard_delete_rewrite(domain: str, answer: str) -> str:
"""
Delete a DNS rewrite rule from AdGuard Home.
Args:
domain: The domain of the rule to delete.
answer: The answer IP of the rule to delete.
"""
try:
_adguard("POST", "/rewrite/delete", json={"domain": domain, "answer": answer})
return f"Deleted rewrite: {domain}{answer}"
except Exception as e:
return f"Error deleting rewrite: {e}"
# ---------------------------------------------------------------------------
# NPM (Nginx Proxy Manager) tools
# ---------------------------------------------------------------------------
@mcp.tool()
def npm_list_proxy_hosts(filter_domain: Optional[str] = None) -> str:
"""
List all proxy hosts in Nginx Proxy Manager (matrix-ubuntu).
Args:
filter_domain: Optional substring to filter by domain name.
"""
try:
token = _npm_token()
hosts = _npm("GET", "/nginx/proxy-hosts", token=token)
results = []
for h in hosts: # type: ignore
domains = ", ".join(h.get("domain_names", []))
if filter_domain and filter_domain.lower() not in domains.lower():
continue
fwd = f"{h.get('forward_scheme')}://{h.get('forward_host')}:{h.get('forward_port')}"
cert = h.get("certificate_id", 0)
enabled = "" if h.get("enabled") else ""
results.append(
f" [{enabled}] ID:{h['id']:3d} {domains:45s}{fwd:35s} cert:{cert}"
)
if not results:
return "No proxy hosts found."
return f"NPM proxy hosts ({len(results)}):\n" + "\n".join(results)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def npm_list_certs() -> str:
"""
List all SSL certificates in Nginx Proxy Manager with their domains and expiry.
"""
try:
token = _npm_token()
certs = _npm("GET", "/nginx/certificates", token=token)
lines = []
for c in certs: # type: ignore
domains = ", ".join(c.get("domain_names", []))
provider = c.get("provider", "?")
expires = (c.get("expires_on") or "?")[:10]
nice = c.get("nice_name", "")
lines.append(
f" ID:{c['id']:3d} [{provider:12s}] expires:{expires} {nice or domains}"
)
return f"NPM certificates ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def npm_get_proxy_host(host_id: int) -> str:
"""
Get details of a specific NPM proxy host including advanced config and cert.
Args:
host_id: The proxy host ID (from npm_list_proxy_hosts).
"""
try:
token = _npm_token()
h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token)
lines = [
f"ID: {h['id']}", # type: ignore
f"Domains: {', '.join(h['domain_names'])}", # type: ignore
f"Forward: {h['forward_scheme']}://{h['forward_host']}:{h['forward_port']}", # type: ignore
f"SSL forced: {h.get('ssl_forced')}", # type: ignore
f"Certificate ID: {h.get('certificate_id')}", # type: ignore
f"Websocket: {h.get('allow_websocket_upgrade')}", # type: ignore
f"Enabled: {h.get('enabled')}", # type: ignore
]
adv = h.get("advanced_config", "").strip() # type: ignore
if adv:
lines.append(f"Advanced config:\n{adv}")
meta = h.get("meta", {}) # type: ignore
if isinstance(meta, dict) and meta.get("nginx_err"):
lines.append(f"nginx_err: {meta['nginx_err']}")
return "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def npm_update_cert(host_id: int, certificate_id: int) -> str:
"""
Update the SSL certificate used by an NPM proxy host.
Args:
host_id: The proxy host ID.
certificate_id: The certificate ID to assign (from npm_list_certs).
"""
try:
token = _npm_token()
h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token)
h_dict = h if isinstance(h, dict) else {}
payload = {
k: h_dict.get(k)
for k in [
"domain_names",
"forward_scheme",
"forward_host",
"forward_port",
"access_list_id",
"ssl_forced",
"caching_enabled",
"block_exploits",
"advanced_config",
"meta",
"allow_websocket_upgrade",
"http2_support",
"hsts_enabled",
"hsts_subdomains",
"locations",
]
}
payload["certificate_id"] = certificate_id
result = _npm("PUT", f"/nginx/proxy-hosts/{host_id}", token=token, json=payload)
return f"Updated host {host_id} ({', '.join(result.get('domain_names', []))}) to cert {certificate_id}" # type: ignore
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Headscale tools
# ---------------------------------------------------------------------------
@mcp.tool()
def headscale_list_nodes() -> str:
"""
List all nodes registered in the Headscale tailnet.
Shows node ID, hostname, Tailscale IP, online status, and expiry.
"""
try:
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
"sudo /usr/local/bin/docker exec headscale headscale nodes list --output json",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
nodes = json.loads(result.stdout)
lines = []
for n in nodes:
ips = ", ".join(n.get("ip_addresses", []))
online = "🟢" if n.get("online") else "🔴"
name = n.get("given_name") or n.get("name", "?")
lines.append(f" {online} ID:{str(n['id']):3s} {name:25s} {ips}")
return f"Headscale nodes ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def headscale_create_preauth_key(
expiration: str = "24h",
reusable: bool = False,
ephemeral: bool = False,
) -> str:
"""
Create a Headscale pre-authentication key for registering a new node.
Args:
expiration: Key expiry duration e.g. '24h', '7d'. Default: '24h'.
reusable: Allow multiple nodes to use this key. Default: False.
ephemeral: Node is removed when it goes offline. Default: False.
"""
try:
flags = f"--expiration {expiration}"
if reusable:
flags += " --reusable"
if ephemeral:
flags += " --ephemeral"
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
f"sudo /usr/local/bin/docker exec headscale headscale preauthkeys create --user 1 {flags} --output json",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
data = json.loads(result.stdout)
key = data.get("key", "?")
exp = data.get("expiration", "?")
return (
f"Pre-auth key created:\n"
f" Key: {key}\n"
f" Expires: {exp}\n"
f" Reusable: {reusable}, Ephemeral: {ephemeral}\n\n"
f"Use on new node:\n"
f" tailscale up --login-server=https://headscale.vish.gg:8443 --authkey={key} --accept-routes=false"
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def headscale_delete_node(node_id: str) -> str:
"""
Delete a node from the Headscale tailnet.
Args:
node_id: The numeric node ID (from headscale_list_nodes).
"""
try:
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
f"sudo /usr/local/bin/docker exec headscale headscale nodes delete --identifier {node_id} --output json",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
return f"Node {node_id} deleted successfully."
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def headscale_rename_node(node_id: str, new_name: str) -> str:
"""
Rename a node in the Headscale tailnet.
Args:
node_id: The numeric node ID.
new_name: The new hostname/givenName.
"""
try:
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
f"sudo /usr/local/bin/docker exec headscale headscale nodes rename --identifier {node_id} {new_name}",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
return f"Node {node_id} renamed to '{new_name}'."
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Authentik tools
# ---------------------------------------------------------------------------
@mcp.tool()
def authentik_list_applications() -> str:
"""
List all applications configured in Authentik SSO.
"""
try:
data = _authentik("GET", "/core/applications/?page_size=100")
apps = data.get("results", []) # type: ignore
lines = []
for a in apps:
slug = a.get("slug", "?")
name = a.get("name", "?")
provider = a.get("provider")
launch = a.get("meta_launch_url") or ""
lines.append(f" {slug:30s} {name:35s} provider:{provider} {launch}")
return f"Authentik applications ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_list_providers() -> str:
"""
List all OAuth2/OIDC/proxy providers in Authentik.
"""
try:
data = _authentik("GET", "/providers/all/?page_size=100")
providers = data.get("results", []) # type: ignore
lines = []
for p in providers:
pk = p.get("pk")
name = p.get("name", "?")
component = p.get("component", "?").replace("ak-provider-", "")
lines.append(f" PK:{pk:4} [{component:20s}] {name}")
return f"Authentik providers ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_list_users() -> str:
"""
List all users in Authentik.
"""
try:
data = _authentik("GET", "/core/users/?page_size=100")
users = data.get("results", []) # type: ignore
lines = []
for u in users:
pk = u.get("pk")
username = u.get("username", "?")
email = u.get("email", "?")
active = "" if u.get("is_active") else ""
lines.append(f" [{active}] PK:{pk:4} {username:20s} {email}")
return f"Authentik users ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_update_app_launch_url(slug: str, launch_url: str) -> str:
"""
Update the launch URL of an Authentik application (the link shown on the dashboard).
Args:
slug: The application slug (from authentik_list_applications).
launch_url: The new launch URL e.g. 'https://pt.vish.gg'.
"""
try:
result = _authentik(
"PATCH", f"/core/applications/{slug}/", json={"meta_launch_url": launch_url}
)
return f"Updated '{slug}' launch URL to: {result.get('meta_launch_url')}" # type: ignore
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_set_provider_cookie_domain(provider_pk: int, cookie_domain: str) -> str:
"""
Set the cookie domain on an Authentik proxy provider.
Required to prevent redirect loops with Forward Auth.
Args:
provider_pk: The provider PK (from authentik_list_providers).
cookie_domain: Cookie domain e.g. 'vish.gg'.
"""
try:
provider = _authentik("GET", f"/providers/proxy/{provider_pk}/")
provider["cookie_domain"] = cookie_domain # type: ignore
result = _authentik("PUT", f"/providers/proxy/{provider_pk}/", json=provider)
return f"Provider {provider_pk} cookie_domain set to: {result.get('cookie_domain')}" # type: ignore
except Exception as e:
return f"Error: {e}"
AUTHENTIK_OUTPOST_PK = "9dcb1d53-a023-4222-a320-d27f66f06eb9"
AUTHENTIK_DEFAULT_AUTH_FLOW = "default-provider-authorization-implicit-consent"
AUTHENTIK_DEFAULT_INVALIDATION_FLOW = "default-provider-invalidation-flow"
@mcp.tool()
def authentik_create_proxy_provider(
name: str,
external_host: str,
mode: str = "forward_single",
cookie_domain: str = "vish.gg",
) -> str:
"""
Create a proxy provider in Authentik and bind it to the embedded outpost.
Args:
name: Provider name, e.g. 'Grafana Forward Auth'.
external_host: External URL, e.g. 'https://grafana.vish.gg'.
mode: Proxy mode — 'forward_single' (default, for NPM forward auth) or 'forward_domain'.
cookie_domain: Cookie domain to prevent redirect loops. Default 'vish.gg'.
"""
try:
# Resolve authorization + invalidation flow slugs to PKs
auth_flows = _authentik("GET", f"/flows/instances/?slug={AUTHENTIK_DEFAULT_AUTH_FLOW}")
auth_results = auth_flows.get("results", []) # type: ignore
if not auth_results:
return f"Error: authorization flow '{AUTHENTIK_DEFAULT_AUTH_FLOW}' not found"
auth_flow_pk = auth_results[0]["pk"]
inval_flows = _authentik("GET", f"/flows/instances/?slug={AUTHENTIK_DEFAULT_INVALIDATION_FLOW}")
inval_results = inval_flows.get("results", []) # type: ignore
if not inval_results:
return f"Error: invalidation flow '{AUTHENTIK_DEFAULT_INVALIDATION_FLOW}' not found"
inval_flow_pk = inval_results[0]["pk"]
provider = _authentik("POST", "/providers/proxy/", json={
"name": name,
"authorization_flow": auth_flow_pk,
"invalidation_flow": inval_flow_pk,
"external_host": external_host,
"mode": mode,
"cookie_domain": cookie_domain,
})
pk = provider.get("pk") # type: ignore
# Bind to embedded outpost
outpost = _authentik("GET", f"/outposts/instances/{AUTHENTIK_OUTPOST_PK}/")
providers = outpost.get("providers", []) # type: ignore
if pk not in providers:
providers.append(pk)
_authentik("PATCH", f"/outposts/instances/{AUTHENTIK_OUTPOST_PK}/", json={
"providers": providers,
})
return (
f"Created proxy provider '{name}' (PK:{pk})\n"
f" external_host: {external_host}\n"
f" mode: {mode}\n"
f" cookie_domain: {cookie_domain}\n"
f" Bound to embedded outpost"
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_create_application(
name: str,
slug: str,
provider_pk: int,
launch_url: str = "",
) -> str:
"""
Create an application in Authentik linked to an existing provider.
Args:
name: Display name, e.g. 'Grafana'.
slug: URL-safe slug, e.g. 'grafana'.
provider_pk: PK of the provider to attach (from authentik_create_proxy_provider or authentik_list_providers).
launch_url: Optional launch URL shown on the Authentik dashboard.
"""
try:
app_data: dict = {
"name": name,
"slug": slug,
"provider": provider_pk,
}
if launch_url:
app_data["meta_launch_url"] = launch_url
result = _authentik("POST", "/core/applications/", json=app_data)
return (
f"Created application '{name}'\n"
f" slug: {result.get('slug')}\n" # type: ignore
f" provider: PK:{provider_pk}\n"
f" launch_url: {result.get('meta_launch_url', '(none)')}" # type: ignore
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_list_sessions() -> str:
"""
List active authenticated sessions in Authentik.
Useful for debugging login issues.
"""
try:
data = _authentik("GET", "/core/authenticated_sessions/?page_size=50")
sessions = data.get("results", []) # type: ignore
if not sessions:
return "No active sessions."
lines = []
for s in sessions:
uuid = s.get("uuid", "?")
user = s.get("user", {})
username = user.get("username", "?") if isinstance(user, dict) else f"uid:{user}"
last_ip = s.get("last_ip", "?")
last_used = (s.get("last_used") or "?")[:19]
ua = s.get("user_agent", {})
browser = ua.get("user_agent", {}).get("family", "?") if isinstance(ua, dict) else "?"
lines.append(f" {uuid[:8]} {username:15s} {last_ip:16s} {browser:10s} {last_used}")
return f"Active sessions ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_delete_session(session_uuid: str) -> str:
"""
Delete (invalidate) an authenticated session in Authentik.
Use authentik_list_sessions to find the UUID.
Args:
session_uuid: The session UUID to delete.
"""
try:
_authentik("DELETE", f"/core/authenticated_sessions/{session_uuid}/")
return f"Deleted session {session_uuid}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def authentik_get_events(action: str = "", limit: int = 20) -> str:
"""
Get recent Authentik audit log events. Useful for debugging auth failures, login issues, and policy denials.
Args:
action: Optional filter by action type, e.g. 'login', 'login_failed', 'authorize_application',
'model_created', 'model_updated', 'secret_rotate', 'policy_exception'. Leave empty for all.
limit: Number of events to return (max 50, default 20).
"""
try:
limit = min(limit, 50)
params = f"page_size={limit}&ordering=-created"
if action:
params += f"&action={action}"
data = _authentik("GET", f"/events/events/?{params}")
events = data.get("results", []) # type: ignore
if not events:
return "No events found."
lines = []
for e in events:
ts = (e.get("created") or "?")[:19]
act = e.get("action", "?")
user = e.get("user", {})
username = user.get("username", "system") if isinstance(user, dict) else "system"
app = e.get("app", "").replace("authentik.", "")
ctx = e.get("context", {})
msg = ctx.get("message", "") if isinstance(ctx, dict) else ""
detail = msg[:60] if msg else app
lines.append(f" {ts} {act:30s} {username:15s} {detail}")
total = data.get("pagination", {}).get("count", "?") # type: ignore
return f"Events (showing {len(lines)} of {total}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Cloudflare tools
# ---------------------------------------------------------------------------
@mcp.tool()
def cloudflare_list_dns_records(filter_name: Optional[str] = None) -> str:
"""
List DNS records for the vish.gg zone in Cloudflare.
Args:
filter_name: Optional substring to filter by record name.
"""
try:
data = _cloudflare(
"GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records?per_page=200"
)
records = data.get("result", [])
lines = []
for r in records:
name = r.get("name", "?")
if filter_name and filter_name.lower() not in name.lower():
continue
rtype = r.get("type", "?")
content = r.get("content", "?")
proxied = "" if r.get("proxied") else ""
lines.append(f" {proxied} {rtype:6s} {name:45s}{content}")
return f"Cloudflare DNS records ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def cloudflare_create_dns_record(
name: str,
content: str,
record_type: str = "A",
proxied: bool = True,
ttl: int = 1,
) -> str:
"""
Create a new DNS record in the vish.gg Cloudflare zone.
Args:
name: Record name e.g. 'pt' (becomes pt.vish.gg) or 'pt.vish.gg'.
content: Record value e.g. '184.23.52.14' or 'calypso.vish.gg'.
record_type: DNS type: 'A', 'CNAME', 'TXT', etc. Default: 'A'.
proxied: Route through Cloudflare proxy. Default: True.
ttl: TTL in seconds (1 = auto). Default: 1.
"""
try:
data = _cloudflare(
"POST",
f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records",
json={
"type": record_type,
"name": name,
"content": content,
"proxied": proxied,
"ttl": ttl,
},
)
r = data.get("result", {})
return f"Created: {r.get('type')} {r.get('name')}{r.get('content')} proxied:{r.get('proxied')}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def cloudflare_delete_dns_record(record_id: str) -> str:
"""
Delete a DNS record from the vish.gg Cloudflare zone.
Args:
record_id: The record ID (from cloudflare_list_dns_records — use the ID shown).
"""
try:
_cloudflare("DELETE", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}")
return f"Deleted DNS record {record_id}."
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def cloudflare_update_dns_record(
record_id: str,
content: str,
proxied: Optional[bool] = None,
) -> str:
"""
Update an existing DNS record in Cloudflare.
Args:
record_id: The record ID to update.
content: New record value (IP or hostname).
proxied: Update proxied status. None = keep existing.
"""
try:
existing_data = _cloudflare(
"GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}"
)
existing = existing_data.get("result", {})
payload = {
"type": existing.get("type"),
"name": existing.get("name"),
"content": content,
"proxied": proxied if proxied is not None else existing.get("proxied"),
"ttl": existing.get("ttl", 1),
}
data = _cloudflare(
"PUT", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}", json=payload
)
r = data.get("result", {})
return (
f"Updated: {r.get('name')}{r.get('content')} proxied:{r.get('proxied')}"
)
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Uptime Kuma tools
# ---------------------------------------------------------------------------
def _kuma_sqlite(query: str) -> str:
"""Run a SQLite query against the Kuma DB via SSH."""
result = subprocess.run(
[
"ssh",
KUMA_HOST,
f'docker exec {KUMA_CONTAINER} sqlite3 /app/data/kuma.db "{query}"',
],
capture_output=True,
text=True,
timeout=20,
)
if result.returncode != 0:
raise RuntimeError(result.stderr)
return result.stdout.strip()
@mcp.tool()
def kuma_list_monitors(filter_name: Optional[str] = None) -> str:
"""
List all monitors in Uptime Kuma with their status and type.
Args:
filter_name: Optional substring to filter by monitor name.
"""
try:
rows = _kuma_sqlite(
"SELECT id, name, type, active, url, hostname, port, parent FROM monitor ORDER BY parent, name;"
)
if not rows:
return "No monitors found."
lines = []
for row in rows.splitlines():
parts = row.split("|")
if len(parts) < 8:
continue
mid, name, mtype, active, url, hostname, port, parent = parts[:8]
if filter_name and filter_name.lower() not in name.lower():
continue
status = "" if active == "1" else ""
target = url or (f"{hostname}:{port}" if hostname else "")
indent = " └─ " if parent and parent != "" else ""
lines.append(
f" [{status}] ID:{mid:4s} {indent}{name:30s} [{mtype:8s}] {target}"
)
return f"Kuma monitors ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def kuma_list_groups() -> str:
"""
List all monitor groups in Uptime Kuma (for use as parent IDs).
"""
try:
rows = _kuma_sqlite(
"SELECT id, name, parent FROM monitor WHERE type='group' ORDER BY name;"
)
if not rows:
return "No groups found."
lines = []
for row in rows.splitlines():
parts = row.split("|")
mid, name = parts[0], parts[1]
parent = parts[2] if len(parts) > 2 else ""
lines.append(f" ID:{mid:4s} {name:30s} parent:{parent or 'none'}")
return f"Kuma groups:\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def kuma_add_monitor(
name: str,
monitor_type: str,
url: Optional[str] = None,
hostname: Optional[str] = None,
port: Optional[int] = None,
parent_id: Optional[int] = None,
interval: int = 60,
ignore_tls: bool = False,
) -> str:
"""
Add a new monitor to Uptime Kuma.
Requires a Kuma restart to take effect (call kuma_restart after adding monitors).
Args:
name: Monitor display name.
monitor_type: 'http', 'port', 'ping', 'group'.
url: URL for http monitors e.g. 'https://pt.vish.gg/'.
hostname: Hostname/IP for port/ping monitors.
port: Port number for port monitors.
parent_id: Parent group monitor ID (from kuma_list_groups).
interval: Check interval in seconds. Default: 60.
ignore_tls: Ignore TLS cert errors. Default: False.
"""
try:
url_val = url or "https://"
host_val = hostname or ""
port_val = port or 0
parent_val = parent_id or "NULL"
ignore_tls_val = 1 if ignore_tls else 0
query = (
f"INSERT INTO monitor "
f"(name, active, user_id, interval, url, type, weight, hostname, port, "
f"maxretries, ignore_tls, upside_down, maxredirects, accepted_statuscodes_json, parent) "
f"VALUES ('{name}', 1, 1, {interval}, '{url_val}', '{monitor_type}', 2000, "
f"'{host_val}', {port_val}, 3, {ignore_tls_val}, 0, 10, '[\"200-299\"]', {parent_val});"
f"SELECT last_insert_rowid();"
)
result = _kuma_sqlite(query)
monitor_id = result.strip().split("\n")[-1]
return (
f"Monitor '{name}' added (ID: {monitor_id}).\n"
f"Call kuma_restart to activate."
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def kuma_set_parent(monitor_id: int, parent_id: Optional[int] = None) -> str:
"""
Set or clear the parent group of a Kuma monitor.
Args:
monitor_id: Monitor ID to update.
parent_id: Parent group ID, or None to remove from group.
"""
try:
parent_val = str(parent_id) if parent_id is not None else "NULL"
_kuma_sqlite(f"UPDATE monitor SET parent={parent_val} WHERE id={monitor_id};")
return f"Monitor {monitor_id} parent set to {parent_val}. Call kuma_restart to apply."
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def kuma_restart() -> str:
"""
Restart the Uptime Kuma container to apply DB changes.
Required after any kuma_add_monitor or kuma_set_parent operations.
"""
try:
result = subprocess.run(
["ssh", KUMA_HOST, f"docker restart {KUMA_CONTAINER}"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return f"Error restarting Kuma: {result.stderr}"
return "Uptime Kuma restarted successfully."
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
mcp.run()