#!/usr/bin/env python3
"""Feature 14 — Config Drift Detector.

Compares git-committed docker-compose files against running containers via
Portainer API. Reports drifts via email with optional LLM analysis.

Cron: 0 7 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/config-drift.py
"""

import argparse
import logging
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import yaml

sys.path.insert(0, str(Path(__file__).parent))

from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email
from lib.portainer import list_containers, inspect_container, ENDPOINTS

log = logging.getLogger(__name__)

REPO_ROOT = Path("/home/homelab/organized/repos/homelab")
HOSTS_DIR = REPO_ROOT / "hosts"

# Map host directory names to Portainer endpoint names
HOST_DIR_TO_ENDPOINT = {
    "atlantis": "atlantis",
    "calypso": "calypso",
    "nuc": "nuc",
    "homelab-vm": "homelab",
    "rpi5-vish": "rpi5",
}


# ── compose parsing ──────────────────────────────────────────────────────────

def find_compose_files() -> list[Path]:
    """Walk hosts/ directory for docker-compose files."""
    files: list[Path] = []
    for pattern in ("docker-compose.yml", "docker-compose.yaml"):
        files.extend(HOSTS_DIR.rglob(pattern))
    return sorted(files)


def parse_compose_services(compose_path: Path) -> dict:
    """Parse a compose file and return declared services with key config.

    Returns a mapping of service name -> {image, environment, ports,
    volumes, restart}. Unparseable or service-less files yield {}.
    """
    try:
        with open(compose_path) as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort: a broken compose file should not abort the whole run.
        log.warning("Failed to parse %s: %s", compose_path, e)
        return {}

    if not data or "services" not in data:
        return {}

    services = {}
    for svc_name, svc_config in data["services"].items():
        services[svc_name] = {
            "image": svc_config.get("image", ""),
            "environment": _normalize_env(svc_config.get("environment")),
            "ports": _normalize_ports(svc_config.get("ports", [])),
            "volumes": [str(v) for v in svc_config.get("volumes", [])],
            "restart": svc_config.get("restart", ""),
        }
    return services


def _normalize_env(env) -> dict:
    """Normalize environment from list or dict to a dict of key names.

    Compose allows either ``KEY: value`` mappings or ``["KEY=value", "KEY"]``
    lists; both forms are flattened to {key: str(value)}.
    """
    if env is None:
        return {}
    if isinstance(env, dict):
        return {k: str(v) for k, v in env.items()}
    if isinstance(env, list):
        result = {}
        for item in env:
            item = str(item)
            if "=" in item:
                k, _, v = item.partition("=")
                result[k] = v
            else:
                # Bare "KEY" entry: value is inherited from the host env.
                result[item] = ""
        return result
    return {}


def _normalize_ports(ports) -> list[str]:
    """Normalize port mappings to strings."""
    return [str(p) for p in ports] if ports else []


def guess_endpoint(compose_path: Path) -> str | None:
    """Determine Portainer endpoint from the compose file's path.

    Returns None when no path component matches a known host directory.
    """
    rel = compose_path.relative_to(HOSTS_DIR)
    # e.g. ('synology', 'atlantis', 'arr-suite', 'docker-compose.yml')
    for part in rel.parts:
        if part in HOST_DIR_TO_ENDPOINT:
            return HOST_DIR_TO_ENDPOINT[part]
    return None


def guess_project_name(compose_path: Path) -> str:
    """Guess the compose project name from the directory name."""
    return compose_path.parent.name


# ── container comparison ─────────────────────────────────────────────────────

def get_running_services(endpoint: str) -> dict:
    """Get running containers grouped by compose project and service.

    Returns {"project/service": {id, image, state, status}} keyed by the
    com.docker.compose.* labels; containers without those labels are skipped.
    """
    try:
        containers = list_containers(endpoint, all_containers=True)
    except Exception as e:
        log.warning("Failed to list containers on %s: %s", endpoint, e)
        return {}

    services = {}
    for c in containers:
        labels = c.get("Labels", {})
        project = labels.get("com.docker.compose.project", "")
        service = labels.get("com.docker.compose.service", "")
        if project and service:
            key = f"{project}/{service}"
            services[key] = {
                "id": c["Id"],
                "image": c.get("Image", ""),
                "state": c.get("State", ""),
                "status": c.get("Status", ""),
            }
    return services


def compare_service(declared: dict, running_info: dict, endpoint: str) -> list[dict]:
    """Compare declared compose config against running container.

    Returns a list of drifts, each {"field", "declared", "running"}.
    Checks image name+tag, declared-but-missing env keys, and restart policy.
    """
    drifts = []
    container_id = running_info["id"]

    # Inspect for full config
    try:
        inspection = inspect_container(endpoint, container_id)
    except Exception as e:
        log.warning("Failed to inspect container %s: %s", container_id[:12], e)
        return [{"field": "inspect", "declared": "N/A", "running": f"Error: {e}"}]

    config = inspection.get("Config", {})
    host_config = inspection.get("HostConfig", {})

    # Image comparison
    declared_image = declared.get("image", "")
    running_image = config.get("Image", "")
    if declared_image and running_image:
        # Normalize: strip registry prefix for comparison, compare base name
        d_img = declared_image.split("/")[-1] if "/" in declared_image else declared_image
        r_img = running_image.split("/")[-1] if "/" in running_image else running_image
        # Compare tag portion; an untagged image implies "latest"
        d_tag = d_img.split(":")[-1] if ":" in d_img else "latest"
        r_tag = r_img.split(":")[-1] if ":" in r_img else "latest"
        d_name = d_img.split(":")[0]
        r_name = r_img.split(":")[0]
        if d_name != r_name or d_tag != r_tag:
            drifts.append({
                "field": "image",
                "declared": declared_image,
                "running": running_image,
            })

    # Environment key presence check
    declared_env_keys = set(declared.get("environment", {}).keys())
    running_env = {}
    for item in config.get("Env", []):
        if "=" in item:
            k, _, v = item.partition("=")
            running_env[k] = v
    running_env_keys = set(running_env.keys())
    # Only check keys declared in compose that are missing at runtime.
    # Ignore runtime-injected vars (PATH, HOME, etc.)
    missing_keys = declared_env_keys - running_env_keys
    if missing_keys:
        drifts.append({
            "field": "env_missing",
            "declared": ", ".join(sorted(missing_keys)),
            "running": "(not set)",
        })

    # Restart policy
    declared_restart = declared.get("restart", "")
    restart_policy = host_config.get("RestartPolicy", {})
    running_restart = restart_policy.get("Name", "")
    # Normalize: "no" and an empty/absent policy are equivalent, so canonicalize
    # both sides to "no" before comparing. (The old mapping swapped "" and "no",
    # which made the two equivalent forms always look like drift.)
    d_r = declared_restart or "no"
    r_r = running_restart or "no"
    if declared_restart and d_r != r_r:
        drifts.append({
            "field": "restart_policy",
            "declared": declared_restart,
            "running": running_restart or "no",
        })

    return drifts


# ── report ───────────────────────────────────────────────────────────────────

def build_report(all_drifts: list[dict], llm_analysis: str = "") -> tuple[str, str]:
    """Build markdown text and HTML drift report.

    Returns (text_report, html_report). ``all_drifts`` entries carry
    endpoint/project/service plus a "drifts" list from compare_service().
    """
    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    total = sum(len(d["drifts"]) for d in all_drifts)

    text_lines = [
        f"# Config Drift Report — {now.strftime('%Y-%m-%d %H:%M %Z')}",
        "",
        f"Total drifts found: {total}",
        "",
    ]

    html_parts = [
        "<html><body>",
        "<h2>Config Drift Report</h2>",
        f"<p>{now.strftime('%Y-%m-%d %H:%M %Z')} — {total} drifts found</p>",
        "<table border='1' cellpadding='4' cellspacing='0'>",
        "<tr><th>Endpoint</th><th>Project/Service</th><th>Field</th>"
        "<th>Declared</th><th>Running</th></tr>",
    ]

    for entry in all_drifts:
        for drift in entry["drifts"]:
            text_lines.append(
                f"| {entry['endpoint']} | {entry['project']}/{entry['service']} "
                f"| {drift['field']} | {drift['declared']} | {drift['running']} |"
            )
            html_parts.append(
                f"<tr><td>{entry['endpoint']}</td>"
                f"<td>{entry['project']}/{entry['service']}</td>"
                f"<td>{drift['field']}</td>"
                f"<td>{drift['declared']}</td>"
                f"<td>{drift['running']}</td></tr>"
            )
    html_parts.append("</table>")

    if llm_analysis:
        text_lines.extend(["", "## LLM Analysis", "", llm_analysis])
        html_parts.append(
            f"<h3>LLM Analysis</h3><pre>{llm_analysis}</pre>"
        )

    html_parts.append("</body></html>")
    return "\n".join(text_lines), "\n".join(html_parts)


# ── main ─────────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(
        description="Config Drift Detector — compare compose files vs running containers"
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="Print report without sending email")
    parser.add_argument("--verbose", action="store_true",
                        help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    compose_files = find_compose_files()
    log.info("Found %d compose files under %s", len(compose_files), HOSTS_DIR)

    # Cache running containers per endpoint so each host is queried once.
    running_cache: dict[str, dict] = {}
    all_drifts = []
    unmatched_services = []

    for compose_path in compose_files:
        endpoint = guess_endpoint(compose_path)
        if not endpoint:
            log.debug("Skipping %s — no endpoint mapping", compose_path)
            continue

        project = guess_project_name(compose_path)
        services = parse_compose_services(compose_path)
        if not services:
            log.debug("No services in %s", compose_path)
            continue

        # Lazy-load running containers for this endpoint
        if endpoint not in running_cache:
            running_cache[endpoint] = get_running_services(endpoint)
        running = running_cache[endpoint]

        for svc_name, declared in services.items():
            key = f"{project}/{svc_name}"
            if key in running:
                drifts = compare_service(declared, running[key], endpoint)
                if drifts:
                    all_drifts.append({
                        "endpoint": endpoint,
                        "project": project,
                        "service": svc_name,
                        "compose_file": str(compose_path),
                        "drifts": drifts,
                    })
            else:
                unmatched_services.append({
                    "endpoint": endpoint,
                    "project": project,
                    "service": svc_name,
                })
                log.debug("No running container for %s on %s", key, endpoint)

    total_drifts = sum(len(d["drifts"]) for d in all_drifts)
    log.info("Detected %d drifts across %d services", total_drifts, len(all_drifts))
    log.info("Unmatched compose services (not running): %d", len(unmatched_services))

    if total_drifts == 0:
        log.info("No drifts found. Nothing to report.")
        if args.dry_run:
            print("No config drifts detected.")
        return

    # Optional LLM analysis
    llm_analysis = ""
    if ollama_available():
        drift_summary = "\n".join(
            f"- {d['endpoint']}/{d['project']}/{d['service']}: "
            + ", ".join(
                f"{x['field']} (declared={x['declared']}, running={x['running']})"
                for x in d["drifts"]
            )
            for d in all_drifts
        )
        prompt = (
            "Explain these Docker config drifts and their risk level. "
            "Be concise, rate each as LOW/MEDIUM/HIGH risk:\n\n" + drift_summary
        )
        try:
            llm_analysis = ollama_generate(prompt)
            log.info("LLM analysis obtained")
        except OllamaUnavailableError as e:
            log.warning("LLM unavailable for analysis: %s", e)
    else:
        log.info("Ollama not available, skipping LLM analysis")

    text_report, html_report = build_report(all_drifts, llm_analysis)

    if args.dry_run:
        print(text_report)
        return

    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    subject = f"Config Drift: {total_drifts} drifts detected — {now.strftime('%b %d')}"
    send_email(subject=subject, html_body=html_report, text_body=text_report)
    log.info("Drift report emailed")


if __name__ == "__main__":
    main()