1189 lines
39 KiB
Python
1189 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Homelab MCP Server
|
|
|
|
Provides MCP tools for managing homelab infrastructure:
|
|
- Portainer: stack/container management across all endpoints
|
|
- Gitea: repo, issue, and branch management
|
|
- Prometheus: PromQL queries and target inspection
|
|
- Grafana: dashboard and alert listing
|
|
- Sonarr/Radarr: media library and download queue
|
|
- SABnzbd: download queue management
|
|
- SSH: remote command execution on homelab hosts
|
|
- Filesystem: read/write files on the local machine
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from fastmcp import FastMCP
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Portainer control plane (serves a self-signed cert; clients use verify=False).
PORTAINER_URL = "https://192.168.0.200:9443"
# NOTE(review): secrets below are hard-coded in source; consider loading them
# from environment variables or a secrets file instead.
PORTAINER_TOKEN = "REDACTED_TOKEN"
GITEA_TOKEN = "REDACTED_TOKEN"
# ntfy push-notification server base URL.
NTFY_BASE = "https://ntfy.vish.gg"
# Local checkout of the homelab GitOps repository.
REPO_PATH = Path("/home/homelab/organized/repos/homelab")

# Portainer endpoint name -> numeric endpoint ID (see _resolve_endpoint).
ENDPOINTS: dict[str, int] = {
    "atlantis": 2,
    "calypso": 443397,
    "nuc": 443398,
    "homelab": 443399,
    "rpi5": 443395,
}

# Gitea
GITEA_URL = "http://192.168.0.250:3052"
GITEA_ORG = "vish"

# Monitoring
PROMETHEUS_URL = "http://192.168.0.210:9090"
GRAFANA_URL = "http://192.168.0.210:3300"
GRAFANA_USER = "admin"
GRAFANA_PASS = "REDACTED_PASSWORD"

# Media
SONARR_URL = "http://192.168.0.200:8989"
SONARR_API_KEY = "REDACTED_API_KEY"
RADARR_URL = "http://192.168.0.200:7878"
RADARR_API_KEY = "REDACTED_API_KEY"
SABNZBD_URL = "http://192.168.0.200:8080"
SABNZBD_API_KEY = "REDACTED_API_KEY"

# SSH — hostnames must resolve via /etc/hosts or ~/.ssh/config
SSH_KNOWN_HOSTS = [
    "atlantis",
    "calypso",
    "setillo",
    "setillo-root",
    "nuc",
    "homelab-vm",
    "rpi5",
    "pi-5",
]

# Filesystem — restrict read/write to safe root paths (enforced by _fs_safe)
FS_ALLOWED_ROOTS = [
    Path("/home/homelab"),
    Path("/tmp"),
]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _portainer(method: str, path: str, **kwargs) -> dict | list:
    """Issue a request against the Portainer API and return the decoded body.

    Raises httpx.HTTPStatusError on a non-2xx response; an empty response
    body yields an empty dict.
    """
    url = f"{PORTAINER_URL}/api{path}"
    headers = {"X-API-Key": PORTAINER_TOKEN}
    # verify=False: Portainer serves a self-signed certificate on the LAN.
    with httpx.Client(verify=False, timeout=30) as client:
        resp = client.request(method, url, headers=headers, **kwargs)
        resp.raise_for_status()
        return resp.json() if resp.content else {}
|
|
|
|
|
|
def _resolve_endpoint(endpoint: str) -> int:
|
|
"""Resolve endpoint name or numeric string to an endpoint ID."""
|
|
if endpoint.isdigit():
|
|
return int(endpoint)
|
|
ep = endpoint.lower()
|
|
if ep not in ENDPOINTS:
|
|
raise ValueError(
|
|
f"Unknown endpoint '{endpoint}'. Known: {', '.join(ENDPOINTS)}"
|
|
)
|
|
return ENDPOINTS[ep]
|
|
|
|
|
|
def _gitea(method: str, path: str, **kwargs) -> dict | list:
    """Issue a request against the Gitea v1 API and return the decoded body.

    Authentication travels in the Authorization header. An empty response
    body yields an empty dict.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    headers = {"Authorization": f"token {GITEA_TOKEN}"}
    with httpx.Client(timeout=20) as client:
        resp = client.request(method, url, headers=headers, **kwargs)
        resp.raise_for_status()
        return resp.json() if resp.content else {}
|
|
|
|
|
|
def _arr(
    base_url: str, api_key: str, path: str, params: dict | None = None
) -> dict | list:
    """GET a Sonarr/Radarr v3 API endpoint and return the decoded JSON."""
    headers = {"X-Api-Key": api_key}
    query = params or {}
    with httpx.Client(timeout=20) as client:
        resp = client.get(f"{base_url}/api/v3{path}", headers=headers, params=query)
        resp.raise_for_status()
        return resp.json()
|
|
|
|
|
|
def _sabnzbd(mode: str, extra: dict | None = None) -> dict:
    """Call the SABnzbd API in JSON output mode and return the decoded body."""
    query = {"apikey": SABNZBD_API_KEY, "output": "json", "mode": mode, **(extra or {})}
    with httpx.Client(timeout=20) as client:
        resp = client.get(f"{SABNZBD_URL}/api", params=query)
        resp.raise_for_status()
        return resp.json()
|
|
|
|
|
|
def _fs_safe(path: str) -> Path:
    """Resolve *path* and verify it lies under one of FS_ALLOWED_ROOTS.

    Expands ``~`` and follows symlinks first (via resolve()), so a symlink
    pointing outside an allowed root is rejected rather than smuggled through.

    Returns:
        The resolved absolute Path.

    Raises:
        PermissionError: if the path escapes every allowed root.
    """
    p = Path(path).expanduser().resolve()
    # Path.is_relative_to (3.9+) replaces the try/except-ValueError dance
    # around relative_to() in the original.
    for root in FS_ALLOWED_ROOTS:
        if p.is_relative_to(root):
            return p
    allowed = ", ".join(str(r) for r in FS_ALLOWED_ROOTS)
    raise PermissionError(f"Path '{path}' is outside allowed roots: {allowed}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCP Server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Single FastMCP server instance. The instructions string is surfaced to the
# client as the server's usage guide, so keep it in sync with the tools
# registered below (the string itself is runtime data — do not reformat).
mcp = FastMCP(
    "Homelab",
    instructions=(
        "Tools for managing a homelab running Docker services across multiple hosts.\n\n"
        "PORTAINER — Docker orchestration across 5 endpoints:\n"
        "  Endpoints: atlantis (main NAS, media stack), calypso (secondary NAS), "
        "nuc (mini PC), homelab (VM at 192.168.0.210), rpi5 (Raspberry Pi).\n"
        "  Tools: list_endpoints, list_stacks, get_stack, redeploy_stack, "
        "list_containers, get_container_logs, restart_container, start_container, "
        "stop_container, list_stack_containers, check_portainer.\n\n"
        "GITEA — Self-hosted git at 192.168.0.250:3052, org=vish. Repo names can be "
        "'vish/homelab' or just 'homelab'.\n"
        "  Tools: gitea_list_repos, gitea_list_issues, gitea_create_issue, gitea_list_branches.\n\n"
        "PROMETHEUS — PromQL queries against homelab metrics (192.168.0.210:9090).\n"
        "  Tools: prometheus_query, prometheus_targets.\n\n"
        "GRAFANA — Dashboard and alert inspection (192.168.0.210:3300).\n"
        "  Tools: grafana_list_dashboards, grafana_list_alerts.\n\n"
        "SONARR/RADARR — Media library and download queue on Atlantis (ports 8989/7878).\n"
        "  Tools: sonarr_list_series, sonarr_queue, radarr_list_movies, radarr_queue.\n\n"
        "SABNZBD — Usenet download queue on Atlantis (port 8080).\n"
        "  Tools: sabnzbd_queue, sabnzbd_pause, sabnzbd_resume.\n\n"
        "SSH — Run commands on homelab hosts. Allowed: atlantis, calypso, setillo, "
        "setillo-root, nuc, homelab-vm, rpi5. Requires key auth in ~/.ssh/config.\n"
        "  Tool: ssh_exec(host, command).\n\n"
        "FILESYSTEM — Read/write files on the local machine. "
        "Allowed roots: /home/homelab, /tmp.\n"
        "  Tools: fs_read, fs_write, fs_list.\n\n"
        "REPO — Inspect compose files in the homelab git repo at "
        "/home/homelab/organized/repos/homelab.\n"
        "  Tools: list_homelab_services, get_compose_file.\n\n"
        "UTILITIES — check_url (HTTP health check), send_notification (ntfy push)."
    ),
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Portainer — Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def list_endpoints() -> str:
    """List all Portainer environments (servers) with their connection status."""
    endpoints = _portainer("GET", "/endpoints")
    rows = []
    for ep in endpoints:
        state = "online" if ep.get("Status") == 1 else "offline"
        # Snapshots[0] carries the most recent Docker snapshot for the host.
        running = ep.get("Snapshots", [{}])[0].get("RunningContainerCount", "?")
        rows.append(
            f"  {ep['Name']} (id={ep['Id']}) — "
            f"status={state} — "
            f"containers={running} running"
        )
    return "Endpoints:\n" + "\n".join(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Portainer — Stacks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def list_stacks(endpoint: Optional[str] = None) -> str:
    """
    List all Portainer stacks with their status.

    Args:
        endpoint: Optional filter by endpoint name (atlantis, calypso, nuc,
            homelab, rpi5) or numeric ID.
    """
    stacks = _portainer("GET", "/stacks")
    if endpoint:
        wanted = _resolve_endpoint(endpoint)
        stacks = [s for s in stacks if s.get("EndpointId") == wanted]

    # Reverse lookup: endpoint ID -> friendly name (IDs are unique).
    id_to_name = {v: k for k, v in ENDPOINTS.items()}
    rows = []
    for s in sorted(stacks, key=lambda x: x["Name"]):
        state = "active" if s.get("Status") == 1 else "inactive"
        ep_name = id_to_name.get(s.get("EndpointId"), str(s.get("EndpointId")))
        git = s.get("GitConfig", {})
        git_info = f" [git: {git.get('ConfigFilePath', '')}]" if git else ""
        rows.append(
            f"  [{s['Id']}] {s['Name']} — {state} — endpoint={ep_name}{git_info}"
        )

    return f"Stacks ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def get_stack(stack_name_or_id: str) -> str:
    """
    Get detailed info about a specific stack.

    Args:
        stack_name_or_id: Stack name (partial match) or numeric ID.
    """
    stacks = _portainer("GET", "/stacks")

    if stack_name_or_id.isdigit():
        wanted_id = int(stack_name_or_id)
        matches = [s for s in stacks if s["Id"] == wanted_id]
    else:
        needle = stack_name_or_id.lower()
        matches = [s for s in stacks if needle in s["Name"].lower()]

    if not matches:
        return f"No stack found matching '{stack_name_or_id}'."
    if len(matches) > 1:
        names = ", ".join(s["Name"] for s in matches)
        return f"Multiple matches: {names}. Be more specific."

    stack = matches[0]
    git = stack.get("GitConfig") or {}
    env = stack.get("Env") or []
    ep_id = stack.get("EndpointId")
    ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id))

    report = [
        f"Stack: {stack['Name']} (id={stack['Id']})",
        f"  Status: {'active' if stack.get('Status') == 1 else 'inactive'}",
        f"  Endpoint: {ep_name} (id={ep_id})",
        f"  Created: {stack.get('CreationDate', 'unknown')}",
        f"  Updated: {stack.get('UpdateDate', 'unknown')}",
    ]
    if git:
        report.append(f"  Git URL: {git.get('URL', '')}")
        branch = git.get("ReferenceName", "").replace("refs/heads/", "")
        report.append(f"  Git Branch: {branch}")
        report.append(f"  Git File: {git.get('ConfigFilePath', '')}")
    if env:
        report.append(f"  Env vars: {len(env)} set")
    return "\n".join(report)
|
|
|
|
|
|
@mcp.tool()
def redeploy_stack(stack_name_or_id: str) -> str:
    """
    Trigger a GitOps redeploy of a stack (pull latest from Git and redeploy).

    Args:
        stack_name_or_id: Stack name (partial match) or numeric ID.
    """
    stacks = _portainer("GET", "/stacks")

    if stack_name_or_id.isdigit():
        wanted_id = int(stack_name_or_id)
        matches = [s for s in stacks if s["Id"] == wanted_id]
    else:
        needle = stack_name_or_id.lower()
        matches = [s for s in stacks if needle in s["Name"].lower()]

    if not matches:
        return f"No stack found matching '{stack_name_or_id}'."
    if len(matches) > 1:
        names = ", ".join(s["Name"] for s in matches)
        return f"Multiple matches: {names}. Be more specific."

    stack = matches[0]
    # Only stacks backed by a git repo can be redeployed via the git endpoint.
    if not stack.get("GitConfig"):
        return f"Stack '{stack['Name']}' is not a GitOps stack — cannot redeploy via git."

    ep_id = stack["EndpointId"]
    stack_id = stack["Id"]
    payload = {
        "pullImage": True,
        "prune": False,
        "repositoryAuthentication": True,
        "repositoryUsername": "vish",
        "repositoryPassword": GITEA_TOKEN,
    }
    _portainer(
        "PUT",
        f"/stacks/{stack_id}/git/redeploy?endpointId={ep_id}",
        json=payload,
    )
    return f"Redeploy triggered for stack '{stack['Name']}' (id={stack_id}) on endpoint {ep_id}."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Portainer — Containers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def list_containers(
    endpoint: str = "atlantis",
    all_containers: bool = False,
    filter_name: Optional[str] = None,
) -> str:
    """
    List containers on a Portainer endpoint.

    Args:
        endpoint: Endpoint name (atlantis, calypso, nuc, homelab, rpi5) or ID.
        all_containers: If True, include stopped containers. Default False (running only).
        filter_name: Optional substring to filter container names.
    """
    ep_id = _resolve_endpoint(endpoint)
    suffix = "?all=true" if all_containers else ""
    containers = _portainer("GET", f"/endpoints/{ep_id}/docker/containers/json{suffix}")

    if filter_name:
        needle = filter_name.lower()
        containers = [
            c
            for c in containers
            if any(needle in n.lower() for n in c.get("Names", []))
        ]

    rows = []
    for c in sorted(containers, key=lambda x: x.get("Names", [""])[0]):
        name = c.get("Names", ["?"])[0].lstrip("/")
        # Strip registry prefix and tag for a compact image name.
        image = c.get("Image", "?").split(":")[0].split("/")[-1]
        state = c.get("State", "?")
        short_id = c.get("Id", "")[:12]
        rows.append(f"  {short_id} {state:10s} {name:40s} {image}")

    header = f"Containers on {endpoint} ({len(rows)}):\n  {'ID':12s} {'State':10s} {'Name':40s} Image"
    return header + "\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def get_container_logs(
    container_id_or_name: str,
    endpoint: str = "atlantis",
    tail: int = 100,
) -> str:
    """
    Get recent logs from a container.

    Args:
        container_id_or_name: Container ID (short or full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
        tail: Number of log lines to return. Default: 100.
    """
    ep_id = _resolve_endpoint(endpoint)

    # Resolve by name if not a hex ID
    # (any non-hex character means it's a name substring; note an all-hex
    # name like "beef" would be treated as an ID — acceptable trade-off here).
    if not all(c in "0123456789abcdefABCDEF" for c in container_id_or_name):
        containers = _portainer(
            "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
        )
        term = container_id_or_name.lower()
        matches = [
            c for c in containers if any(term in n.lower() for n in c.get("Names", []))
        ]
        if not matches:
            return (
                f"No container found matching '{container_id_or_name}' on {endpoint}."
            )
        if len(matches) > 1:
            names = ", ".join(c["Names"][0].lstrip("/") for c in matches)
            return f"Multiple matches: {names}. Be more specific."
        # Re-use the parameter variable to carry the resolved short ID.
        container_id_or_name = matches[0]["Id"][:12]

    with httpx.Client(verify=False, timeout=30) as client:
        r = client.get(
            f"{PORTAINER_URL}/api/endpoints/{ep_id}/docker/containers/{container_id_or_name}/logs",
            headers={"X-API-Key": PORTAINER_TOKEN},
            params={"stdout": 1, "stderr": 1, "tail": tail, "timestamps": 0},
        )
        r.raise_for_status()
    # Docker log stream has 8-byte header per line; strip it
    # Frame layout: byte 0 = stream type, bytes 1-3 padding, bytes 4-7 =
    # big-endian payload length — hence int.from_bytes(raw[i+4:i+8], "big").
    # NOTE(review): this assumes the non-TTY multiplexed format; a container
    # started with a TTY returns raw text, which is caught by the fallback
    # below when no frames decode to non-empty lines.
    raw = r.content
    lines = []
    i = 0
    while i < len(raw):
        # A trailing fragment shorter than a full header is ignored.
        if i + 8 > len(raw):
            break
        size = int.from_bytes(raw[i + 4 : i + 8], "big")
        line = raw[i + 8 : i + 8 + size].decode("utf-8", errors="replace").rstrip()
        if line:
            lines.append(line)
        i += 8 + size
    if not lines:
        # fallback: treat as plain text
        lines = r.text.splitlines()

    # The server already applied `tail`, but re-slice defensively.
    return "\n".join(lines[-tail:])
|
|
|
|
|
|
@mcp.tool()
def restart_container(
    container_id_or_name: str,
    endpoint: str = "atlantis",
) -> str:
    """
    Restart a container.

    Args:
        container_id_or_name: Container ID (short/full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
    """
    env_id = _resolve_endpoint(endpoint)
    short_id = _resolve_container_id(container_id_or_name, env_id)
    _portainer("POST", f"/endpoints/{env_id}/docker/containers/{short_id}/restart")
    return f"Restarted container {short_id} on {endpoint}."
|
|
|
|
|
|
@mcp.tool()
def stop_container(
    container_id_or_name: str,
    endpoint: str = "atlantis",
) -> str:
    """
    Stop a running container.

    Args:
        container_id_or_name: Container ID (short/full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
    """
    env_id = _resolve_endpoint(endpoint)
    short_id = _resolve_container_id(container_id_or_name, env_id)
    _portainer("POST", f"/endpoints/{env_id}/docker/containers/{short_id}/stop")
    return f"Stopped container {short_id} on {endpoint}."
|
|
|
|
|
|
@mcp.tool()
def start_container(
    container_id_or_name: str,
    endpoint: str = "atlantis",
) -> str:
    """
    Start a stopped container.

    Args:
        container_id_or_name: Container ID (short/full) or name substring.
        endpoint: Endpoint name or ID. Default: atlantis.
    """
    env_id = _resolve_endpoint(endpoint)
    short_id = _resolve_container_id(container_id_or_name, env_id)
    _portainer("POST", f"/endpoints/{env_id}/docker/containers/{short_id}/start")
    return f"Started container {short_id} on {endpoint}."
|
|
|
|
|
|
def _resolve_container_id(name_or_id: str, ep_id: int) -> str:
|
|
"""Resolve a container name substring to a short container ID."""
|
|
if len(name_or_id) >= 12 and all(c in "0123456789abcdefABCDEF" for c in name_or_id):
|
|
return name_or_id[:12]
|
|
containers = _portainer(
|
|
"GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
|
|
)
|
|
term = name_or_id.lower()
|
|
matches = [
|
|
c for c in containers if any(term in n.lower() for n in c.get("Names", []))
|
|
]
|
|
if not matches:
|
|
raise ValueError(f"No container found matching '{name_or_id}'.")
|
|
if len(matches) > 1:
|
|
names = ", ".join(c["Names"][0].lstrip("/") for c in matches)
|
|
raise ValueError(
|
|
f"Multiple containers match '{name_or_id}': {names}. Be more specific."
|
|
)
|
|
return matches[0]["Id"][:12]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Portainer — Stack containers (convenience)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def list_stack_containers(stack_name_or_id: str) -> str:
    """
    List all containers belonging to a specific stack.

    Args:
        stack_name_or_id: Stack name (partial match) or numeric ID.
    """
    stacks = _portainer("GET", "/stacks")

    if stack_name_or_id.isdigit():
        wanted_id = int(stack_name_or_id)
        matches = [s for s in stacks if s["Id"] == wanted_id]
    else:
        needle = stack_name_or_id.lower()
        matches = [s for s in stacks if needle in s["Name"].lower()]

    if not matches:
        return f"No stack found matching '{stack_name_or_id}'."
    if len(matches) > 1:
        names = ", ".join(s["Name"] for s in matches)
        return f"Multiple matches: {names}. Be more specific."

    stack = matches[0]
    ep_id = stack["EndpointId"]
    stack_name = stack["Name"]
    lowered = stack_name.lower()

    containers = _portainer(
        "GET", f"/endpoints/{ep_id}/docker/containers/json?all=true"
    )
    # Filter by compose project label (Portainer uses com.docker.compose.project),
    # with a name-substring fallback for containers outside compose.
    members = [
        c
        for c in containers
        if c.get("Labels", {}).get("com.docker.compose.project", "").lower() == lowered
        or any(lowered in n.lower() for n in c.get("Names", []))
    ]

    ep_name = next((k for k, v in ENDPOINTS.items() if v == ep_id), str(ep_id))
    rows = []
    for c in sorted(members, key=lambda x: x.get("Names", [""])[0]):
        name = c.get("Names", ["?"])[0].lstrip("/")
        state = c.get("State", "?")
        short_id = c.get("Id", "")[:12]
        image = c.get("Image", "?").split(":")[0].split("/")[-1]
        rows.append(f"  {short_id} {state:10s} {name:40s} {image}")

    header = f"Containers in stack '{stack_name}' on {ep_name} ({len(rows)}):\n  {'ID':12s} {'State':10s} {'Name':40s} Image"
    return header + "\n" + "\n".join(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Health checks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def check_url(url: str, expected_status: int = 200) -> str:
    """
    Perform an HTTP health check on a URL.

    Args:
        url: The URL to check (e.g. http://192.168.0.200:9443/api/status).
        expected_status: Expected HTTP status code. Default: 200.
    """
    try:
        # verify=False: many homelab services use self-signed certificates.
        with httpx.Client(verify=False, timeout=10, follow_redirects=True) as client:
            resp = client.get(url)
        verdict = "OK" if resp.status_code == expected_status else "FAIL"
        latency_ms = resp.elapsed.total_seconds() * 1000
        return (
            f"{verdict} {url}\n"
            f"  Status: {resp.status_code} (expected {expected_status})\n"
            f"  Latency: {latency_ms:.0f}ms"
        )
    except Exception as e:
        # Any transport/protocol failure is reported, never raised to the caller.
        return f"ERROR {url}\n  {type(e).__name__}: {e}"
|
|
|
|
|
|
@mcp.tool()
def check_portainer() -> str:
    """Quick health check of the Portainer API and summary of infrastructure."""
    try:
        status = _portainer("GET", "/status")
        stacks = _portainer("GET", "/stacks")
        active_count = sum(1 for s in stacks if s.get("Status") == 1)
        return (
            f"Portainer OK — version {status.get('Version', '?')}\n"
            f"  Stacks: {len(stacks)} total, {active_count} active"
        )
    except Exception as e:
        # Broad catch is deliberate: this tool reports health, it never raises.
        return f"Portainer UNREACHABLE: {e}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Repo — service inspection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def list_homelab_services(host_filter: Optional[str] = None) -> str:
    """
    List all services/stacks defined in the homelab repository.

    Args:
        host_filter: Optional substring to filter by host/path (e.g. 'atlantis',
            'calypso', 'seattle').

    Returns:
        A newline-separated listing of compose file paths relative to REPO_PATH,
        excluding anything under an 'archive' directory.
    """
    # Single glob covers both .yml and .yaml (same pattern get_compose_file uses).
    # BUG FIX: the original filtered with `"archive" not in str(f).parts`, which
    # raises AttributeError (str has no .parts) — check the Path's parts instead.
    compose_files = [
        f
        for f in REPO_PATH.rglob("docker-compose.y*ml")
        if "archive" not in f.parts
    ]

    rows = []
    for f in sorted(compose_files):
        rel = f.relative_to(REPO_PATH)
        if host_filter and host_filter.lower() not in str(rel).lower():
            continue
        rows.append(f"  {rel}")

    return f"Compose files ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def get_compose_file(service_path: str) -> str:
    """
    Read a compose file from the homelab repo.

    Args:
        service_path: Relative path within the repo, e.g.
            'hosts/synology/atlantis/arr-suite/docker-compose.yml'
            or a partial name like 'atlantis/arr-suite'.
    """
    # An exact relative path wins immediately.
    exact = REPO_PATH / service_path
    if exact.is_file():
        return exact.read_text()

    # Otherwise fuzzy-match compose files whose path contains the fragment,
    # skipping anything under the archive.
    needle = service_path.lower().replace("\\", "/")
    hits = []
    for f in REPO_PATH.rglob("docker-compose.y*ml"):
        if "archive" in str(f).lower():
            continue
        if needle in str(f.relative_to(REPO_PATH)).lower():
            hits.append(f)

    if not hits:
        return f"No compose file found matching '{service_path}'."
    if len(hits) > 1:
        listing = "\n".join(f"  {f.relative_to(REPO_PATH)}" for f in hits)
        return f"Multiple matches:\n{listing}\nBe more specific."

    return hits[0].read_text()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notifications
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def send_notification(
    message: str,
    title: str = "Homelab",
    topic: str = "homelab",
    priority: str = "default",
    tags: Optional[str] = None,
) -> str:
    """
    Send a push notification via ntfy.

    Args:
        message: The notification body.
        title: Notification title. Default: 'Homelab'.
        topic: ntfy topic to publish to. Default: 'homelab'.
        priority: urgent, high, default, low, or min. Default: 'default'.
        tags: Comma-separated emoji tags e.g. 'warning,robot'. Optional.
    """
    # ntfy takes metadata as HTTP headers and the message as the POST body.
    headers = {"Title": title, "Priority": priority}
    if tags:
        headers["Tags"] = tags

    with httpx.Client(timeout=10) as client:
        resp = client.post(
            f"{NTFY_BASE}/{topic}",
            content=message.encode(),
            headers=headers,
        )
        resp.raise_for_status()

    return f"Notification sent to {NTFY_BASE}/{topic} — '{title}: {message}'"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gitea
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def gitea_list_repos(owner: Optional[str] = None, limit: int = 50) -> str:
    """
    List Gitea repositories.

    Args:
        owner: User or org name. Defaults to the service account's accessible repos.
        limit: Max repos to return. Default: 50.
    """
    params: dict = {"limit": limit}
    if owner:
        params["owner"] = owner
    # Authentication travels in the Authorization header set by _gitea().
    # The original also passed the token as a `token` query parameter in the
    # no-owner branch — redundant, and it leaks the secret into URLs and
    # access logs, so it is dropped here.
    data = _gitea("GET", "/repos/search", params=params)
    # /repos/search wraps results in {"ok": ..., "data": [...]}.
    repos = data.get("data", []) if isinstance(data, dict) else data

    rows = []
    for r in repos:
        archived = " [archived]" if r.get("archived") else ""
        private = " [private]" if r.get("private") else ""
        rows.append(
            f"  {r['full_name']}{private}{archived} — "
            f"⭐{r.get('stars_count', 0)} "
            f"updated: {r.get('updated', '')[:10]}"
        )
    return f"Repos ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def gitea_list_issues(
    repo: str,
    state: str = "open",
    limit: int = 20,
) -> str:
    """
    List issues for a Gitea repository.

    Args:
        repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
        state: 'open', 'closed', or 'all'. Default: 'open'.
        limit: Max issues to return. Default: 20.
    """
    # Bare repo names default to the configured org.
    if "/" not in repo:
        repo = f"{GITEA_ORG}/{repo}"

    issues = _gitea(
        "GET",
        f"/repos/{repo}/issues",
        params={"state": state, "type": "issues", "limit": limit},
    )
    if not issues:
        return f"No {state} issues in {repo}."

    rows = []
    for issue in issues:
        labels = ", ".join(lbl["name"] for lbl in issue.get("labels", []))
        label_str = f" [{labels}]" if labels else ""
        rows.append(
            f"  #{issue['number']} {issue['title']}{label_str} — @{issue['user']['login']}"
        )
    return f"{state.capitalize()} issues in {repo} ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def gitea_create_issue(repo: str, title: str, body: str = "") -> str:
    """
    Create a new issue in a Gitea repository.

    Args:
        repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
        title: Issue title.
        body: Issue body/description. Optional.
    """
    # Bare repo names default to the configured org.
    if "/" not in repo:
        repo = f"{GITEA_ORG}/{repo}"
    payload = {"title": title, "body": body}
    created = _gitea("POST", f"/repos/{repo}/issues", json=payload)
    return (
        f"Created issue #{created['number']}: {created['title']}\n  URL: {created['html_url']}"
    )
|
|
|
|
|
|
@mcp.tool()
def gitea_list_branches(repo: str) -> str:
    """
    List branches for a Gitea repository.

    Args:
        repo: Full repo name e.g. 'vish/homelab' or just 'homelab' (assumes GITEA_ORG).
    """
    # Bare repo names default to the configured org.
    if "/" not in repo:
        repo = f"{GITEA_ORG}/{repo}"
    branches = _gitea("GET", f"/repos/{repo}/branches")
    rows = []
    for b in branches:
        marker = " [default]" if b.get("is_default") else ""
        rows.append(f"  {b['name']}" + marker)
    return f"Branches in {repo} ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Prometheus
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def prometheus_query(query: str) -> str:
    """
    Run an instant PromQL query.

    Args:
        query: PromQL expression e.g. 'up', 'node_memory_MemAvailable_bytes{job="node"}'.
    """
    with httpx.Client(timeout=20) as client:
        resp = client.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})
        resp.raise_for_status()
        payload = resp.json()

    if payload.get("status") != "success":
        return f"Query failed: {payload.get('error', 'unknown error')}"

    results = payload["data"]["result"]
    if not results:
        return f"No results for: {query}"

    rows = []
    for item in results[:50]:  # cap output
        metric = item["metric"]
        value = item["value"][1] if item.get("value") else "?"
        # __name__ goes in front; everything else becomes the label set.
        labels = ", ".join(
            f'{k}="{v}"' for k, v in metric.items() if k != "__name__"
        )
        name = metric.get("__name__", query)
        rows.append(f"  {name}{{{labels}}} = {value}")
    return f"Results ({len(results)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def prometheus_targets() -> str:
    """List all Prometheus scrape targets and their health status."""
    with httpx.Client(timeout=20) as client:
        r = client.get(f"{PROMETHEUS_URL}/api/v1/targets")
        r.raise_for_status()
        data = r.json()

    active = data["data"].get("activeTargets", [])
    rows = []
    # Sort by job so related targets group together in the report.
    for t in sorted(active, key=lambda x: x.get("labels", {}).get("job", "")):
        labels = t.get("labels", {})
        job = labels.get("job", "?")
        instance = labels.get("instance", "?")
        health = t.get("health", "?")
        # Removed an unused `lastScrapeDuration` local from the original.
        rows.append(
            f"  {'✓' if health == 'up' else '✗'} {job:30s} {instance:40s} {health}"
        )
    return f"Prometheus targets ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Grafana
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def grafana_list_dashboards() -> str:
    """List all Grafana dashboards."""
    with httpx.Client(timeout=20) as client:
        resp = client.get(
            f"{GRAFANA_URL}/api/search",
            params={"type": "dash-db"},
            auth=(GRAFANA_USER, GRAFANA_PASS),
        )
        resp.raise_for_status()
        dashboards = resp.json()

    rows = [
        f"  [{d['uid']:20s}] {d['title']:50s} folder={d.get('folderTitle', 'General')}"
        for d in dashboards
    ]
    return f"Dashboards ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def grafana_list_alerts() -> str:
    """List Grafana alert rules and their current state."""
    with httpx.Client(timeout=20) as client:
        resp = client.get(
            f"{GRAFANA_URL}/api/v1/provisioning/alert-rules",
            auth=(GRAFANA_USER, GRAFANA_PASS),
        )
        resp.raise_for_status()
        rules = resp.json()

    if not rules:
        return "No alert rules configured."
    rows = [
        f"  {rule.get('title', '?'):50s} uid={rule.get('uid', '?')}"
        for rule in rules
    ]
    return f"Alert rules ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sonarr
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def sonarr_list_series(filter_name: Optional[str] = None) -> str:
    """
    List all series in Sonarr.

    Args:
        filter_name: Optional substring to filter by series title.
    """
    series = _arr(SONARR_URL, SONARR_API_KEY, "/series")
    if filter_name:
        needle = filter_name.lower()
        series = [s for s in series if needle in s.get("title", "").lower()]

    rows = []
    for s in sorted(series, key=lambda x: x.get("sortTitle", "")):
        mark = "✓" if s.get("monitored") else "✗"
        status = s.get("status", "?")
        ep_count = s.get("episodeCount", 0)
        rows.append(f"  {mark} {s['title']:50s} {status:12s} {ep_count} eps")
    return f"Series ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def sonarr_queue() -> str:
    """Show the Sonarr download queue."""
    data = _arr(SONARR_URL, SONARR_API_KEY, "/queue")
    # v3 wraps queue entries in {"records": [...]}.
    records = data.get("records", []) if isinstance(data, dict) else data
    if not records:
        return "Sonarr queue is empty."

    rows = []
    for item in records:
        size = item.get("size", 0)
        left = item.get("sizeleft", 0)
        # Guard against zero-size entries to avoid division by zero.
        pct = int((1 - left / size) * 100) if size else 0
        title = item.get("title", "?")[:60]
        status = item.get("status", "?")
        rows.append(f"  {status:12s} {pct:3d}% {title}")
    return f"Sonarr queue ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Radarr
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def radarr_list_movies(filter_name: Optional[str] = None) -> str:
    """
    List all movies in Radarr.

    Args:
        filter_name: Optional substring to filter by movie title.
    """
    data = _arr(RADARR_URL, RADARR_API_KEY, "/movie")
    if filter_name:
        term = filter_name.lower()
        data = [m for m in data if term in m.get("title", "").lower()]

    rows = []
    for m in sorted(data, key=lambda x: x.get("sortTitle", "")):
        monitored = "✓" if m.get("monitored") else "✗"
        downloaded = "↓" if m.get("hasFile") else " "
        year = m.get("year", "?")
        # .get() rather than m["title"]: don't let one malformed API record
        # raise KeyError and kill the whole listing.
        rows.append(f" {monitored}{downloaded} {m.get('title', '?'):50s} {year}")
    if not rows:
        # Distinguish "filter matched nothing" from "library is empty".
        return "No movies matched the filter." if filter_name else "No movies in Radarr."
    return f"Movies ({len(rows)}):\n" + "\n".join(rows)
|
|
|
|
|
|
@mcp.tool()
def radarr_queue() -> str:
    """Show the Radarr download queue."""
    payload = _arr(RADARR_URL, RADARR_API_KEY, "/queue")
    # Radarr v3+ wraps the queue in a paging envelope ({"records": [...]});
    # fall back to treating the payload itself as the list otherwise.
    items = payload.get("records", []) if isinstance(payload, dict) else payload
    if not items:
        return "Radarr queue is empty."

    lines = []
    for entry in items:
        name = entry.get("title", "?")[:60]
        state = entry.get("status", "?")
        total = entry.get("size", 0)
        remaining = entry.get("sizeleft", 0)
        # Guard against size == 0 before computing the completion percentage.
        done = int((1 - remaining / total) * 100) if total else 0
        lines.append(f" {state:12s} {done:3d}% {name}")
    return f"Radarr queue ({len(lines)}):\n" + "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SABnzbd
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def sabnzbd_queue() -> str:
    """Show the SABnzbd download queue."""
    queue = _sabnzbd("queue").get("queue", {})
    slots = queue.get("slots", [])
    if not slots:
        return f"SABnzbd queue empty. Status: {queue.get('status', '?')}"

    lines = []
    for slot in slots:
        fname = slot.get("filename", "?")[:60]
        state = slot.get("status", "?")
        # SABnzbd reports percentage/sizeleft as strings, hence the :>4s/:>8s.
        percent = slot.get("percentage", "0")
        remaining = slot.get("sizeleft", "?")
        lines.append(f" {state:12s} {percent:>4s}% {remaining:>8s} left {fname}")
    speed = queue.get("speed", "0")
    eta = queue.get("timeleft", "?")
    return f"SABnzbd queue ({len(lines)}) — {speed} — ETA {eta}:\n" + "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def sabnzbd_pause() -> str:
    """Pause the SABnzbd download queue."""
    # Fire-and-forget: the API response body is not needed for a pause.
    _sabnzbd("pause")
    return "SABnzbd queue paused."
|
|
|
|
|
|
@mcp.tool()
def sabnzbd_resume() -> str:
    """Resume the SABnzbd download queue."""
    # Fire-and-forget: the API response body is not needed for a resume.
    _sabnzbd("resume")
    return "SABnzbd queue resumed."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSH
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def ssh_exec(host: str, command: str, timeout: int = 30) -> str:
    """
    Run a command on a homelab host via SSH.

    Known hosts: atlantis, calypso, setillo, setillo-root, nuc, homelab-vm, rpi5.
    Requires the host to be in ~/.ssh/config or /etc/hosts.

    Args:
        host: SSH host alias (e.g. 'atlantis', 'calypso', 'setillo-root').
        command: Shell command to execute remotely.
        timeout: Seconds before the command times out. Default: 30.
    """
    # Allow-list gate: only hosts explicitly configured for this server.
    if host not in SSH_KNOWN_HOSTS:
        return (
            f"Host '{host}' not in allowed list.\n"
            f"Known hosts: {', '.join(SSH_KNOWN_HOSTS)}"
        )
    # BatchMode prevents interactive password prompts from hanging the tool.
    argv = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", host, command]
    try:
        proc = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Command timed out after {timeout}s."
    except Exception as e:
        return f"SSH error: {type(e).__name__}: {e}"
    combined = proc.stdout
    if proc.stderr:
        combined += f"\n[stderr]\n{proc.stderr}"
    if proc.returncode != 0:
        combined += f"\n[exit code {proc.returncode}]"
    return combined or "(no output)"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filesystem
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def fs_read(path: str) -> str:
    """
    Read a file from the local filesystem.

    Allowed roots: /home/homelab, /tmp.

    Args:
        path: Absolute or ~-relative path to the file.
    """
    try:
        p = _fs_safe(path)
        if not p.exists():
            return f"File not found: {p}"
        if p.is_dir():
            return f"'{p}' is a directory. Use fs_list to list it."
        size = p.stat().st_size
        # Hard cap so a giant log can't blow up the MCP response.
        if size > 1_000_000:
            return f"File too large ({size:,} bytes). Read it in parts or use grep."
        # Pin UTF-8 explicitly: without encoding=, read_text() uses the
        # platform/locale default, which varies by host. errors="replace"
        # keeps binary-ish content from raising UnicodeDecodeError.
        return p.read_text(encoding="utf-8", errors="replace")
    except PermissionError as e:
        return f"Permission denied: {e}"
    except Exception as e:
        return f"Error reading file: {e}"
|
|
|
|
|
|
@mcp.tool()
def fs_write(path: str, content: str) -> str:
    """
    Write content to a file on the local filesystem.

    Allowed roots: /home/homelab, /tmp.

    Args:
        path: Absolute or ~-relative path to the file.
        content: Text content to write.
    """
    try:
        p = _fs_safe(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        # Pin UTF-8 explicitly: without encoding=, write_text() uses the
        # platform/locale default, which varies by host.
        data = content.encode("utf-8")
        p.write_bytes(data)
        # len(data), not len(content): the message says "bytes", and character
        # count diverges from byte count for any non-ASCII content.
        return f"Written {len(data)} bytes to {p}"
    except PermissionError as e:
        return f"Permission denied: {e}"
    except Exception as e:
        return f"Error writing file: {e}"
|
|
|
|
|
|
@mcp.tool()
def fs_list(path: str = "/home/homelab") -> str:
    """
    List the contents of a directory on the local filesystem.

    Allowed roots: /home/homelab, /tmp.

    Args:
        path: Directory path. Default: /home/homelab.
    """
    try:
        target = _fs_safe(path)
        if not target.exists():
            return f"Path not found: {target}"
        if not target.is_dir():
            return f"'{target}' is a file, not a directory."
        # Directories first (is_file() sorts False before True), then by name.
        children = sorted(target.iterdir(), key=lambda c: (c.is_file(), c.name))
        lines = []
        for child in children:
            kind = "DIR " if child.is_dir() else "FILE"
            # Only stat() regular files; directories get a blank size column.
            size = child.stat().st_size if child.is_file() else ""
            size_col = f"{size:>10,}" if size != "" else f"{'':>10}"
            lines.append(f" {kind} {size_col} {child.name}")
        return f"Contents of {target} ({len(lines)} entries):\n" + "\n".join(lines)
    except PermissionError as e:
        return f"Permission denied: {e}"
    except Exception as e:
        return f"Error listing directory: {e}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Suppress per-request warnings for unverified HTTPS — Portainer is
    # reached at https://192.168.0.200:9443, presumably with a self-signed
    # cert and verify disabled elsewhere in this file (TODO confirm).
    import urllib3

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    # Start the FastMCP server, exposing every @mcp.tool() defined above.
    mcp.run()
|