#!/usr/bin/env python3
"""Auto-restart unhealthy containers with LLM safety analysis.

Monitors all Portainer endpoints for unhealthy containers. If a container
has been unhealthy for >5 minutes and passes safety checks (blacklist,
rate limit, LLM analysis), it gets restarted automatically.

SQLite DB tracks unhealthy state durations and restart counts.

Cron: */5 * * * * /usr/bin/python3 /home/homelab/organized/repos/homelab/scripts/stack-restart.py
"""

import argparse
import logging
import sqlite3
import sys
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

sys.path.insert(0, str(Path(__file__).parent))

from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email, send_ntfy
from lib.portainer import list_containers, get_container_logs, restart_container, ENDPOINTS

# ── config ───────────────────────────────────────────────────────────────────

DB_PATH = Path(__file__).parent / "stack-restart.db"
TIMEZONE = ZoneInfo("America/Los_Angeles")
NTFY_TOPIC = "REDACTED_NTFY_TOPIC"
UNHEALTHY_GRACE_SECONDS = 300  # 5 minutes
MAX_RESTARTS_PER_HOUR = 2
LOG_TAIL_LINES = 50

# Containers never auto-restarted (substring match, case-insensitive):
# infrastructure and stateful databases where a blind restart risks damage.
RESTART_BLACKLIST = [
    "portainer",
    "headscale",
    "adguard",
    "postgres",
    "mariadb",
    "mongodb",
    "redis",
]

log = logging.getLogger("stack-restart")


# ── database ─────────────────────────────────────────────────────────────────

def init_db(db_path: Path) -> sqlite3.Connection:
    """Initialize SQLite database and return connection.

    Rows are keyed by (container_id, endpoint) so the same container ID on
    different endpoints is tracked independently.
    """
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    conn.execute("""
        CREATE TABLE IF NOT EXISTS unhealthy_tracking (
            container_id TEXT,
            endpoint TEXT,
            first_seen TEXT,
            last_checked TEXT,
            restart_count INTEGER DEFAULT 0,
            last_restart TEXT,
            PRIMARY KEY (container_id, endpoint)
        )
    """)
    conn.commit()
    return conn


def get_tracking(conn: sqlite3.Connection, container_id: str, endpoint: str) -> dict | None:
    """Return the tracking record for a container as a dict, or None."""
    row = conn.execute(
        "SELECT * FROM unhealthy_tracking WHERE container_id = ? AND endpoint = ?",
        (container_id, endpoint),
    ).fetchone()
    return dict(row) if row else None


def upsert_tracking(conn: sqlite3.Connection, container_id: str, endpoint: str,
                    first_seen: str, last_checked: str,
                    restart_count: int = 0, last_restart: str | None = None):
    """Insert or update a tracking record.

    On conflict, first_seen is preserved (the grace-period anchor) while
    last_checked/restart_count are overwritten; last_restart only changes
    when a non-NULL value is supplied (COALESCE keeps the old timestamp).
    """
    conn.execute("""
        INSERT INTO unhealthy_tracking
            (container_id, endpoint, first_seen, last_checked, restart_count, last_restart)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(container_id, endpoint) DO UPDATE SET
            last_checked = ?,
            restart_count = ?,
            last_restart = COALESCE(?, last_restart)
    """, (container_id, endpoint, first_seen, last_checked, restart_count, last_restart,
          last_checked, restart_count, last_restart))
    conn.commit()


def remove_tracking(conn: sqlite3.Connection, container_id: str, endpoint: str):
    """Remove tracking for a container that is now healthy."""
    conn.execute(
        "DELETE FROM unhealthy_tracking WHERE container_id = ? AND endpoint = ?",
        (container_id, endpoint),
    )
    conn.commit()


def cleanup_stale(conn: sqlite3.Connection, active_ids: set[tuple[str, str]],
                  failed_endpoints: set[str] | None = None):
    """Remove tracking entries for containers no longer unhealthy.

    Entries on *failed_endpoints* (endpoints whose container listing failed
    this run) are kept: their absence from *active_ids* means "unknown",
    not "recovered" — deleting them would silently reset grace periods and
    restart rate limits every time an endpoint hiccups.
    """
    failed = failed_endpoints or set()
    rows = conn.execute("SELECT container_id, endpoint FROM unhealthy_tracking").fetchall()
    for row in rows:
        key = (row["container_id"], row["endpoint"])
        if key[1] in failed:
            continue  # endpoint unreachable this run — keep state
        if key not in active_ids:
            log.info("Cleaning up healthy container: %s on %s", key[0][:12], key[1])
            remove_tracking(conn, key[0], key[1])


# ── container helpers ────────────────────────────────────────────────────────

def get_container_name(container: dict) -> str:
    """Extract human-readable container name.

    Docker's list API returns names with a leading slash; fall back to a
    short ID when no name is present.
    """
    names = container.get("Names", [])
    if names:
        return names[0].lstrip("/")
    return container.get("Id", "unknown")[:12]


def is_blacklisted(name: str) -> bool:
    """Check if container name matches any blacklist entry (substring)."""
    name_lower = name.lower()
    return any(bl in name_lower for bl in RESTART_BLACKLIST)


def get_health_status(container: dict) -> str | None:
    """Extract health status from container state.

    Returns "unhealthy", a nested Health.Status string, or None when the
    container exposes no health information.
    """
    state = container.get("State", "")
    # Docker API JSON list format: State is a string, Status has health info
    status = container.get("Status", "")
    if "(unhealthy)" in status.lower():
        return "unhealthy"
    # Some API versions nest it
    if isinstance(state, dict):
        health = state.get("Health", {})
        if isinstance(health, dict):
            return health.get("Status")
    return None


def restarts_in_last_hour(tracking: dict) -> int:
    """Count restarts attributable to the last hour based on tracking.

    The DB stores a single counter plus the last restart time, so this is
    an approximation: if the last restart is within the hour the whole
    counter is in-window, otherwise the window is empty.
    """
    if not tracking or not tracking.get("last_restart"):
        return 0
    last = datetime.fromisoformat(tracking["last_restart"])
    now = datetime.now(TIMEZONE)
    if last.tzinfo is None:
        # Legacy rows may hold naive timestamps; assume local TZ.
        last = last.replace(tzinfo=TIMEZONE)
    if (now - last) < timedelta(hours=1):
        return tracking.get("restart_count", 0)
    return 0


# ── main logic ───────────────────────────────────────────────────────────────

def process_unhealthy(conn: sqlite3.Connection, endpoint: str, container: dict,
                      dry_run: bool, use_ollama: bool):
    """Process a single unhealthy container.

    Pipeline: grace period → blacklist → rate limit → (optional) LLM safety
    check → restart or notify. Notifications go to email when a decision was
    made, to ntfy when the LLM was unavailable.
    """
    cid = container.get("Id", "")
    name = get_container_name(container)
    now_str = datetime.now(TIMEZONE).isoformat()

    log.info("Unhealthy container: %s (%s) on %s", name, cid[:12], endpoint)

    # Check/insert tracking
    tracking = get_tracking(conn, cid, endpoint)
    if not tracking:
        log.info("First seen, starting grace period: %s", name)
        upsert_tracking(conn, cid, endpoint, first_seen=now_str, last_checked=now_str)
        return

    # Update last_checked
    first_seen = datetime.fromisoformat(tracking["first_seen"])
    if first_seen.tzinfo is None:
        first_seen = first_seen.replace(tzinfo=TIMEZONE)
    now = datetime.now(TIMEZONE)
    duration = (now - first_seen).total_seconds()

    upsert_tracking(conn, cid, endpoint,
                    first_seen=tracking["first_seen"],
                    last_checked=now_str,
                    restart_count=tracking.get("restart_count", 0),
                    last_restart=tracking.get("last_restart"))

    if duration < UNHEALTHY_GRACE_SECONDS:
        remaining = UNHEALTHY_GRACE_SECONDS - duration
        log.info("Still in grace period (%ds remaining): %s", remaining, name)
        return

    log.info("Container %s unhealthy for %ds (past grace period)", name, duration)

    # Check blacklist
    if is_blacklisted(name):
        log.warning("Container %s is blacklisted, notifying only", name)
        msg = f"Unhealthy (blacklisted, no auto-restart): {name} on {endpoint} ({duration:.0f}s)"
        if not dry_run:
            send_email(subject=f"[Homelab] Unhealthy: {name}", text_body=msg)
        else:
            log.info("[DRY-RUN] Would notify: %s", msg)
        return

    # Check restart rate limit
    recent_restarts = restarts_in_last_hour(tracking)
    if recent_restarts >= MAX_RESTARTS_PER_HOUR:
        log.warning("Container %s hit restart limit (%d/%d in last hour)",
                    name, recent_restarts, MAX_RESTARTS_PER_HOUR)
        msg = f"Unhealthy (restart limit reached): {name} on {endpoint} — {recent_restarts} restarts in last hour"
        if not dry_run:
            send_email(subject=f"[Homelab] Restart limit: {name}", text_body=msg)
        else:
            log.info("[DRY-RUN] Would notify: %s", msg)
        return

    # Get logs for analysis
    try:
        logs = get_container_logs(endpoint, cid, tail=LOG_TAIL_LINES)
    except Exception as e:
        log.error("Failed to get logs for %s: %s", name, e)
        logs = f"(failed to fetch logs: {e})"

    # LLM analysis or raw notification
    if use_ollama:
        prompt = (
            "Analyze these container logs. Is it safe to restart this container? "
            "Consider: data corruption risk, dependency chains, active operations. "
            "Reply with SAFE or UNSAFE followed by your reasoning.\n\n"
            f"Container: {name}\n"
            f"Endpoint: {endpoint}\n"
            f"Unhealthy duration: {duration:.0f}s\n\n"
            f"Logs:\n```\n{logs}\n```"
        )
        try:
            llm_response = ollama_generate(prompt)
            log.info("LLM response for %s: %s", name, llm_response[:200])
            # Verdict is taken from the first line only; "unsafe" must be
            # checked explicitly since it contains "safe" as a substring.
            first_line = llm_response.strip().split("\n")[0].lower()
            is_safe = "safe" in first_line and "unsafe" not in first_line

            if is_safe:
                log.info("LLM says SAFE to restart %s", name)
                if not dry_run:
                    success = restart_container(endpoint, cid)
                    # Base the counter on the in-window count, not the raw DB
                    # value: once the last restart ages past an hour the
                    # window is empty and the counter must restart at 1,
                    # otherwise one stale restart permanently rate-limits
                    # the container.
                    new_count = recent_restarts + 1
                    upsert_tracking(conn, cid, endpoint,
                                    first_seen=tracking["first_seen"],
                                    last_checked=now_str,
                                    restart_count=new_count,
                                    last_restart=now_str)
                    status = "restarted" if success else "restart FAILED"
                    msg = f"Container {name} on {endpoint} {status} (LLM: SAFE)\n\n{llm_response}"
                    send_email(subject=f"[Homelab] Restarted: {name}", text_body=msg)
                else:
                    log.info("[DRY-RUN] Would restart: %s", name)
            else:
                log.warning("LLM says UNSAFE for %s", name)
                msg = f"Unhealthy but LLM says UNSAFE: {name} on {endpoint}\n\n{llm_response}"
                if not dry_run:
                    send_email(subject=f"[Homelab] Unsafe to restart: {name}", text_body=msg)
                else:
                    log.info("[DRY-RUN] Would notify (UNSAFE): %s", name)
        except OllamaUnavailableError as e:
            log.warning("Ollama failed during analysis: %s", e)
            msg = f"Unhealthy (LLM unavailable, no auto-restart): {name} on {endpoint}\n\nLast {LOG_TAIL_LINES} log lines:\n{logs}"
            if not dry_run:
                send_ntfy(topic=NTFY_TOPIC, title=f"Unhealthy: {name}",
                          message=msg[:512], priority="high")
    else:
        # No Ollama — notify only, never restart
        log.info("Ollama unavailable, notifying only for %s", name)
        msg = f"Unhealthy (no LLM, no auto-restart): {name} on {endpoint} ({duration:.0f}s)\n\nLast {LOG_TAIL_LINES} log lines:\n{logs}"
        if not dry_run:
            send_ntfy(topic=NTFY_TOPIC, title=f"Unhealthy: {name}",
                      message=msg[:512], priority="high")
        else:
            log.info("[DRY-RUN] Would notify: %s", msg[:200])


def main():
    parser = argparse.ArgumentParser(description="Auto-restart unhealthy containers with LLM safety check")
    parser.add_argument("--dry-run", action="store_true",
                        help="Log actions without restarting or notifying")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    log.info("Starting stack-restart check")
    conn = init_db(DB_PATH)

    use_ollama = ollama_available()
    if use_ollama:
        log.info("Ollama is available for safety analysis")
    else:
        log.info("Ollama is NOT available — will notify only, no auto-restarts")

    # Track all currently unhealthy containers for cleanup; endpoints whose
    # listing failed are recorded so their tracking rows are preserved.
    active_unhealthy: set[tuple[str, str]] = set()
    failed_endpoints: set[str] = set()

    for endpoint in ENDPOINTS:
        log.info("Checking endpoint: %s", endpoint)
        try:
            containers = list_containers(endpoint, all_containers=True)
        except Exception as e:
            log.error("Failed to list containers on %s: %s", endpoint, e)
            failed_endpoints.add(endpoint)
            continue

        for container in containers:
            health = get_health_status(container)
            cid = container.get("Id", "")
            if health == "unhealthy":
                active_unhealthy.add((cid, endpoint))
                try:
                    process_unhealthy(conn, endpoint, container, args.dry_run, use_ollama)
                except Exception:
                    name = get_container_name(container)
                    # log.exception preserves the traceback for debugging
                    log.exception("Error processing %s on %s", name, endpoint)

    # Clean up containers that recovered (skipping unreachable endpoints)
    cleanup_stale(conn, active_unhealthy, failed_endpoints)
    conn.close()
    log.info("Stack-restart check complete")


if __name__ == "__main__":
    main()