#!/usr/bin/env python3
"""Feature 14 — Config Drift Detector.

Compares git-committed docker-compose files against running containers via
Portainer API. Reports drifts via email with optional LLM analysis.

Cron: 0 7 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/config-drift.py
"""
import argparse
import logging
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import yaml

sys.path.insert(0, str(Path(__file__).parent))
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email
from lib.portainer import list_containers, inspect_container, ENDPOINTS

log = logging.getLogger(__name__)

REPO_ROOT = Path("/home/homelab/organized/repos/homelab")
HOSTS_DIR = REPO_ROOT / "hosts"

# Map host directory names to Portainer endpoint names
HOST_DIR_TO_ENDPOINT = {
    "atlantis": "atlantis",
    "calypso": "calypso",
    "nuc": "nuc",
    "homelab-vm": "homelab",
    "rpi5-vish": "rpi5",
}


# ── compose parsing ──────────────────────────────────────────────────────────

def find_compose_files() -> list[Path]:
    """Walk hosts/ directory for docker-compose files (both .yml and .yaml)."""
    files: list[Path] = []
    for pattern in ("docker-compose.yml", "docker-compose.yaml"):
        files.extend(HOSTS_DIR.rglob(pattern))
    return sorted(files)


def parse_compose_services(compose_path: Path) -> dict:
    """Parse a compose file and return declared services with key config.

    Returns a mapping of service name -> {image, environment, ports,
    volumes, restart}. Unparseable or service-less files yield {}.
    """
    try:
        with open(compose_path) as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort: a broken compose file should not abort the whole scan.
        log.warning("Failed to parse %s: %s", compose_path, e)
        return {}
    if not data or "services" not in data:
        return {}
    services = {}
    for svc_name, svc_config in data["services"].items():
        services[svc_name] = {
            "image": svc_config.get("image", ""),
            "environment": _normalize_env(svc_config.get("environment")),
            "ports": _normalize_ports(svc_config.get("ports", [])),
            "volumes": [str(v) for v in svc_config.get("volumes", [])],
            "restart": svc_config.get("restart", ""),
        }
    return services


def _normalize_env(env) -> dict:
    """Normalize environment from list or dict form to a str->str dict.

    Compose allows either a mapping or a list of "KEY=VALUE" / bare "KEY"
    strings; bare keys map to "".
    """
    if env is None:
        return {}
    if isinstance(env, dict):
        return {k: str(v) for k, v in env.items()}
    if isinstance(env, list):
        result = {}
        for item in env:
            item = str(item)
            if "=" in item:
                k, _, v = item.partition("=")
                result[k] = v
            else:
                result[item] = ""
        return result
    return {}


def _normalize_ports(ports) -> list[str]:
    """Normalize port mappings to strings."""
    return [str(p) for p in ports] if ports else []


def guess_endpoint(compose_path: Path) -> str | None:
    """Determine Portainer endpoint from the compose file's path.

    Returns None when no path component maps to a known endpoint.
    """
    rel = compose_path.relative_to(HOSTS_DIR)
    # e.g. ('synology', 'atlantis', 'arr-suite', 'docker-compose.yml')
    for part in rel.parts:
        if part in HOST_DIR_TO_ENDPOINT:
            return HOST_DIR_TO_ENDPOINT[part]
    return None


def guess_project_name(compose_path: Path) -> str:
    """Guess the compose project name from the containing directory name."""
    return compose_path.parent.name


# ── container comparison ─────────────────────────────────────────────────────

def get_running_services(endpoint: str) -> dict:
    """Get running containers grouped by compose project and service.

    Keys are "project/service" (from docker-compose labels); containers
    without both labels are ignored.
    """
    try:
        containers = list_containers(endpoint, all_containers=True)
    except Exception as e:
        # An unreachable endpoint degrades to "nothing running" for this host.
        log.warning("Failed to list containers on %s: %s", endpoint, e)
        return {}
    services = {}
    for c in containers:
        labels = c.get("Labels", {})
        project = labels.get("com.docker.compose.project", "")
        service = labels.get("com.docker.compose.service", "")
        if project and service:
            key = f"{project}/{service}"
            services[key] = {
                "id": c["Id"],
                "image": c.get("Image", ""),
                "state": c.get("State", ""),
                "status": c.get("Status", ""),
            }
    return services


def compare_service(declared: dict, running_info: dict, endpoint: str) -> list[dict]:
    """Compare declared compose config against running container.

    Returns a list of drift dicts with keys: field, declared, running.
    Checks image name/tag, declared-but-missing env keys, and restart policy.
    """
    drifts = []
    container_id = running_info["id"]
    # Inspect for full config
    try:
        inspection = inspect_container(endpoint, container_id)
    except Exception as e:
        log.warning("Failed to inspect container %s: %s", container_id[:12], e)
        return [{"field": "inspect", "declared": "N/A", "running": f"Error: {e}"}]

    config = inspection.get("Config", {})
    host_config = inspection.get("HostConfig", {})

    # Image comparison
    declared_image = declared.get("image", "")
    running_image = config.get("Image", "")
    if declared_image and running_image:
        # Normalize: strip registry prefix for comparison, compare base name
        d_img = declared_image.split("/")[-1] if "/" in declared_image else declared_image
        r_img = running_image.split("/")[-1] if "/" in running_image else running_image
        # A missing tag means "latest"
        d_tag = d_img.split(":")[-1] if ":" in d_img else "latest"
        r_tag = r_img.split(":")[-1] if ":" in r_img else "latest"
        d_name = d_img.split(":")[0]
        r_name = r_img.split(":")[0]
        if d_name != r_name or d_tag != r_tag:
            drifts.append({
                "field": "image",
                "declared": declared_image,
                "running": running_image,
            })

    # Environment key presence check
    declared_env_keys = set(declared.get("environment", {}).keys())
    running_env = {}
    for item in config.get("Env", []):
        if "=" in item:
            k, _, v = item.partition("=")
            running_env[k] = v
    running_env_keys = set(running_env.keys())
    # Only check keys declared in compose that are missing at runtime.
    # Runtime-injected vars (PATH, HOME, etc.) are therefore never flagged.
    missing_keys = declared_env_keys - running_env_keys
    if missing_keys:
        drifts.append({
            "field": "env_missing",
            "declared": ", ".join(sorted(missing_keys)),
            "running": "(not set)",
        })

    # Restart policy.
    # BUGFIX: the old mapping ({"no": "", "": "no"}) sent declared "no" and a
    # running empty policy to *different* canonical values, falsely flagging
    # drift. Docker reports "no restart" as either "no" or "" depending on
    # version, so normalize both sides of the comparison the same way.
    declared_restart = declared.get("restart", "")
    running_restart = host_config.get("RestartPolicy", {}).get("Name", "")
    d_r = declared_restart or "no"
    r_r = running_restart or "no"
    if declared_restart and d_r != r_r:
        drifts.append({
            "field": "restart_policy",
            "declared": declared_restart,
            "running": running_restart or "no",
        })

    return drifts


# ── report ───────────────────────────────────────────────────────────────────

def build_report(all_drifts: list[dict], llm_analysis: str = "") -> tuple[str, str]:
    """Build the drift report.

    Returns (markdown_text, html) for the email's text and HTML bodies.
    NOTE(review): the HTML markup in the original source was garbled
    (tags stripped); this is a faithful reconstruction of the evident
    table layout — confirm rendering against a sent email.
    """
    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    total = sum(len(d["drifts"]) for d in all_drifts)

    text_lines = [
        f"# Config Drift Report — {now.strftime('%Y-%m-%d %H:%M %Z')}",
        "",
        f"Total drifts found: {total}",
        "",
        "| Endpoint | Project/Service | Field | Declared | Running |",
        "|---|---|---|---|---|",
    ]
    html_parts = [
        "<html><body>",
        "<h2>Config Drift Report</h2>",
        f"<p>{now.strftime('%Y-%m-%d %H:%M %Z')} — {total} drifts found</p>",
        "<table border='1' cellpadding='4'>",
        "<tr><th>Endpoint</th><th>Project/Service</th><th>Field</th>"
        "<th>Declared</th><th>Running</th></tr>",
    ]

    # One row per individual drift, grouped by service entry.
    for entry in all_drifts:
        for drift in entry["drifts"]:
            text_lines.append(
                f"| {entry['endpoint']} | "
                f"{entry['project']}/{entry['service']} | "
                f"{drift['field']} | "
                f"{drift['declared']} | "
                f"{drift['running']} |"
            )
            html_parts.append(
                "<tr>"
                f"<td>{entry['endpoint']}</td>"
                f"<td>{entry['project']}/{entry['service']}</td>"
                f"<td>{drift['field']}</td>"
                f"<td>{drift['declared']}</td>"
                f"<td>{drift['running']}</td>"
                "</tr>"
            )
    html_parts.append("</table>")

    if llm_analysis:
        text_lines += ["", "## LLM Analysis", "", llm_analysis]
        html_parts.append(f"<h3>LLM Analysis</h3><pre>{llm_analysis}</pre>")

    html_parts.append("</body></html>")
    return "\n".join(text_lines), "\n".join(html_parts)
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Entry point: scan compose files, diff against running containers,
    and email (or, with --dry-run, print) a drift report."""
    parser = argparse.ArgumentParser(
        description="Config Drift Detector — compare compose files vs running containers"
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="Print report without sending email")
    parser.add_argument("--verbose", action="store_true",
                        help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    compose_files = find_compose_files()
    log.info("Found %d compose files under %s", len(compose_files), HOSTS_DIR)

    # Running containers are fetched at most once per endpoint.
    running_cache: dict[str, dict] = {}
    all_drifts: list[dict] = []
    unmatched_services: list[dict] = []

    for compose_path in compose_files:
        endpoint = guess_endpoint(compose_path)
        if not endpoint:
            log.debug("Skipping %s — no endpoint mapping", compose_path)
            continue

        project = guess_project_name(compose_path)
        services = parse_compose_services(compose_path)
        if not services:
            log.debug("No services in %s", compose_path)
            continue

        if endpoint not in running_cache:
            running_cache[endpoint] = get_running_services(endpoint)
        running = running_cache[endpoint]

        for svc_name, declared in services.items():
            key = f"{project}/{svc_name}"
            if key not in running:
                # Declared in git but not running anywhere — track separately.
                unmatched_services.append({
                    "endpoint": endpoint,
                    "project": project,
                    "service": svc_name,
                })
                log.debug("No running container for %s on %s", key, endpoint)
                continue

            drifts = compare_service(declared, running[key], endpoint)
            if drifts:
                all_drifts.append({
                    "endpoint": endpoint,
                    "project": project,
                    "service": svc_name,
                    "compose_file": str(compose_path),
                    "drifts": drifts,
                })

    total_drifts = sum(len(d["drifts"]) for d in all_drifts)
    log.info("Detected %d drifts across %d services", total_drifts, len(all_drifts))
    log.info("Unmatched compose services (not running): %d", len(unmatched_services))

    if total_drifts == 0:
        log.info("No drifts found. Nothing to report.")
        if args.dry_run:
            print("No config drifts detected.")
        return

    # Optional LLM analysis of the drift list.
    llm_analysis = ""
    if ollama_available():
        drift_summary = "\n".join(
            f"- {d['endpoint']}/{d['project']}/{d['service']}: "
            + ", ".join(
                f"{x['field']} (declared={x['declared']}, running={x['running']})"
                for x in d["drifts"]
            )
            for d in all_drifts
        )
        prompt = (
            "Explain these Docker config drifts and their risk level. "
            "Be concise, rate each as LOW/MEDIUM/HIGH risk:\n\n"
            + drift_summary
        )
        try:
            llm_analysis = ollama_generate(prompt)
            log.info("LLM analysis obtained")
        except OllamaUnavailableError as e:
            log.warning("LLM unavailable for analysis: %s", e)
    else:
        log.info("Ollama not available, skipping LLM analysis")

    text_report, html_report = build_report(all_drifts, llm_analysis)

    if args.dry_run:
        print(text_report)
        return

    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    subject = f"Config Drift: {total_drifts} drifts detected — {now.strftime('%b %d')}"
    send_email(subject=subject, html_body=html_report, text_body=text_report)
    log.info("Drift report emailed")


if __name__ == "__main__":
    main()