Files
homelab-optimized/scripts/homelab-mcp/server.py
Gitea Mirror Bot a25375e7be
Some checks failed
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Documentation / Build Docusaurus (push) Has been cancelled
Sanitized mirror from private repository - 2026-04-05 06:40:46 UTC
2026-04-05 06:40:46 +00:00

2588 lines
85 KiB
Python

#!/usr/bin/env python3
"""
Homelab MCP Server
Provides MCP tools for managing homelab infrastructure:
- Portainer: stack/container management across all endpoints
- Gitea: repo, issue, and branch management
- Prometheus: PromQL queries and target inspection
- Grafana: dashboard and alert listing
- Sonarr/Radarr: media library and download queue
- SABnzbd: download queue management
- SSH: remote command execution on homelab hosts
- Filesystem: read/write files on the local machine
"""
import functools
import json
import logging
import os
import subprocess
import traceback
from pathlib import Path
from typing import Optional
import httpx
from fastmcp import FastMCP
logger = logging.getLogger("homelab-mcp")
def _safe(func):
"""Wrap MCP tool functions so unhandled exceptions return error strings
instead of crashing the server process."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error("Tool %s failed: %s\n%s", func.__name__, e, traceback.format_exc())
return f"Error in {func.__name__}: {type(e).__name__}: {e}"
return wrapper
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
PORTAINER_URL = "http://100.83.230.112:10000"
PORTAINER_TOKEN = "REDACTED_TOKEN"
GITEA_TOKEN = "REDACTED_TOKEN"
NTFY_BASE = "https://ntfy.vish.gg"
REPO_PATH = Path("/home/homelab/organized/repos/homelab")
ENDPOINTS: dict[str, int] = {
"atlantis": 2,
"calypso": 443397,
"nuc": 443398,
"homelab": 443399,
"rpi5": 443395,
}
# Gitea — now on matrix-ubuntu via NPM
GITEA_URL = "https://git.vish.gg"
GITEA_ORG = "vish"
# Monitoring
PROMETHEUS_URL = "http://192.168.0.210:9090"
GRAFANA_URL = "http://192.168.0.210:3300"
GRAFANA_USER = "admin"
GRAFANA_PASS = "REDACTED_PASSWORD"
# Media
SONARR_URL = "http://192.168.0.200:8989"
SONARR_API_KEY = "REDACTED_API_KEY"
RADARR_URL = "http://192.168.0.200:7878"
RADARR_API_KEY = "REDACTED_API_KEY"
SABNZBD_URL = "http://192.168.0.200:8080"
SABNZBD_API_KEY = "REDACTED_API_KEY"
# AdGuard — primary on Calypso, secondary on NUC
ADGUARD_URL = "http://192.168.0.250:9080"
ADGUARD_USER = "vish"
ADGUARD_PASS = "REDACTED_PASSWORD"
# NPM — now on matrix-ubuntu
NPM_URL = "http://192.168.0.154:81"
NPM_USER = "your-email@example.com"
NPM_PASS = "REDACTED_PASSWORD"
# Headscale — on Calypso
HEADSCALE_URL = "https://headscale.vish.gg:8443"
HEADSCALE_CONTAINER = "headscale"
HEADSCALE_CALYPSO_SSH = "calypso"
# Authentik — on Calypso
AUTHENTIK_URL = "https://sso.vish.gg"
AUTHENTIK_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret
# Cloudflare
CLOUDFLARE_TOKEN = (
"FGXlHM7doB8Z4vxv84_ntzhG_Cx15RXs66zoouZU" # pragma: allowlist secret
)
CLOUDFLARE_ZONE_ID = "4dbd15d096d71101b7c0c6362b307a66"
# Ollama — on Olares (LAN NodePort, bypasses Olares auth proxy)
OLLAMA_URL = "http://192.168.0.145:31434"
OLLAMA_MODEL = "qwen3-coder:latest"
# Jellyfin — on Olares (via kubectl exec to bypass envoy auth sidecar)
JELLYFIN_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret
JELLYFIN_USER_ID = "308e0dab19ce4a2180a2933d73694514"
# Authentik — constants for provider/app creation
AUTHENTIK_OUTPOST_PK = "9dcb1d53-a023-4222-a320-d27f66f06eb9"
AUTHENTIK_DEFAULT_AUTH_FLOW = "default-provider-authorization-explicit-consent"
AUTHENTIK_DEFAULT_INVALIDATION_FLOW = "default-provider-invalidation-flow"
# Uptime Kuma — on Pi-5
KUMA_DB_PATH = "/home/vish/docker/kuma/data/kuma.db"
KUMA_CONTAINER = "uptime-kuma"
KUMA_HOST = "pi-5"
# SSH — hostnames must resolve via /etc/hosts or ~/.ssh/config
SSH_KNOWN_HOSTS = [
"atlantis",
"calypso",
"setillo",
"setillo-root",
"nuc",
"homelab-vm",
"rpi5",
"pi-5",
"matrix-ubuntu",
"moon",
"olares",
"guava",
"pve",
"seattle",
"seattle-tailscale",
"gl-mt3000",
"gl-be3600",
"jellyfish",
]
# Filesystem — restrict read/write to safe root paths
FS_ALLOWED_ROOTS = [
Path("/home/homelab"),
Path("/tmp"),
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _portainer(method: str, path: str, **kwargs) -> dict | list:
"""Make a Portainer API request. Raises on HTTP error."""
with httpx.Client(verify=False, timeout=30) as client:
r = client.request(
method,
f"{PORTAINER_URL}/api{path}",
headers={"X-API-Key": PORTAINER_TOKEN},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _resolve_endpoint(endpoint: str) -> int:
"""Resolve endpoint name or numeric string to an endpoint ID."""
if endpoint.isdigit():
return int(endpoint)
ep = endpoint.lower()
if ep not in ENDPOINTS:
raise ValueError(
f"Unknown endpoint '{endpoint}'. Known: {', '.join(ENDPOINTS)}"
)
return ENDPOINTS[ep]
def _gitea(method: str, path: str, **kwargs) -> dict | list:
"""Make a Gitea API request."""
with httpx.Client(timeout=20) as client:
r = client.request(
method,
f"{GITEA_URL}/api/v1{path}",
headers={"Authorization": f"token {GITEA_TOKEN}"},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _arr(
base_url: str, api_key: str, path: str, params: dict | None = None
) -> dict | list:
"""Make a Sonarr/Radarr API request."""
with httpx.Client(timeout=20) as client:
r = client.get(
f"{base_url}/api/v3{path}",
headers={"X-Api-Key": api_key},
params=params or {},
)
r.raise_for_status()
return r.json()
def _sabnzbd(mode: str, extra: dict | None = None) -> dict:
"""Make a SABnzbd API request."""
params = {"apikey": SABNZBD_API_KEY, "output": "json", "mode": mode}
if extra:
params.update(extra)
with httpx.Client(timeout=20) as client:
r = client.get(f"{SABNZBD_URL}/api", params=params)
r.raise_for_status()
return r.json()
def _fs_safe(path: str) -> Path:
"""Resolve a path and verify it's under an allowed root."""
p = Path(path).expanduser().resolve()
for root in FS_ALLOWED_ROOTS:
try:
p.relative_to(root)
return p
except ValueError:
continue
allowed = ", ".join(str(r) for r in FS_ALLOWED_ROOTS)
raise PermissionError(f"Path '{path}' is outside allowed roots: {allowed}")
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
mcp = FastMCP(
"Homelab",
instructions=(
"Tools for managing a homelab running Docker services across multiple hosts.\n\n"
"PORTAINER — Docker orchestration across 5 endpoints:\n"
" Endpoints: atlantis (main NAS, media stack), calypso (secondary NAS), "
"nuc (mini PC), homelab (VM at 192.168.0.210), rpi5 (Raspberry Pi).\n"
" Tools: list_endpoints, list_stacks, get_stack, redeploy_stack, "
"list_containers, get_container_logs, restart_container, start_container, "
"stop_container, list_stack_containers, check_portainer.\n\n"
"GITEA — Self-hosted git at git.vish.gg, org=vish. Repo names can be "
"'vish/homelab' or just 'homelab'.\n"
" Tools: gitea_list_repos, gitea_list_issues, gitea_create_issue, gitea_list_branches.\n\n"
"ADGUARD — DNS rewrite management for split-horizon DNS (Calypso, 192.168.0.250:9080).\n"
" The wildcard *.vish.gg → 100.85.21.51 (matrix-ubuntu) requires specific overrides\n"
" for unproxied services accessed internally (pt.vish.gg, sso.vish.gg, git.vish.gg).\n"
" Tools: adguard_list_rewrites, adguard_add_rewrite, adguard_delete_rewrite.\n\n"
"NPM — Nginx Proxy Manager on matrix-ubuntu (192.168.0.154:81).\n"
" Cert IDs: npm-6=mx.vish.gg, npm-7=livekit.mx.vish.gg, npm-8=*.vish.gg(CF),\n"
" npm-11=pt.vish.gg, npm-12=sso.vish.gg. NEVER reuse an existing npm-N ID.\n"
" Tools: npm_list_proxy_hosts, npm_list_certs, npm_get_proxy_host, npm_update_cert.\n\n"
"HEADSCALE — Self-hosted Tailscale coordination server on Calypso.\n"
" Access via SSH to calypso → docker exec headscale.\n"
" Tools: headscale_list_nodes, headscale_create_preauth_key, headscale_delete_node, "
"headscale_rename_node.\n\n"
"AUTHENTIK — SSO identity provider at sso.vish.gg (Calypso).\n"
" All proxy providers need cookie_domain=vish.gg to avoid redirect loops.\n"
" Embedded outpost PK: 9dcb1d53-a023-4222-a320-d27f66f06eb9.\n"
" To onboard a new service: create_proxy_provider → create_application (auto-binds to outpost).\n"
" Tools: authentik_list_applications, authentik_list_providers, authentik_list_users,\n"
" authentik_update_app_launch_url, authentik_set_provider_cookie_domain,\n"
" authentik_create_proxy_provider, authentik_create_application,\n"
" authentik_list_sessions, authentik_delete_session, authentik_get_events.\n\n"
"CLOUDFLARE — DNS management for vish.gg zone.\n"
" Most *.vish.gg are proxied (orange cloud). Unproxied: mx.vish.gg, headscale.vish.gg,\n"
" livekit.mx.vish.gg, pt.vish.gg, sso.vish.gg, derp*.vish.gg.\n"
" Tools: cloudflare_list_dns_records, cloudflare_create_dns_record,\n"
" cloudflare_delete_dns_record, cloudflare_update_dns_record.\n\n"
"UPTIME KUMA — Monitoring on Pi-5 (100.77.151.40:3001). DB manipulation via SSH+SQLite.\n"
" Always call kuma_restart after adding/modifying monitors.\n"
" Tools: kuma_list_monitors, kuma_list_groups, kuma_add_monitor, kuma_set_parent, "
"kuma_restart.\n\n"
"PROMETHEUS — PromQL queries against homelab metrics (192.168.0.210:9090).\n"
" Tools: prometheus_query, prometheus_targets.\n\n"
"GRAFANA — Dashboard and alert inspection (192.168.0.210:3300).\n"
" Tools: grafana_list_dashboards, grafana_list_alerts.\n\n"
"SONARR/RADARR — Media library and download queue on Atlantis (ports 8989/7878).\n"
" Tools: sonarr_list_series, sonarr_queue, radarr_list_movies, radarr_queue.\n\n"
"SABNZBD — Usenet download queue on Atlantis (port 8080).\n"
" Tools: sabnzbd_queue, sabnzbd_pause, sabnzbd_resume.\n\n"
"SSH — Run commands on homelab hosts. Allowed: atlantis, calypso, setillo, "
"setillo-root, nuc, homelab-vm, rpi5. Requires key auth in ~/.ssh/config.\n"
" Tool: ssh_exec(host, command).\n\n"
"FILESYSTEM — Read/write files on the local machine. "
"Allowed roots: /home/homelab, /tmp.\n"
" Tools: fs_read, fs_write, fs_list.\n\n"
"JELLYFIN — Media server on Olares (LAN 192.168.0.145:30096).\n"
" Tools: jellyfin_libraries, jellyfin_sessions, jellyfin_system_info.\n\n"
"OLARES — Kubernetes (K3s) cluster on Olares (192.168.0.145).\n"
" RTX 5090 GPU, runs Jellyfin, Ollama, vLLM. Access via SSH+kubectl.\n"
" Tools: olares_pods, olares_gpu, olares_pod_logs, olares_restart.\n\n"
"OLLAMA — Local LLM on Olares (LAN 192.168.0.145:31434).\n"
" Model: qwen3-coder:latest. Bypasses Olares auth proxy.\n"
" Tool: ollama_query(prompt, model, max_tokens, temperature).\n\n"
"REPO — Inspect compose files in the homelab git repo at "
"/home/homelab/organized/repos/homelab.\n"
" Tools: list_homelab_services, get_compose_file.\n\n"
"UTILITIES — check_url (HTTP health check), send_notification (ntfy push)."
),
)
# ---------------------------------------------------------------------------
# Portainer — Endpoints
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def list_endpoints() -> str:
"""List all Portainer environments (servers) with their connection status."""
data = _portainer("GET", "/endpoints")
rows = []
for ep in data:
rows.append(
f" {ep['Name']} (id={ep['Id']}) — "
f"status={'online' if ep.get('Status') == 1 else 'offline'}"
f"containers={ep.get('Snapshots', [{}])[0].get('RunningContainerCount', '?')} running"
)
return "Endpoints:\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Portainer — Stacks
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def list_stacks(endpoint: Optional[str] = None) -> str:
"""
List all Portainer stacks with their status.
Args:
endpoint: Optional filter by endpoint name (atlantis, calypso, nuc,
homelab, rpi5) or numeric ID.
"""
data = _portainer("GET", "/stacks")
if endpoint:
ep_id = _resolve_endpoint(endpoint)
data = [s for s in data if s.get("EndpointId") == ep_id]
rows = []
for s in sorted(data, key=lambda x: x["Name"]):
status = "active" if s.get("Status") == 1 else "inactive"
ep_name = next(
(k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")),
str(s.get("EndpointId")),
)
git = s.get("GitConfig", {})
git_info = f" [git: {git.get('ConfigFilePath', '')}]" if git else ""
rows.append(
f" [{s['Id']}] {s['Name']}{status} — endpoint={ep_name}{git_info}"
)
return f"Stacks ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def get_stack(stack_name_or_id: str) -> str:
"""
Get detailed info about a specific stack.
Args:
stack_name_or_id: Stack name (partial match) or numeric ID.
"""
all_stacks = _portainer("GET", "/stacks")
if stack_name_or_id.isdigit():
matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)]
else:
term = stack_name_or_id.lower()
matches = [s for s in all_stacks if term in s["Name"].lower()]
if not matches:
return f"No stack found matching '{stack_name_or_id}'."
if len(matches) > 1:
names = ", ".join(s["Name"] for s in matches)
return f"Multiple matches: {names}. Be more specific."
s = matches[0]
git = s.get("GitConfig") or {}
env = s.get("Env") or []
ep_name = next(
(k for k, v in ENDPOINTS.items() if v == s.get("EndpointId")),
str(s.get("EndpointId")),
)
lines = [
f"Stack: {s['Name']} (id={s['Id']})",
f" Status: {'active' if s.get('Status') == 1 else 'inactive'}",
f" Endpoint: {ep_name} (id={s.get('EndpointId')})",
f" Created: {s.get('CreationDate', 'unknown')}",
f" Updated: {s.get('UpdateDate', 'unknown')}",
]
if git:
lines += [
f" Git URL: {git.get('URL', '')}",
f" Git Branch: {git.get('ReferenceName', '').replace('refs/heads/', '')}",
f" Git File: {git.get('ConfigFilePath', '')}",
]
if env:
lines.append(f" Env vars: {len(env)} set")
return "\n".join(lines)
@mcp.tool()
@_safe
def redeploy_stack(stack_name_or_id: str) -> str:
"""
Trigger a GitOps redeploy of a stack (pull latest from Git and redeploy).
Args:
stack_name_or_id: Stack name (partial match) or numeric ID.
"""
all_stacks = _portainer("GET", "/stacks")
if stack_name_or_id.isdigit():
matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)]
else:
term = stack_name_or_id.lower()
matches = [s for s in all_stacks if term in s["Name"].lower()]
if not matches:
return f"No stack found matching '{stack_name_or_id}'."
if len(matches) > 1:
names = ", ".join(s["Name"] for s in matches)
return f"Multiple matches: {names}. Be more specific."
s = matches[0]
if not s.get("GitConfig"):
return f"Stack '{s['Name']}' is not a GitOps stack — cannot redeploy via git."
ep_id = s["EndpointId"]
stack_id = s["Id"]
_portainer(
"PUT",
f"/stacks/{stack_id}/git/redeploy?endpointId={ep_id}",
json={
"pullImage": True,
"prune": False,
"repositoryAuthentication": True,
"repositoryUsername": "vish",
"repositoryPassword": GITEA_TOKEN,
},
)
return f"Redeploy triggered for stack '{s['Name']}' (id={stack_id}) on endpoint {ep_id}."
# ---------------------------------------------------------------------------
# Portainer — Containers
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def list_containers(
endpoint: str = "atlantis",
all_containers: bool = False,
filter_name: Optional[str] = None,
) -> str:
"""
List containers on a Portainer endpoint.
Args:
endpoint: Endpoint name (atlantis, calypso, nuc, homelab, rpi5) or ID.
all_containers: If True, include stopped containers. Default False (running only).
filter_name: Optional substring to filter container names.
"""
ep_id = _resolve_endpoint(endpoint)
params = "?all=true" if all_containers else ""
data = _portainer("GET", f"/endpoints/{ep_id}/docker/containers/json{params}")
if filter_name:
term = filter_name.lower()
data = [c for c in data if any(term in n.lower() for n in c.get("Names", []))]
rows = []
for c in sorted(data, key=lambda x: x.get("Names", [""])[0]):
name = c.get("Names", ["?"])[0].lstrip("/")
image = c.get("Image", "?").split(":")[0].split("/")[-1]
state = c.get("State", "?")
short_id = c.get("Id", "")[:12]
rows.append(f" {short_id} {state:10s} {name:40s} {image}")
header = f"Containers on {endpoint} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image"
return header + "\n" + "\n".join(rows)
@mcp.tool()
@_safe
def get_container_logs(
container_id_or_name: str,
endpoint: str = "atlantis",
tail: int = 100,
) -> str:
"""
Get recent logs from a container.
Args:
container_id_or_name: Container ID (short or full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
tail: Number of log lines to return. Default: 100.
"""
ep_id = _resolve_endpoint(endpoint)
# Resolve by name if not a hex ID
if not all(c in "0123456789abcdefABCDEF" for c in container_id_or_name):
containers = _portainer(
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
)
term = container_id_or_name.lower()
matches = [
c for c in containers if any(term in n.lower() for n in c.get("Names", []))
]
if not matches:
return (
f"No container found matching '{container_id_or_name}' on {endpoint}."
)
if len(matches) > 1:
names = ", ".join(c["Names"][0].lstrip("/") for c in matches)
return f"Multiple matches: {names}. Be more specific."
container_id_or_name = matches[0]["Id"][:12]
with httpx.Client(verify=False, timeout=30) as client:
r = client.get(
f"{PORTAINER_URL}/api/endpoints/{ep_id}/docker/containers/{container_id_or_name}/logs",
headers={"X-API-Key": PORTAINER_TOKEN},
params={"stdout": 1, "stderr": 1, "tail": tail, "timestamps": 0},
)
r.raise_for_status()
# Docker log stream has 8-byte header per line; strip it
raw = r.content
lines = []
i = 0
while i < len(raw):
if i + 8 > len(raw):
break
size = int.from_bytes(raw[i + 4 : i + 8], "big")
line = raw[i + 8 : i + 8 + size].decode("utf-8", errors="replace").rstrip()
if line:
lines.append(line)
i += 8 + size
if not lines:
# fallback: treat as plain text
lines = r.text.splitlines()
return "\n".join(lines[-tail:])
@mcp.tool()
@_safe
def restart_container(
container_id_or_name: str,
endpoint: str = "atlantis",
) -> str:
"""
Restart a container.
Args:
container_id_or_name: Container ID (short/full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
"""
ep_id = _resolve_endpoint(endpoint)
cid = _resolve_container_id(container_id_or_name, ep_id)
_portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/restart")
return f"Restarted container {cid} on {endpoint}."
@mcp.tool()
@_safe
def stop_container(
container_id_or_name: str,
endpoint: str = "atlantis",
) -> str:
"""
Stop a running container.
Args:
container_id_or_name: Container ID (short/full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
"""
ep_id = _resolve_endpoint(endpoint)
cid = _resolve_container_id(container_id_or_name, ep_id)
_portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/stop")
return f"Stopped container {cid} on {endpoint}."
@mcp.tool()
@_safe
def start_container(
container_id_or_name: str,
endpoint: str = "atlantis",
) -> str:
"""
Start a stopped container.
Args:
container_id_or_name: Container ID (short/full) or name substring.
endpoint: Endpoint name or ID. Default: atlantis.
"""
ep_id = _resolve_endpoint(endpoint)
cid = _resolve_container_id(container_id_or_name, ep_id)
_portainer("POST", f"/endpoints/{ep_id}/docker/containers/{cid}/start")
return f"Started container {cid} on {endpoint}."
def _resolve_container_id(name_or_id: str, ep_id: int) -> str:
"""Resolve a container name substring to a short container ID."""
if len(name_or_id) >= 12 and all(c in "0123456789abcdefABCDEF" for c in name_or_id):
return name_or_id[:12]
containers = _portainer(
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
)
term = name_or_id.lower()
matches = [
c for c in containers if any(term in n.lower() for n in c.get("Names", []))
]
if not matches:
raise ValueError(f"No container found matching '{name_or_id}'.")
if len(matches) > 1:
names = ", ".join(c["Names"][0].lstrip("/") for c in matches)
raise ValueError(
f"Multiple containers match '{name_or_id}': {names}. Be more specific."
)
return matches[0]["Id"][:12]
# ---------------------------------------------------------------------------
# Portainer — Stack containers (convenience)
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def list_stack_containers(stack_name_or_id: str) -> str:
"""
List all containers belonging to a specific stack.
Args:
stack_name_or_id: Stack name (partial match) or numeric ID.
"""
all_stacks = _portainer("GET", "/stacks")
if stack_name_or_id.isdigit():
matches = [s for s in all_stacks if s["Id"] == int(stack_name_or_id)]
else:
term = stack_name_or_id.lower()
matches = [s for s in all_stacks if term in s["Name"].lower()]
if not matches:
return f"No stack found matching '{stack_name_or_id}'."
if len(matches) > 1:
names = ", ".join(s["Name"] for s in matches)
return f"Multiple matches: {names}. Be more specific."
s = matches[0]
ep_id = s["EndpointId"]
stack_name = s["Name"]
containers = _portainer(
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
)
# Filter by compose project label (Portainer uses com.docker.compose.project)
stack_containers = [
c
for c in containers
if c.get("Labels", {}).get("com.docker.compose.project", "").lower()
== stack_name.lower()
or any(stack_name.lower() in n.lower() for n in c.get("Names", []))
]
ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id))
rows = []
for c in sorted(stack_containers, key=lambda x: x.get("Names", [""])[0]):
name = c.get("Names", ["?"])[0].lstrip("/")
state = c.get("State", "?")
short_id = c.get("Id", "")[:12]
image = c.get("Image", "?").split(":")[0].split("/")[-1]
rows.append(f" {short_id} {state:10s} {name:40s} {image}")
header = f"Containers in stack '{stack_name}' on {ep_name} ({len(rows)}):\n {'ID':12s} {'State':10s} {'Name':40s} Image"
return header + "\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def check_url(url: str, expected_status: int = 200) -> str:
"""
Perform an HTTP health check on a URL.
Args:
url: The URL to check (e.g. http://192.168.0.200:9443/api/status).
expected_status: Expected HTTP status code. Default: 200.
"""
try:
with httpx.Client(verify=False, timeout=10, follow_redirects=True) as client:
r = client.get(url)
ok = r.status_code == expected_status
return (
f"{'OK' if ok else 'FAIL'} {url}\n"
f" Status: {r.status_code} (expected {expected_status})\n"
f" Latency: {r.elapsed.total_seconds() * 1000:.0f}ms"
)
except Exception as e:
return f"ERROR {url}\n {type(e).__name__}: {e}"
@mcp.tool()
@_safe
def check_portainer() -> str:
"""Quick health check of the Portainer API and summary of infrastructure."""
try:
status = _portainer("GET", "/status")
stacks = _portainer("GET", "/stacks")
stacks_list = stacks if isinstance(stacks, list) else []
status_dict = status if isinstance(status, dict) else {}
active = sum(
1 for s in stacks_list if isinstance(s, dict) and s.get("Status") == 1
)
return (
f"Portainer OK — version {status_dict.get('Version', '?')}\n"
f" Stacks: {len(stacks_list)} total, {active} active"
)
except Exception as e:
return f"Portainer UNREACHABLE: {e}"
# ---------------------------------------------------------------------------
# Repo — service inspection
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def list_homelab_services(host_filter: Optional[str] = None) -> str:
"""
List all services/stacks defined in the homelab repository.
Args:
host_filter: Optional substring to filter by host/path (e.g. 'atlantis', 'calypso', 'seattle').
"""
compose_files = list(REPO_PATH.rglob("docker-compose.yml")) + list(
REPO_PATH.rglob("docker-compose.yaml")
)
# Exclude archive
compose_files = [f for f in compose_files if "archive" not in f.parts]
rows = []
for f in sorted(compose_files):
rel = f.relative_to(REPO_PATH)
if host_filter and host_filter.lower() not in str(rel).lower():
continue
rows.append(f" {rel}")
return f"Compose files ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def get_compose_file(service_path: str) -> str:
"""
Read a compose file from the homelab repo.
Args:
service_path: Relative path within the repo, e.g.
'hosts/synology/atlantis/arr-suite/docker-compose.yml'
or a partial name like 'atlantis/arr-suite'.
"""
# Try exact relative path first
candidate = REPO_PATH / service_path
if candidate.is_file():
return candidate.read_text()
# Try fuzzy: find compose files whose path contains the fragment
# Searches docker-compose.yml/yaml AND standalone *.yaml/*.yml stack files
term = service_path.lower().replace("\\", "/")
all_compose = (
list(REPO_PATH.rglob("docker-compose.yml"))
+ list(REPO_PATH.rglob("docker-compose.yaml"))
+ list(REPO_PATH.rglob("*.yaml"))
+ list(REPO_PATH.rglob("*.yml"))
)
hits = [
f
for f in all_compose
if term in str(f.relative_to(REPO_PATH)).lower()
and "archive" not in f.parts
and ".git" not in f.parts
and "node_modules" not in f.parts
]
# Prefer docker-compose files if both match
compose_hits = [f for f in hits if "docker-compose" in f.name]
if compose_hits:
hits = compose_hits
if not hits:
return f"No compose file found matching '{service_path}'."
if len(hits) > 1:
paths = "\n".join(f" {f.relative_to(REPO_PATH)}" for f in hits)
return f"Multiple matches:\n{paths}\nBe more specific."
return hits[0].read_text()
# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def send_notification(
message: str,
title: str = "Homelab",
topic: str = "homelab-alerts",
priority: str = "default",
tags: Optional[str] = None,
) -> str:
"""
Send a push notification via ntfy.
Args:
message: The notification body.
title: Notification title. Default: 'Homelab'.
topic: ntfy topic to publish to. Default: 'homelab-alerts'.
priority: urgent, high, default, low, or min. Default: 'default'.
tags: Comma-separated emoji tags e.g. 'warning,robot'. Optional.
"""
headers = {
"Title": title,
"Priority": priority,
}
if tags:
headers["Tags"] = tags
with httpx.Client(timeout=10) as client:
r = client.post(
f"{NTFY_BASE}/{topic}",
content=message.encode(),
headers=headers,
)
r.raise_for_status()
return f"Notification sent to {NTFY_BASE}/{topic}'{title}: {message}'"
# ---------------------------------------------------------------------------
# Gitea
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def gitea_list_repos(owner: Optional[str] = None, limit: int = 50) -> str:
"""
List Gitea repositories.
Args:
owner: User or org name. Defaults to the service account's accessible repos.
limit: Max repos to return. Default: 50.
"""
if owner:
data = _gitea("GET", f"/repos/search", params={"owner": owner, "limit": limit})
repos = data.get("data", []) if isinstance(data, dict) else data
else:
repos = _gitea(
"GET", f"/repos/search", params={"limit": limit, "token": GITEA_TOKEN}
)
repos = repos.get("data", []) if isinstance(repos, dict) else repos
rows = []
for r in repos:
archived = " [archived]" if r.get("archived") else ""
private = " [private]" if r.get("private") else ""
rows.append(
f" {r['full_name']}{private}{archived}"
f"{r.get('stars_count', 0)} "
f"updated: {r.get('updated', '')[:10]}"
)
return f"Repos ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def gitea_list_issues(
repo: str,
state: str = "open",
limit: int = 20,
) -> str:
"""
List issues for a Gitea repository.
Args:
repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
state: 'open', 'closed', or 'all'. Default: 'open'.
limit: Max issues to return. Default: 20.
"""
if "/" not in repo:
repo = f"{GITEA_ORG}/{repo}"
data = _gitea(
"GET",
f"/repos/{repo}/issues",
params={"state": state, "type": "issues", "limit": limit},
)
if not data:
return f"No {state} issues in {repo}."
rows = []
for issue in data:
labels = ", ".join(l["name"] for l in issue.get("labels", []))
label_str = f" [{labels}]" if labels else ""
rows.append(
f" #{issue['number']} {issue['title']}{label_str} — @{issue['user']['login']}"
)
return f"{state.capitalize()} issues in {repo} ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def gitea_create_issue(repo: str, title: str, body: str = "") -> str:
"""
Create a new issue in a Gitea repository.
Args:
repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
title: Issue title.
body: Issue body/description. Optional.
"""
if "/" not in repo:
repo = f"{GITEA_ORG}/{repo}"
data = _gitea("POST", f"/repos/{repo}/issues", json={"title": title, "body": body})
if isinstance(data, dict):
return f"Created issue #{data.get('number')}: {data.get('title')}\n URL: {data.get('html_url')}"
return f"Issue created: {data}"
@mcp.tool()
@_safe
def gitea_list_branches(repo: str) -> str:
"""
List branches for a Gitea repository.
Args:
repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
"""
if "/" not in repo:
repo = f"{GITEA_ORG}/{repo}"
data = _gitea("GET", f"/repos/{repo}/branches")
rows = [
f" {b['name']}" + (" [default]" if b.get("is_default") else "") for b in data
]
return f"Branches in {repo} ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Prometheus
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def prometheus_query(query: str) -> str:
"""
Run an instant PromQL query.
Args:
query: PromQL expression e.g. 'up', 'node_memory_MemAvailable_bytes{job="node"}'.
"""
with httpx.Client(timeout=20) as client:
r = client.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})
r.raise_for_status()
data = r.json()
if data.get("status") != "success":
return f"Query failed: {data.get('error', 'unknown error')}"
results = data["data"]["result"]
if not results:
return f"No results for: {query}"
rows = []
for item in results[:50]: # cap output
metric = item["metric"]
value = item["value"][1] if item.get("value") else "?"
label_str = ", ".join(
f'{k}="{v}"' for k, v in metric.items() if k != "__name__"
)
name = metric.get("__name__", query)
rows.append(f" {name}{{{label_str}}} = {value}")
return f"Results ({len(results)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def prometheus_targets() -> str:
"""List all Prometheus scrape targets and their health status."""
with httpx.Client(timeout=20) as client:
r = client.get(f"{PROMETHEUS_URL}/api/v1/targets")
r.raise_for_status()
data = r.json()
active = data["data"].get("activeTargets", [])
rows = []
for t in sorted(active, key=lambda x: x.get("labels", {}).get("job", "")):
job = t.get("labels", {}).get("job", "?")
instance = t.get("labels", {}).get("instance", "?")
health = t.get("health", "?")
last_scrape = t.get("lastScrapeDuration", 0)
rows.append(
f" {'' if health == 'up' else ''} {job:30s} {instance:40s} {health}"
)
return f"Prometheus targets ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Grafana
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def grafana_list_dashboards() -> str:
"""List all Grafana dashboards."""
with httpx.Client(timeout=20) as client:
r = client.get(
f"{GRAFANA_URL}/api/search",
params={"type": "dash-db"},
auth=(GRAFANA_USER, GRAFANA_PASS),
)
r.raise_for_status()
data = r.json()
rows = []
for d in data:
folder = d.get("folderTitle", "General")
rows.append(f" [{d['uid']:20s}] {d['title']:50s} folder={folder}")
return f"Dashboards ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def grafana_list_alerts() -> str:
"""List Grafana alert rules and their current state."""
with httpx.Client(timeout=20) as client:
r = client.get(
f"{GRAFANA_URL}/api/v1/provisioning/alert-rules",
auth=(GRAFANA_USER, GRAFANA_PASS),
)
r.raise_for_status()
data = r.json()
if not data:
return "No alert rules configured."
rows = []
for rule in data:
rows.append(f" {rule.get('title', '?'):50s} uid={rule.get('uid', '?')}")
return f"Alert rules ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Sonarr
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def sonarr_list_series(filter_name: Optional[str] = None) -> str:
"""
List all series in Sonarr.
Args:
filter_name: Optional substring to filter by series title.
"""
data = _arr(SONARR_URL, SONARR_API_KEY, "/series")
if filter_name:
term = filter_name.lower()
data = [s for s in data if term in s.get("title", "").lower()]
rows = []
for s in sorted(data, key=lambda x: x.get("sortTitle", "")):
status = s.get("status", "?")
monitored = "" if s.get("monitored") else ""
ep_count = s.get("episodeCount", 0)
rows.append(f" {monitored} {s['title']:50s} {status:12s} {ep_count} eps")
return f"Series ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def sonarr_queue() -> str:
"""Show the Sonarr download queue."""
data = _arr(SONARR_URL, SONARR_API_KEY, "/queue")
records = data.get("records", []) if isinstance(data, dict) else data
if not records:
return "Sonarr queue is empty."
rows = []
for item in records:
title = item.get("title", "?")[:60]
status = item.get("status", "?")
size = item.get("size", 0)
sizeleft = item.get("sizeleft", 0)
pct = int((1 - sizeleft / size) * 100) if size else 0
rows.append(f" {status:12s} {pct:3d}% {title}")
return f"Sonarr queue ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# Radarr
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def radarr_list_movies(filter_name: Optional[str] = None) -> str:
"""
List all movies in Radarr.
Args:
filter_name: Optional substring to filter by movie title.
"""
data = _arr(RADARR_URL, RADARR_API_KEY, "/movie")
if filter_name:
term = filter_name.lower()
data = [m for m in data if term in m.get("title", "").lower()]
rows = []
for m in sorted(data, key=lambda x: x.get("sortTitle", "")):
monitored = "" if m.get("monitored") else ""
downloaded = "" if m.get("hasFile") else " "
year = m.get("year", "?")
rows.append(f" {monitored}{downloaded} {m['title']:50s} {year}")
return f"Movies ({len(rows)}):\n" + "\n".join(rows)
@mcp.tool()
@_safe
def radarr_queue() -> str:
"""Show the Radarr download queue."""
data = _arr(RADARR_URL, RADARR_API_KEY, "/queue")
records = data.get("records", []) if isinstance(data, dict) else data
if not records:
return "Radarr queue is empty."
rows = []
for item in records:
title = item.get("title", "?")[:60]
status = item.get("status", "?")
size = item.get("size", 0)
sizeleft = item.get("sizeleft", 0)
pct = int((1 - sizeleft / size) * 100) if size else 0
rows.append(f" {status:12s} {pct:3d}% {title}")
return f"Radarr queue ({len(rows)}):\n" + "\n".join(rows)
# ---------------------------------------------------------------------------
# SABnzbd
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def sabnzbd_queue() -> str:
"""Show the SABnzbd download queue."""
data = _sabnzbd("queue")
queue = data.get("queue", {})
slots = queue.get("slots", [])
if not slots:
return f"SABnzbd queue empty. Status: {queue.get('status', '?')}"
rows = []
for s in slots:
name = s.get("filename", "?")[:60]
status = s.get("status", "?")
pct = s.get("percentage", "0")
size = s.get("sizeleft", "?")
rows.append(f" {status:12s} {pct:>4s}% {size:>8s} left {name}")
speed = queue.get("speed", "0")
eta = queue.get("timeleft", "?")
return f"SABnzbd queue ({len(rows)}) — {speed} — ETA {eta}:\n" + "\n".join(rows)
@mcp.tool()
@_safe
def sabnzbd_pause() -> str:
"""Pause the SABnzbd download queue."""
_sabnzbd("pause")
return "SABnzbd queue paused."
@mcp.tool()
@_safe
def sabnzbd_resume() -> str:
"""Resume the SABnzbd download queue."""
_sabnzbd("resume")
return "SABnzbd queue resumed."
# ---------------------------------------------------------------------------
# SSH
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def ssh_exec(host: str, command: str, timeout: int = 30) -> str:
"""
Run a command on a homelab host via SSH.
Known hosts: atlantis, calypso, setillo, setillo-root, nuc, homelab-vm, rpi5.
Requires the host to be in ~/.ssh/config or /etc/hosts.
Args:
host: SSH host alias (e.g. 'atlantis', 'calypso', 'setillo-root').
command: Shell command to execute remotely.
timeout: Seconds before the command times out. Default: 30.
"""
if host not in SSH_KNOWN_HOSTS:
return (
f"Host '{host}' not in allowed list.\n"
f"Known hosts: {', '.join(SSH_KNOWN_HOSTS)}"
)
try:
result = subprocess.run(
["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", host, command],
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout
if result.stderr:
output += f"\n[stderr]\n{result.stderr}"
if result.returncode != 0:
output += f"\n[exit code {result.returncode}]"
return output or "(no output)"
except subprocess.TimeoutExpired:
return f"Command timed out after {timeout}s."
except Exception as e:
return f"SSH error: {type(e).__name__}: {e}"
# ---------------------------------------------------------------------------
# Filesystem
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def fs_read(path: str) -> str:
"""
Read a file from the local filesystem.
Allowed roots: /home/homelab, /tmp.
Args:
path: Absolute or ~-relative path to the file.
"""
try:
p = _fs_safe(path)
if not p.exists():
return f"File not found: {p}"
if p.is_dir():
return f"'{p}' is a directory. Use fs_list to list it."
size = p.stat().st_size
if size > 1_000_000:
return f"File too large ({size:,} bytes). Read it in parts or use grep."
return p.read_text(errors="replace")
except PermissionError as e:
return f"Permission denied: {e}"
except Exception as e:
return f"Error reading file: {e}"
@mcp.tool()
@_safe
def fs_write(path: str, content: str) -> str:
"""
Write content to a file on the local filesystem.
Allowed roots: /home/homelab, /tmp.
Args:
path: Absolute or ~-relative path to the file.
content: Text content to write.
"""
try:
p = _fs_safe(path)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
return f"Written {len(content)} bytes to {p}"
except PermissionError as e:
return f"Permission denied: {e}"
except Exception as e:
return f"Error writing file: {e}"
@mcp.tool()
@_safe
def fs_list(path: str = "/home/homelab") -> str:
"""
List the contents of a directory on the local filesystem.
Allowed roots: /home/homelab, /tmp.
Args:
path: Directory path. Default: /home/homelab.
"""
try:
p = _fs_safe(path)
if not p.exists():
return f"Path not found: {p}"
if not p.is_dir():
return f"'{p}' is a file, not a directory."
entries = sorted(p.iterdir(), key=lambda x: (x.is_file(), x.name))
rows = []
for entry in entries:
kind = "DIR " if entry.is_dir() else "FILE"
size = entry.stat().st_size if entry.is_file() else ""
size_str = f"{size:>10,}" if size != "" else f"{'':>10}"
rows.append(f" {kind} {size_str} {entry.name}")
return f"Contents of {p} ({len(rows)} entries):\n" + "\n".join(rows)
except PermissionError as e:
return f"Permission denied: {e}"
except Exception as e:
return f"Error listing directory: {e}"
# ---------------------------------------------------------------------------
# Helper functions for new tools
# ---------------------------------------------------------------------------
_adguard_session_cache: dict = {"client": None}
def _adguard(method: str, path: str, **kwargs) -> dict | list:
"""Make an AdGuard API request with cached session cookie."""
client = _adguard_session_cache.get("client")
if client is None:
client = httpx.Client(timeout=15)
client.post(
f"{ADGUARD_URL}/control/login",
json={"name": ADGUARD_USER, "password": ADGUARD_PASS},
).raise_for_status()
_adguard_session_cache["client"] = client
try:
r = client.request(method, f"{ADGUARD_URL}/control{path}", **kwargs)
if r.status_code == 403:
# Session expired, re-login
client.post(
f"{ADGUARD_URL}/control/login",
json={"name": ADGUARD_USER, "password": ADGUARD_PASS},
).raise_for_status()
r = client.request(method, f"{ADGUARD_URL}/control{path}", **kwargs)
r.raise_for_status()
if r.content and r.headers.get("content-type", "").startswith("application/json"):
return r.json()
return {}
except httpx.HTTPError:
# Reset session on error
_adguard_session_cache["client"] = None
raise
_npm_token_cache: dict = {"token": None}
def _npm_token() -> str:
"""Get NPM API token (cached)."""
if _npm_token_cache["token"]:
return _npm_token_cache["token"]
with httpx.Client(timeout=10) as client:
r = client.post(
f"{NPM_URL}/api/tokens",
json={"identity": NPM_USER, "secret": NPM_PASS},
)
r.raise_for_status()
_npm_token_cache["token"] = r.json()["token"]
return _npm_token_cache["token"]
def _npm(method: str, path: str, **kwargs) -> dict | list:
"""Make an NPM API request with cached token."""
token = _npm_token()
with httpx.Client(timeout=15) as client:
r = client.request(
method,
f"{NPM_URL}/api{path}",
headers={"Authorization": f"Bearer {token}"},
**kwargs,
)
if r.status_code == 401:
# Token expired, refresh
_npm_token_cache["token"] = None
token = _npm_token()
r = client.request(
method,
f"{NPM_URL}/api{path}",
headers={"Authorization": f"Bearer {token}"},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _authentik(method: str, path: str, **kwargs) -> dict | list:
"""Make an Authentik API request."""
with httpx.Client(timeout=15, verify=False) as client:
r = client.request(
method,
f"{AUTHENTIK_URL}/api/v3{path}",
headers={"Authorization": f"Bearer {AUTHENTIK_TOKEN}"},
**kwargs,
)
r.raise_for_status()
if r.content:
return r.json()
return {}
def _cloudflare(method: str, path: str, **kwargs) -> dict:
"""Make a Cloudflare API request."""
with httpx.Client(timeout=15) as client:
r = client.request(
method,
f"https://api.cloudflare.com/client/v4{path}",
headers={"Authorization": f"Bearer {CLOUDFLARE_TOKEN}"},
**kwargs,
)
r.raise_for_status()
return r.json()
# ---------------------------------------------------------------------------
# AdGuard tools
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def adguard_list_rewrites() -> str:
"""
List all DNS rewrite rules in AdGuard Home (Calypso).
Returns domain → answer mappings. Useful for checking split-horizon DNS
overrides (e.g. which *.vish.gg services are pinned to specific IPs).
"""
try:
data = _adguard("GET", "/rewrite/list")
if not data:
return "No rewrite rules found."
lines = [f" {r['domain']:45s}{r['answer']}" for r in data] # type: ignore
return f"AdGuard DNS rewrites ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def adguard_add_rewrite(domain: str, answer: str) -> str:
"""
Add a DNS rewrite rule to AdGuard Home.
Use this to add split-horizon DNS overrides so internal services
resolve to LAN/Tailscale IPs instead of public ones.
Args:
domain: The domain to override (e.g. 'pt.vish.gg' or '*.example.com').
answer: The IP address to return (e.g. '192.168.0.154').
"""
try:
_adguard("POST", "/rewrite/add", json={"domain": domain, "answer": answer})
return f"Added rewrite: {domain}{answer}"
except Exception as e:
return f"Error adding rewrite: {e}"
@mcp.tool()
@_safe
def adguard_delete_rewrite(domain: str, answer: str) -> str:
"""
Delete a DNS rewrite rule from AdGuard Home.
Args:
domain: The domain of the rule to delete.
answer: The answer IP of the rule to delete.
"""
try:
_adguard("POST", "/rewrite/delete", json={"domain": domain, "answer": answer})
return f"Deleted rewrite: {domain}{answer}"
except Exception as e:
return f"Error deleting rewrite: {e}"
# ---------------------------------------------------------------------------
# NPM (Nginx Proxy Manager) tools
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def npm_list_proxy_hosts(filter_domain: Optional[str] = None) -> str:
"""
List all proxy hosts in Nginx Proxy Manager (matrix-ubuntu).
Args:
filter_domain: Optional substring to filter by domain name.
"""
try:
token = _npm_token()
hosts = _npm("GET", "/nginx/proxy-hosts", token=token)
results = []
for h in hosts: # type: ignore
domains = ", ".join(h.get("domain_names", []))
if filter_domain and filter_domain.lower() not in domains.lower():
continue
fwd = f"{h.get('forward_scheme')}://{h.get('forward_host')}:{h.get('forward_port')}"
cert = h.get("certificate_id", 0)
enabled = "" if h.get("enabled") else ""
results.append(
f" [{enabled}] ID:{h['id']:3d} {domains:45s}{fwd:35s} cert:{cert}"
)
if not results:
return "No proxy hosts found."
return f"NPM proxy hosts ({len(results)}):\n" + "\n".join(results)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def npm_list_certs() -> str:
"""
List all SSL certificates in Nginx Proxy Manager with their domains and expiry.
"""
try:
token = _npm_token()
certs = _npm("GET", "/nginx/certificates", token=token)
lines = []
for c in certs: # type: ignore
domains = ", ".join(c.get("domain_names", []))
provider = c.get("provider", "?")
expires = (c.get("expires_on") or "?")[:10]
nice = c.get("nice_name", "")
lines.append(
f" ID:{c['id']:3d} [{provider:12s}] expires:{expires} {nice or domains}"
)
return f"NPM certificates ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def npm_get_proxy_host(host_id: int) -> str:
"""
Get details of a specific NPM proxy host including advanced config and cert.
Args:
host_id: The proxy host ID (from npm_list_proxy_hosts).
"""
try:
token = _npm_token()
h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token)
lines = [
f"ID: {h['id']}", # type: ignore
f"Domains: {', '.join(h['domain_names'])}", # type: ignore
f"Forward: {h['forward_scheme']}://{h['forward_host']}:{h['forward_port']}", # type: ignore
f"SSL forced: {h.get('ssl_forced')}", # type: ignore
f"Certificate ID: {h.get('certificate_id')}", # type: ignore
f"Websocket: {h.get('allow_websocket_upgrade')}", # type: ignore
f"Enabled: {h.get('enabled')}", # type: ignore
]
adv = h.get("advanced_config", "").strip() # type: ignore
if adv:
lines.append(f"Advanced config:\n{adv}")
meta = h.get("meta", {}) # type: ignore
if isinstance(meta, dict) and meta.get("nginx_err"):
lines.append(f"nginx_err: {meta['nginx_err']}")
return "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def npm_update_cert(host_id: int, certificate_id: int) -> str:
"""
Update the SSL certificate used by an NPM proxy host.
Args:
host_id: The proxy host ID.
certificate_id: The certificate ID to assign (from npm_list_certs).
"""
try:
token = _npm_token()
h = _npm("GET", f"/nginx/proxy-hosts/{host_id}", token=token)
h_dict = h if isinstance(h, dict) else {}
payload = {
k: h_dict.get(k)
for k in [
"domain_names",
"forward_scheme",
"forward_host",
"forward_port",
"access_list_id",
"ssl_forced",
"caching_enabled",
"block_exploits",
"advanced_config",
"meta",
"allow_websocket_upgrade",
"http2_support",
"hsts_enabled",
"hsts_subdomains",
"locations",
]
}
payload["certificate_id"] = certificate_id
result = _npm("PUT", f"/nginx/proxy-hosts/{host_id}", token=token, json=payload)
return f"Updated host {host_id} ({', '.join(result.get('domain_names', []))}) to cert {certificate_id}" # type: ignore
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Headscale tools
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def headscale_list_nodes() -> str:
"""
List all nodes registered in the Headscale tailnet.
Shows node ID, hostname, Tailscale IP, online status, and expiry.
"""
try:
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
"sudo /usr/local/bin/docker exec headscale headscale nodes list --output json",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
nodes = json.loads(result.stdout)
lines = []
for n in nodes:
ips = ", ".join(n.get("ip_addresses", []))
online = "🟢" if n.get("online") else "🔴"
name = n.get("given_name") or n.get("name", "?")
lines.append(f" {online} ID:{str(n['id']):3s} {name:25s} {ips}")
return f"Headscale nodes ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def headscale_create_preauth_key(
expiration: str = "24h",
reusable: bool = False,
ephemeral: bool = False,
) -> str:
"""
Create a Headscale pre-authentication key for registering a new node.
Args:
expiration: Key expiry duration e.g. '24h', '7d'. Default: '24h'.
reusable: Allow multiple nodes to use this key. Default: False.
ephemeral: Node is removed when it goes offline. Default: False.
"""
try:
flags = f"--expiration {expiration}"
if reusable:
flags += " --reusable"
if ephemeral:
flags += " --ephemeral"
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
f"sudo /usr/local/bin/docker exec headscale headscale preauthkeys create --user 1 {flags} --output json",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
data = json.loads(result.stdout)
key = data.get("key", "?")
exp = data.get("expiration", "?")
return (
f"Pre-auth key created:\n"
f" Key: {key}\n"
f" Expires: {exp}\n"
f" Reusable: {reusable}, Ephemeral: {ephemeral}\n\n"
f"Use on new node:\n"
f" tailscale up --login-server=https://headscale.vish.gg:8443 --authkey={key} --accept-routes=false"
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def headscale_delete_node(node_id: str) -> str:
"""
Delete a node from the Headscale tailnet.
Args:
node_id: The numeric node ID (from headscale_list_nodes).
"""
try:
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
f"sudo /usr/local/bin/docker exec headscale headscale nodes delete --identifier {node_id} --output json",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
return f"Node {node_id} deleted successfully."
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def headscale_rename_node(node_id: str, new_name: str) -> str:
"""
Rename a node in the Headscale tailnet.
Args:
node_id: The numeric node ID.
new_name: The new hostname/givenName.
"""
try:
result = subprocess.run(
[
"ssh",
HEADSCALE_CALYPSO_SSH,
f"sudo /usr/local/bin/docker exec headscale headscale nodes rename --identifier {node_id} {new_name}",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"Error: {result.stderr}"
return f"Node {node_id} renamed to '{new_name}'."
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Authentik tools
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def authentik_list_applications() -> str:
"""
List all applications configured in Authentik SSO.
"""
try:
data = _authentik("GET", "/core/applications/?page_size=100")
apps = data.get("results", []) # type: ignore
lines = []
for a in apps:
slug = a.get("slug", "?")
name = a.get("name", "?")
provider = a.get("provider")
launch = a.get("meta_launch_url") or ""
lines.append(f" {slug:30s} {name:35s} provider:{provider} {launch}")
return f"Authentik applications ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_list_providers() -> str:
"""
List all OAuth2/OIDC/proxy providers in Authentik.
"""
try:
data = _authentik("GET", "/providers/all/?page_size=100")
providers = data.get("results", []) # type: ignore
lines = []
for p in providers:
pk = p.get("pk")
name = p.get("name", "?")
component = p.get("component", "?").replace("ak-provider-", "")
lines.append(f" PK:{pk:4} [{component:20s}] {name}")
return f"Authentik providers ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_list_users() -> str:
"""
List all users in Authentik.
"""
try:
data = _authentik("GET", "/core/users/?page_size=100")
users = data.get("results", []) # type: ignore
lines = []
for u in users:
pk = u.get("pk")
username = u.get("username", "?")
email = u.get("email", "?")
active = "" if u.get("is_active") else ""
lines.append(f" [{active}] PK:{pk:4} {username:20s} {email}")
return f"Authentik users ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_update_app_launch_url(slug: str, launch_url: str) -> str:
"""
Update the launch URL of an Authentik application (the link shown on the dashboard).
Args:
slug: The application slug (from authentik_list_applications).
launch_url: The new launch URL e.g. 'https://pt.vish.gg'.
"""
try:
result = _authentik(
"PATCH", f"/core/applications/{slug}/", json={"meta_launch_url": launch_url}
)
return f"Updated '{slug}' launch URL to: {result.get('meta_launch_url')}" # type: ignore
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_set_provider_cookie_domain(provider_pk: int, cookie_domain: str) -> str:
"""
Set the cookie domain on an Authentik proxy provider.
Required to prevent redirect loops with Forward Auth.
Args:
provider_pk: The provider PK (from authentik_list_providers).
cookie_domain: Cookie domain e.g. 'vish.gg'.
"""
try:
provider = _authentik("GET", f"/providers/proxy/{provider_pk}/")
provider["cookie_domain"] = cookie_domain # type: ignore
result = _authentik("PUT", f"/providers/proxy/{provider_pk}/", json=provider)
return f"Provider {provider_pk} cookie_domain set to: {result.get('cookie_domain')}" # type: ignore
except Exception as e:
return f"Error: {e}"
AUTHENTIK_OUTPOST_PK = "9dcb1d53-a023-4222-a320-d27f66f06eb9"
AUTHENTIK_DEFAULT_AUTH_FLOW = "default-provider-authorization-implicit-consent"
AUTHENTIK_DEFAULT_INVALIDATION_FLOW = "default-provider-invalidation-flow"
@mcp.tool()
@_safe
def authentik_create_proxy_provider(
name: str,
external_host: str,
mode: str = "forward_single",
cookie_domain: str = "vish.gg",
) -> str:
"""
Create a proxy provider in Authentik and bind it to the embedded outpost.
Args:
name: Provider name, e.g. 'Grafana Forward Auth'.
external_host: External URL, e.g. 'https://grafana.vish.gg'.
mode: Proxy mode — 'forward_single' (default, for NPM forward auth) or 'forward_domain'.
cookie_domain: Cookie domain to prevent redirect loops. Default 'vish.gg'.
"""
try:
# Resolve authorization + invalidation flow slugs to PKs
auth_flows = _authentik("GET", f"/flows/instances/?slug={AUTHENTIK_DEFAULT_AUTH_FLOW}")
auth_results = auth_flows.get("results", []) # type: ignore
if not auth_results:
return f"Error: authorization flow '{AUTHENTIK_DEFAULT_AUTH_FLOW}' not found"
auth_flow_pk = auth_results[0]["pk"]
inval_flows = _authentik("GET", f"/flows/instances/?slug={AUTHENTIK_DEFAULT_INVALIDATION_FLOW}")
inval_results = inval_flows.get("results", []) # type: ignore
if not inval_results:
return f"Error: invalidation flow '{AUTHENTIK_DEFAULT_INVALIDATION_FLOW}' not found"
inval_flow_pk = inval_results[0]["pk"]
provider = _authentik("POST", "/providers/proxy/", json={
"name": name,
"authorization_flow": auth_flow_pk,
"invalidation_flow": inval_flow_pk,
"external_host": external_host,
"mode": mode,
"cookie_domain": cookie_domain,
})
pk = provider.get("pk") # type: ignore
# Bind to embedded outpost
outpost = _authentik("GET", f"/outposts/instances/{AUTHENTIK_OUTPOST_PK}/")
providers = outpost.get("providers", []) # type: ignore
if pk not in providers:
providers.append(pk)
_authentik("PATCH", f"/outposts/instances/{AUTHENTIK_OUTPOST_PK}/", json={
"providers": providers,
})
return (
f"Created proxy provider '{name}' (PK:{pk})\n"
f" external_host: {external_host}\n"
f" mode: {mode}\n"
f" cookie_domain: {cookie_domain}\n"
f" Bound to embedded outpost"
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_create_application(
name: str,
slug: str,
provider_pk: int,
launch_url: str = "",
) -> str:
"""
Create an application in Authentik linked to an existing provider.
Args:
name: Display name, e.g. 'Grafana'.
slug: URL-safe slug, e.g. 'grafana'.
provider_pk: PK of the provider to attach (from authentik_create_proxy_provider or authentik_list_providers).
launch_url: Optional launch URL shown on the Authentik dashboard.
"""
try:
app_data: dict = {
"name": name,
"slug": slug,
"provider": provider_pk,
}
if launch_url:
app_data["meta_launch_url"] = launch_url
result = _authentik("POST", "/core/applications/", json=app_data)
return (
f"Created application '{name}'\n"
f" slug: {result.get('slug')}\n" # type: ignore
f" provider: PK:{provider_pk}\n"
f" launch_url: {result.get('meta_launch_url', '(none)')}" # type: ignore
)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_list_sessions() -> str:
"""
List active authenticated sessions in Authentik.
Useful for debugging login issues.
"""
try:
data = _authentik("GET", "/core/authenticated_sessions/?page_size=50")
sessions = data.get("results", []) # type: ignore
if not sessions:
return "No active sessions."
lines = []
for s in sessions:
uuid = s.get("uuid", "?")
user = s.get("user", {})
username = user.get("username", "?") if isinstance(user, dict) else f"uid:{user}"
last_ip = s.get("last_ip", "?")
last_used = (s.get("last_used") or "?")[:19]
ua = s.get("user_agent", {})
browser = ua.get("user_agent", {}).get("family", "?") if isinstance(ua, dict) else "?"
lines.append(f" {uuid[:8]} {username:15s} {last_ip:16s} {browser:10s} {last_used}")
return f"Active sessions ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_delete_session(session_uuid: str) -> str:
"""
Delete (invalidate) an authenticated session in Authentik.
Use authentik_list_sessions to find the UUID.
Args:
session_uuid: The session UUID to delete.
"""
try:
_authentik("DELETE", f"/core/authenticated_sessions/{session_uuid}/")
return f"Deleted session {session_uuid}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def authentik_get_events(action: str = "", limit: int = 20) -> str:
"""
Get recent Authentik audit log events. Useful for debugging auth failures, login issues, and policy denials.
Args:
action: Optional filter by action type, e.g. 'login', 'login_failed', 'authorize_application',
'model_created', 'model_updated', 'secret_rotate', 'policy_exception'. Leave empty for all.
limit: Number of events to return (max 50, default 20).
"""
try:
limit = min(limit, 50)
params = f"page_size={limit}&ordering=-created"
if action:
params += f"&action={action}"
data = _authentik("GET", f"/events/events/?{params}")
events = data.get("results", []) # type: ignore
if not events:
return "No events found."
lines = []
for e in events:
ts = (e.get("created") or "?")[:19]
act = e.get("action", "?")
user = e.get("user", {})
username = user.get("username", "system") if isinstance(user, dict) else "system"
app = e.get("app", "").replace("authentik.", "")
ctx = e.get("context", {})
msg = ctx.get("message", "") if isinstance(ctx, dict) else ""
detail = msg[:60] if msg else app
lines.append(f" {ts} {act:30s} {username:15s} {detail}")
total = data.get("pagination", {}).get("count", "?") # type: ignore
return f"Events (showing {len(lines)} of {total}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Cloudflare tools
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def cloudflare_list_dns_records(filter_name: Optional[str] = None) -> str:
"""
List DNS records for the vish.gg zone in Cloudflare.
Args:
filter_name: Optional substring to filter by record name.
"""
try:
data = _cloudflare(
"GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records?per_page=200"
)
records = data.get("result", [])
lines = []
for r in records:
name = r.get("name", "?")
if filter_name and filter_name.lower() not in name.lower():
continue
rtype = r.get("type", "?")
content = r.get("content", "?")
proxied = "" if r.get("proxied") else ""
lines.append(f" {proxied} {rtype:6s} {name:45s}{content}")
return f"Cloudflare DNS records ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def cloudflare_create_dns_record(
name: str,
content: str,
record_type: str = "A",
proxied: bool = True,
ttl: int = 1,
) -> str:
"""
Create a new DNS record in the vish.gg Cloudflare zone.
Args:
name: Record name e.g. 'pt' (becomes pt.vish.gg) or 'pt.vish.gg'.
content: Record value e.g. '184.23.52.14' or 'calypso.vish.gg'.
record_type: DNS type: 'A', 'CNAME', 'TXT', etc. Default: 'A'.
proxied: Route through Cloudflare proxy. Default: True.
ttl: TTL in seconds (1 = auto). Default: 1.
"""
try:
data = _cloudflare(
"POST",
f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records",
json={
"type": record_type,
"name": name,
"content": content,
"proxied": proxied,
"ttl": ttl,
},
)
r = data.get("result", {})
return f"Created: {r.get('type')} {r.get('name')}{r.get('content')} proxied:{r.get('proxied')}"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def cloudflare_delete_dns_record(record_id: str) -> str:
"""
Delete a DNS record from the vish.gg Cloudflare zone.
Args:
record_id: The record ID (from cloudflare_list_dns_records — use the ID shown).
"""
try:
_cloudflare("DELETE", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}")
return f"Deleted DNS record {record_id}."
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def cloudflare_update_dns_record(
record_id: str,
content: str,
proxied: Optional[bool] = None,
) -> str:
"""
Update an existing DNS record in Cloudflare.
Args:
record_id: The record ID to update.
content: New record value (IP or hostname).
proxied: Update proxied status. None = keep existing.
"""
try:
existing_data = _cloudflare(
"GET", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}"
)
existing = existing_data.get("result", {})
payload = {
"type": existing.get("type"),
"name": existing.get("name"),
"content": content,
"proxied": proxied if proxied is not None else existing.get("proxied"),
"ttl": existing.get("ttl", 1),
}
data = _cloudflare(
"PUT", f"/zones/{CLOUDFLARE_ZONE_ID}/dns_records/{record_id}", json=payload
)
r = data.get("result", {})
return (
f"Updated: {r.get('name')}{r.get('content')} proxied:{r.get('proxied')}"
)
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Uptime Kuma tools
# ---------------------------------------------------------------------------
def _kuma_sqlite(query: str) -> str:
"""Run a SQLite query against the Kuma DB via SSH."""
result = subprocess.run(
[
"ssh",
KUMA_HOST,
f'docker exec {KUMA_CONTAINER} sqlite3 /app/data/kuma.db "{query}"',
],
capture_output=True,
text=True,
timeout=20,
)
if result.returncode != 0:
raise RuntimeError(result.stderr)
return result.stdout.strip()
@mcp.tool()
@_safe
def kuma_list_monitors(filter_name: Optional[str] = None) -> str:
"""
List all monitors in Uptime Kuma with their status and type.
Args:
filter_name: Optional substring to filter by monitor name.
"""
try:
rows = _kuma_sqlite(
"SELECT id, name, type, active, url, hostname, port, parent FROM monitor ORDER BY parent, name;"
)
if not rows:
return "No monitors found."
lines = []
for row in rows.splitlines():
parts = row.split("|")
if len(parts) < 8:
continue
mid, name, mtype, active, url, hostname, port, parent = parts[:8]
if filter_name and filter_name.lower() not in name.lower():
continue
status = "" if active == "1" else ""
target = url or (f"{hostname}:{port}" if hostname else "")
indent = " └─ " if parent and parent != "" else ""
lines.append(
f" [{status}] ID:{mid:4s} {indent}{name:30s} [{mtype:8s}] {target}"
)
return f"Kuma monitors ({len(lines)}):\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def kuma_list_groups() -> str:
"""
List all monitor groups in Uptime Kuma (for use as parent IDs).
"""
try:
rows = _kuma_sqlite(
"SELECT id, name, parent FROM monitor WHERE type='group' ORDER BY name;"
)
if not rows:
return "No groups found."
lines = []
for row in rows.splitlines():
parts = row.split("|")
mid, name = parts[0], parts[1]
parent = parts[2] if len(parts) > 2 else ""
lines.append(f" ID:{mid:4s} {name:30s} parent:{parent or 'none'}")
return f"Kuma groups:\n" + "\n".join(lines)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
@_safe
def kuma_add_monitor(
name: str,
monitor_type: str,
url: Optional[str] = None,
hostname: Optional[str] = None,
port: Optional[int] = None,
parent_id: Optional[int] = None,
interval: int = 60,
ignore_tls: bool = False,
) -> str:
"""
Add a new monitor to Uptime Kuma.
Requires a Kuma restart to take effect (call kuma_restart after adding monitors).
Args:
name: Monitor display name.
monitor_type: 'http', 'port', 'ping', 'group'.
url: URL for http monitors e.g. 'https://pt.vish.gg/'.
hostname: Hostname/IP for port/ping monitors.
port: Port number for port monitors.
parent_id: Parent group monitor ID (from kuma_list_groups).
interval: Check interval in seconds. Default: 60.
ignore_tls: Ignore TLS cert errors. Default: False.
"""
url_val = (url or "https://").replace("'", "''")
host_val = (hostname or "").replace("'", "''")
port_val = int(port or 0)
parent_val = int(parent_id) if parent_id is not None else "NULL"
ignore_tls_val = 1 if ignore_tls else 0
safe_name = name.replace("'", "''")
safe_type = monitor_type.replace("'", "''")
query = (
f"INSERT INTO monitor "
f"(name, active, user_id, interval, url, type, weight, hostname, port, "
f"maxretries, ignore_tls, upside_down, maxredirects, accepted_statuscodes_json, parent) "
f"VALUES ('{safe_name}', 1, 1, {int(interval)}, '{url_val}', '{safe_type}', 2000, "
f"'{host_val}', {port_val}, 3, {ignore_tls_val}, 0, 10, '[\"200-299\"]', {parent_val});"
f"SELECT last_insert_rowid();"
)
result = _kuma_sqlite(query)
monitor_id = result.strip().split("\n")[-1]
return (
f"Monitor '{name}' added (ID: {monitor_id}).\n"
f"Call kuma_restart to activate."
)
@mcp.tool()
@_safe
def kuma_set_parent(monitor_id: int, parent_id: Optional[int] = None) -> str:
"""
Set or clear the parent group of a Kuma monitor.
Args:
monitor_id: Monitor ID to update.
parent_id: Parent group ID, or None to remove from group.
"""
parent_val = int(parent_id) if parent_id is not None else "NULL"
_kuma_sqlite(f"UPDATE monitor SET parent={parent_val} WHERE id={int(monitor_id)};")
return f"Monitor {monitor_id} parent set to {parent_val}. Call kuma_restart to apply."
@mcp.tool()
@_safe
def kuma_restart() -> str:
"""
Restart the Uptime Kuma container to apply DB changes.
Required after any kuma_add_monitor or kuma_set_parent operations.
"""
try:
result = subprocess.run(
["ssh", KUMA_HOST, f"docker restart {KUMA_CONTAINER}"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return f"Error restarting Kuma: {result.stderr}"
return "Uptime Kuma restarted successfully."
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------------------------
# Ollama LLM
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def ollama_query(prompt: str, model: str = "qwen3-coder:latest",
max_tokens: int = 2000, temperature: float = 0.3) -> str:
"""Query the local Ollama LLM for homelab-specific analysis.
Useful for: analyzing logs, explaining configs, generating commands,
summarizing infrastructure state, or any homelab question.
Args:
prompt: The question or analysis prompt.
model: Ollama model name (default: qwen3-coder:latest).
max_tokens: Maximum response tokens.
temperature: Sampling temperature (0.0-1.0, lower = more focused).
"""
import re as _re
with httpx.Client(timeout=120) as c:
resp = c.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {"temperature": temperature, "num_predict": max_tokens},
},
)
resp.raise_for_status()
raw = resp.json().get("response", "").strip()
return _re.sub(r"<think>.*?</think>", "", raw, flags=_re.DOTALL).strip()
# ---------------------------------------------------------------------------
# Jellyfin tools
# ---------------------------------------------------------------------------
def _jellyfin(path: str) -> dict | list:
"""Make a Jellyfin API request via kubectl exec (bypasses envoy auth sidecar)."""
sep = "&" if "?" in path else "?"
url = f"http://localhost:8096{path}{sep}api_key={JELLYFIN_TOKEN}"
result = subprocess.run(
["ssh", "olares",
f"kubectl exec -n jellyfin-vishinator deploy/jellyfin -c jellyfin -- "
f"curl -s '{url}'"],
capture_output=True,
text=True,
timeout=20,
)
if result.returncode != 0:
raise RuntimeError(f"Jellyfin API error: {result.stderr.strip()}")
import json as _json
return _json.loads(result.stdout)
@mcp.tool()
@_safe
def jellyfin_libraries() -> str:
"""List all Jellyfin media libraries with item counts."""
data = _jellyfin("/Library/VirtualFolders")
if not data:
return "No libraries found."
lines = []
for lib in data:
name = lib.get("Name", "?")
ltype = lib.get("CollectionType", "unknown")
paths = ", ".join(lib.get("Locations", []))
lines.append(f" {name} ({ltype}): {paths}")
return f"Jellyfin Libraries ({len(data)}):\n" + "\n".join(lines)
@mcp.tool()
@_safe
def jellyfin_sessions() -> str:
"""List active Jellyfin playback sessions."""
data = _jellyfin("/Sessions")
if not data:
return "No active sessions."
lines = []
for s in data:
user = s.get("UserName", "?")
client_name = s.get("Client", "?")
device = s.get("DeviceName", "?")
now_playing = s.get("NowPlayingItem")
if now_playing:
title = now_playing.get("Name", "?")
media_type = now_playing.get("Type", "?")
lines.append(f" {user} on {device} ({client_name}): {title} ({media_type})")
else:
lines.append(f" {user} on {device} ({client_name}): idle")
return f"Jellyfin Sessions ({len(lines)}):\n" + "\n".join(lines)
@mcp.tool()
@_safe
def jellyfin_system_info() -> str:
"""Get Jellyfin server system info (version, OS, transcoding)."""
data = _jellyfin("/System/Info/Public")
return (
f"Jellyfin {data.get('Version', '?')}\n"
f" OS: {data.get('OperatingSystem', '?')}\n"
f" Architecture: {data.get('SystemArchitecture', '?')}\n"
f" Server: {data.get('ServerName', '?')}\n"
f" Local Address: {data.get('LocalAddress', '?')}"
)
# ---------------------------------------------------------------------------
# Olares / Kubernetes tools
# ---------------------------------------------------------------------------
def _kubectl(cmd: str, timeout: int = 15) -> str:
"""Run kubectl against the Olares K3s cluster."""
result = subprocess.run(
["ssh", "olares", f"kubectl {cmd}"],
capture_output=True,
text=True,
timeout=timeout,
)
if result.returncode != 0:
raise RuntimeError(f"kubectl error: {result.stderr.strip()}")
return result.stdout.strip()
@mcp.tool()
@_safe
def olares_pods(namespace: Optional[str] = None) -> str:
"""List pods on the Olares K3s cluster.
Args:
namespace: Filter by namespace. If None, lists all namespaces.
"""
ns_flag = f"-n {namespace}" if namespace else "-A"
return _kubectl(f"get pods {ns_flag} -o wide")
@mcp.tool()
@_safe
def olares_gpu() -> str:
"""Show NVIDIA GPU status on Olares (nvidia-smi)."""
result = subprocess.run(
["ssh", "olares", "nvidia-smi"],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return f"nvidia-smi failed: {result.stderr.strip()}"
return result.stdout.strip()
@mcp.tool()
@_safe
def olares_pod_logs(namespace: str, pod: str, container: Optional[str] = None,
tail: int = 50) -> str:
"""Get pod logs from Olares K3s.
Args:
namespace: Kubernetes namespace.
pod: Pod name or deployment name (use deploy/NAME).
container: Container name (if pod has multiple).
tail: Number of lines from the end.
"""
container_flag = f"-c {container}" if container else ""
return _kubectl(f"logs -n {namespace} {pod} {container_flag} --tail={tail}", timeout=20)
@mcp.tool()
@_safe
def olares_restart(namespace: str, deployment: str) -> str:
"""Restart a deployment on Olares K3s.
Args:
namespace: Kubernetes namespace.
deployment: Deployment name.
"""
return _kubectl(f"rollout restart deployment/{deployment} -n {namespace}")
# ---------------------------------------------------------------------------
# Homelab services list
# ---------------------------------------------------------------------------
@mcp.tool()
@_safe
def list_homelab_services() -> str:
"""List all known homelab services with their URLs and hosts.
Returns a quick reference of service endpoints across the homelab.
"""
services = [
("Portainer", PORTAINER_URL, "atlantis (Tailscale)"),
("Gitea", GITEA_URL, "matrix-ubuntu via NPM"),
("Prometheus", PROMETHEUS_URL, "homelab-vm"),
("Grafana", GRAFANA_URL, "homelab-vm"),
("Sonarr", SONARR_URL, "atlantis"),
("Radarr", RADARR_URL, "atlantis"),
("SABnzbd", SABNZBD_URL, "atlantis"),
("AdGuard", ADGUARD_URL, "calypso"),
("NPM", NPM_URL, "matrix-ubuntu"),
("Authentik", AUTHENTIK_URL, "calypso"),
("Jellyfin", JELLYFIN_URL, "olares (NodePort 30096)"),
("Ollama", OLLAMA_URL, "olares (NodePort 31434)"),
("Kuma", f"ssh://{KUMA_HOST}", "rpi5"),
]
lines = [f" {name:12} {url:45} ({host})" for name, url, host in services]
return f"Homelab Services ({len(services)}):\n" + "\n".join(lines)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
mcp.run()