#!/usr/bin/env python3
|
|
"""Feature 14 — Config Drift Detector.
|
|
|
|
Compares git-committed docker-compose files against running containers
|
|
via Portainer API. Reports drifts via email with optional LLM analysis.
|
|
|
|
Cron: 0 7 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/config-drift.py
|
|
"""
|
|
|
|
import argparse
import html
import logging
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import yaml

sys.path.insert(0, str(Path(__file__).parent))
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email
from lib.portainer import list_containers, inspect_container, ENDPOINTS
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Repository root holding the git-committed compose files. Absolute so the
# script behaves the same regardless of the cron working directory.
REPO_ROOT = Path("/home/homelab/organized/repos/homelab")

# All host-specific docker-compose files live somewhere under hosts/.
HOSTS_DIR = REPO_ROOT / "hosts"

# Map host directory names to Portainer endpoint names.
# guess_endpoint() matches any component of a compose file's path under
# hosts/ against these keys; the value is the endpoint name passed to
# lib.portainer helpers.
HOST_DIR_TO_ENDPOINT = {
    "atlantis": "atlantis",
    "calypso": "calypso",
    "nuc": "nuc",
    "homelab-vm": "homelab",
    "rpi5-vish": "rpi5",
}
|
|
|
|
|
|
# ── compose parsing ──────────────────────────────────────────────────────────
|
|
|
|
def find_compose_files() -> list[Path]:
    """Recursively collect docker-compose files under the hosts/ tree.

    Both the .yml and .yaml spellings are picked up; the result is sorted
    for deterministic processing order.
    """
    yml_files = HOSTS_DIR.rglob("docker-compose.yml")
    yaml_files = HOSTS_DIR.rglob("docker-compose.yaml")
    return sorted([*yml_files, *yaml_files])
|
|
|
|
|
|
def parse_compose_services(compose_path: Path) -> dict:
    """Parse a compose file and return declared services with key config.

    Returns a mapping of service name -> {image, environment, ports,
    volumes, restart}. Unparseable files, non-mapping documents, and files
    without a usable ``services`` section all yield an empty dict (logged,
    never raised), so one bad file cannot abort the whole scan.
    """
    try:
        with open(compose_path) as f:
            data = yaml.safe_load(f)
    except Exception as e:
        log.warning("Failed to parse %s: %s", compose_path, e)
        return {}

    # A valid compose document is a mapping whose "services" value is also a
    # mapping. Anything else (empty file, list document, bare `services:`
    # which parses as None) has nothing to compare.
    if not isinstance(data, dict):
        return {}
    declared = data.get("services")
    if not isinstance(declared, dict):
        return {}

    services = {}
    for svc_name, svc_config in declared.items():
        # A bare `svc:` entry parses as None — treat it as an empty config
        # instead of crashing on .get().
        svc_config = svc_config or {}
        services[svc_name] = {
            "image": svc_config.get("image", ""),
            "environment": _normalize_env(svc_config.get("environment")),
            "ports": _normalize_ports(svc_config.get("ports", [])),
            "volumes": [str(v) for v in svc_config.get("volumes", [])],
            "restart": svc_config.get("restart", ""),
        }
    return services
|
|
|
|
|
|
def _normalize_env(env) -> dict:
|
|
"""Normalize environment from list or dict to dict of key names."""
|
|
if env is None:
|
|
return {}
|
|
if isinstance(env, dict):
|
|
return {k: str(v) for k, v in env.items()}
|
|
if isinstance(env, list):
|
|
result = {}
|
|
for item in env:
|
|
item = str(item)
|
|
if "=" in item:
|
|
k, _, v = item.partition("=")
|
|
result[k] = v
|
|
else:
|
|
result[item] = ""
|
|
return result
|
|
return {}
|
|
|
|
|
|
def _normalize_ports(ports) -> list[str]:
|
|
"""Normalize port mappings to strings."""
|
|
return [str(p) for p in ports] if ports else []
|
|
|
|
|
|
def guess_endpoint(compose_path: Path) -> str | None:
    """Determine Portainer endpoint from the compose file's path.

    The first path component (relative to hosts/) found in
    HOST_DIR_TO_ENDPOINT wins, e.g.
    hosts/synology/atlantis/arr-suite/docker-compose.yml -> "atlantis".
    Returns None when no component maps to a known endpoint.
    """
    relative = compose_path.relative_to(HOSTS_DIR)
    return next(
        (
            HOST_DIR_TO_ENDPOINT[component]
            for component in relative.parts
            if component in HOST_DIR_TO_ENDPOINT
        ),
        None,
    )
|
|
|
|
|
|
def guess_project_name(compose_path: Path) -> str:
    """Guess the compose project name from the directory name.

    Mirrors docker compose's default: the project is named after the
    directory that contains the compose file.
    """
    containing_dir = compose_path.parent
    return containing_dir.name
|
|
|
|
|
|
# ── container comparison ─────────────────────────────────────────────────────
|
|
|
|
def get_running_services(endpoint: str) -> dict:
    """Get running containers grouped by compose project and service.

    Returns a mapping of "<project>/<service>" -> container summary dict.
    Containers without compose labels are skipped; any Portainer error is
    logged and an empty mapping returned (best-effort).
    """
    try:
        container_list = list_containers(endpoint, all_containers=True)
    except Exception as e:
        log.warning("Failed to list containers on %s: %s", endpoint, e)
        return {}

    by_key = {}
    for container in container_list:
        container_labels = container.get("Labels", {})
        proj = container_labels.get("com.docker.compose.project", "")
        svc = container_labels.get("com.docker.compose.service", "")
        if not (proj and svc):
            # Not compose-managed — nothing to match it against.
            continue
        by_key[f"{proj}/{svc}"] = {
            "id": container["Id"],
            "image": container.get("Image", ""),
            "state": container.get("State", ""),
            "status": container.get("Status", ""),
        }
    return by_key
|
|
|
|
|
|
def compare_service(declared: dict, running_info: dict, endpoint: str) -> list[dict]:
    """Compare declared compose config against running container.

    Inspects the container via Portainer, then checks image (name:tag),
    declared-but-missing environment keys, and restart policy. Returns a
    list of drift dicts with "field"/"declared"/"running" keys; an empty
    list means no drift. Inspect failures are reported as a single
    synthetic "inspect" drift rather than raised.
    """
    container_id = running_info["id"]

    try:
        inspection = inspect_container(endpoint, container_id)
    except Exception as e:
        log.warning("Failed to inspect container %s: %s", container_id[:12], e)
        return [{"field": "inspect", "declared": "N/A", "running": f"Error: {e}"}]

    config = inspection.get("Config", {})
    host_config = inspection.get("HostConfig", {})

    drifts = []
    drifts.extend(_image_drift(declared, config))
    drifts.extend(_env_drift(declared, config))
    drifts.extend(_restart_drift(declared, host_config))
    return drifts


def _image_drift(declared: dict, config: dict) -> list[dict]:
    """Drift entry when declared and running images differ by name or tag."""
    declared_image = declared.get("image", "")
    running_image = config.get("Image", "")
    if not (declared_image and running_image):
        return []

    # Compare only the last path component so registry/namespace prefixes
    # (e.g. lscr.io/linuxserver/) don't produce false drifts.
    d_img = declared_image.split("/")[-1] if "/" in declared_image else declared_image
    r_img = running_image.split("/")[-1] if "/" in running_image else running_image
    # A missing tag means "latest".
    d_tag = d_img.split(":")[-1] if ":" in d_img else "latest"
    r_tag = r_img.split(":")[-1] if ":" in r_img else "latest"
    d_name = d_img.split(":")[0]
    r_name = r_img.split(":")[0]

    if d_name != r_name or d_tag != r_tag:
        return [{
            "field": "image",
            "declared": declared_image,
            "running": running_image,
        }]
    return []


def _env_drift(declared: dict, config: dict) -> list[dict]:
    """Drift entry for compose-declared env keys absent from the container."""
    declared_keys = set(declared.get("environment", {}).keys())
    running_keys = {
        item.partition("=")[0] for item in config.get("Env", []) if "=" in item
    }
    # Only declared-but-missing keys count: runtime-injected vars (PATH,
    # HOME, ...) are never flagged, and value differences are deliberately
    # ignored (values are often interpolated from .env at deploy time).
    missing_keys = declared_keys - running_keys
    if missing_keys:
        return [{
            "field": "env_missing",
            "declared": ", ".join(sorted(missing_keys)),
            "running": "(not set)",
        }]
    return []


def _restart_drift(declared: dict, host_config: dict) -> list[dict]:
    """Drift entry for restart policy; '' and 'no' are treated as equal."""
    declared_restart = declared.get("restart", "")
    running_restart = host_config.get("RestartPolicy", {}).get("Name", "")
    if not declared_restart:
        # Nothing declared in compose -> nothing to compare.
        return []

    # Docker reports an unset policy as "" while compose spells it "no";
    # canonicalize both sides to "no" before comparing. (The previous
    # mapping sent "no" -> "" and "" -> "no", making equivalent policies
    # look different and raising false drifts.)
    declared_norm = declared_restart or "no"
    running_norm = running_restart or "no"

    if declared_norm != running_norm:
        return [{
            "field": "restart_policy",
            "declared": declared_restart,
            "running": running_restart or "no",
        }]
    return []
|
|
|
|
|
|
# ── report ───────────────────────────────────────────────────────────────────
|
|
|
|
def build_report(all_drifts: list[dict], llm_analysis: str = "") -> tuple[str, str]:
    """Build the drift report as (markdown text, HTML email body).

    all_drifts: entries with "endpoint"/"project"/"service" keys and a
        "drifts" list of {"field", "declared", "running"} dicts.
    llm_analysis: optional free text appended as its own section.
    """
    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    total = sum(len(entry["drifts"]) for entry in all_drifts)

    text_lines = [
        f"# Config Drift Report — {now.strftime('%Y-%m-%d %H:%M %Z')}",
        "",
        f"Total drifts found: {total}",
        "",
    ]

    html_parts = [
        "<html><body>",
        "<h2>Config Drift Report</h2>",
        f"<p>{now.strftime('%Y-%m-%d %H:%M %Z')} — {total} drifts found</p>",
        "<table border='1' cellpadding='6' cellspacing='0' style='border-collapse:collapse;'>",
        "<tr><th>Endpoint</th><th>Project/Service</th><th>Field</th>"
        "<th>Declared</th><th>Running</th></tr>",
    ]

    for entry in all_drifts:
        for drift in entry["drifts"]:
            text_lines.append(
                f"| {entry['endpoint']} | {entry['project']}/{entry['service']} "
                f"| {drift['field']} | {drift['declared']} | {drift['running']} |"
            )
            # Escape all interpolated values: container config routinely
            # contains characters ('<', '&', quotes) that would otherwise
            # corrupt or inject into the HTML table.
            html_parts.append(
                f"<tr><td>{html.escape(str(entry['endpoint']))}</td>"
                f"<td>{html.escape(str(entry['project']))}/{html.escape(str(entry['service']))}</td>"
                f"<td>{html.escape(str(drift['field']))}</td>"
                f"<td><code>{html.escape(str(drift['declared']))}</code></td>"
                f"<td><code>{html.escape(str(drift['running']))}</code></td></tr>"
            )

    html_parts.append("</table>")

    if llm_analysis:
        text_lines.extend(["", "## LLM Analysis", "", llm_analysis])
        # Model-generated free text — escape it too before embedding.
        html_parts.append(f"<h3>LLM Analysis</h3><pre>{html.escape(llm_analysis)}</pre>")

    html_parts.append("</body></html>")

    return "\n".join(text_lines), "\n".join(html_parts)
|
|
|
|
|
|
# ── main ─────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Entry point: scan compose files, detect drift, and email a report.

    Flow: discover compose files under hosts/ -> map each to a Portainer
    endpoint -> diff declared services against running containers ->
    optionally ask the local LLM for a risk assessment -> email the report
    (or just print it with --dry-run).
    """
    parser = argparse.ArgumentParser(description="Config Drift Detector — compare compose files vs running containers")
    parser.add_argument("--dry-run", action="store_true", help="Print report without sending email")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    compose_files = find_compose_files()
    log.info("Found %d compose files under %s", len(compose_files), HOSTS_DIR)

    # Cache running containers per endpoint so each Portainer endpoint is
    # queried at most once, no matter how many compose files map to it.
    running_cache: dict[str, dict] = {}

    all_drifts = []           # services with at least one detected drift
    unmatched_services = []   # declared in compose but not running anywhere

    for compose_path in compose_files:
        endpoint = guess_endpoint(compose_path)
        if not endpoint:
            # Path doesn't contain a known host directory — not our problem.
            log.debug("Skipping %s — no endpoint mapping", compose_path)
            continue

        project = guess_project_name(compose_path)
        services = parse_compose_services(compose_path)
        if not services:
            log.debug("No services in %s", compose_path)
            continue

        # Lazy-load running containers for this endpoint
        if endpoint not in running_cache:
            running_cache[endpoint] = get_running_services(endpoint)

        running = running_cache[endpoint]

        for svc_name, declared in services.items():
            # Running containers are keyed by their compose labels as
            # "<project>/<service>" (see get_running_services).
            key = f"{project}/{svc_name}"
            if key in running:
                drifts = compare_service(declared, running[key], endpoint)
                if drifts:
                    all_drifts.append({
                        "endpoint": endpoint,
                        "project": project,
                        "service": svc_name,
                        "compose_file": str(compose_path),
                        "drifts": drifts,
                    })
            else:
                # Declared in git but not running — tracked separately so it
                # doesn't inflate the drift count.
                unmatched_services.append({
                    "endpoint": endpoint,
                    "project": project,
                    "service": svc_name,
                })
                log.debug("No running container for %s on %s", key, endpoint)

    total_drifts = sum(len(d["drifts"]) for d in all_drifts)
    log.info("Detected %d drifts across %d services", total_drifts, len(all_drifts))
    log.info("Unmatched compose services (not running): %d", len(unmatched_services))

    # Nothing drifted: no email is sent (keeps the inbox quiet on good days).
    if total_drifts == 0:
        log.info("No drifts found. Nothing to report.")
        if args.dry_run:
            print("No config drifts detected.")
        return

    # Optional LLM analysis: summarize the drifts into a prompt and ask the
    # local Ollama instance for a risk rating. Failures degrade gracefully —
    # the report still goes out without the analysis section.
    llm_analysis = ""
    if ollama_available():
        drift_summary = "\n".join(
            f"- {d['endpoint']}/{d['project']}/{d['service']}: "
            + ", ".join(f"{x['field']} (declared={x['declared']}, running={x['running']})" for x in d["drifts"])
            for d in all_drifts
        )
        prompt = (
            "Explain these Docker config drifts and their risk level. "
            "Be concise, rate each as LOW/MEDIUM/HIGH risk:\n\n"
            + drift_summary
        )
        try:
            llm_analysis = ollama_generate(prompt)
            log.info("LLM analysis obtained")
        except OllamaUnavailableError as e:
            log.warning("LLM unavailable for analysis: %s", e)
    else:
        log.info("Ollama not available, skipping LLM analysis")

    text_report, html_report = build_report(all_drifts, llm_analysis)

    # --dry-run: print the markdown report instead of emailing.
    if args.dry_run:
        print(text_report)
        return

    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    subject = f"Config Drift: {total_drifts} drifts detected — {now.strftime('%b %d')}"
    send_email(subject=subject, html_body=html_report, text_body=text_report)
    log.info("Drift report emailed")
|
|
|
|
|
|
# Standard script guard: run only when invoked directly (e.g. from cron),
# not when imported as a module.
if __name__ == "__main__":
    main()
|