Sanitized mirror from private repository - 2026-04-05 13:06:07 UTC
This commit is contained in:
361
scripts/config-drift.py
Normal file
361
scripts/config-drift.py
Normal file
@@ -0,0 +1,361 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Feature 14 — Config Drift Detector.
|
||||
|
||||
Compares git-committed docker-compose files against running containers
|
||||
via Portainer API. Reports drifts via email with optional LLM analysis.
|
||||
|
||||
Cron: 0 7 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/config-drift.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import yaml
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
|
||||
from lib.notify import send_email
|
||||
from lib.portainer import list_containers, inspect_container, ENDPOINTS
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Absolute path of the homelab git checkout this script runs from (see cron line above).
REPO_ROOT = Path("/home/homelab/organized/repos/homelab")
# Per-host compose files live under hosts/<...>/<project>/docker-compose.yml.
HOSTS_DIR = REPO_ROOT / "hosts"

# Map host directory names to Portainer endpoint names
HOST_DIR_TO_ENDPOINT = {
    "atlantis": "atlantis",
    "calypso": "calypso",
    "nuc": "nuc",
    "homelab-vm": "homelab",
    "rpi5-vish": "rpi5",
}
|
||||
|
||||
|
||||
# ── compose parsing ──────────────────────────────────────────────────────────
|
||||
|
||||
def find_compose_files() -> list[Path]:
    """Recursively collect docker-compose files under the hosts/ tree.

    Matches both the ``.yml`` and ``.yaml`` spellings and returns the
    results in a stable (sorted) order.
    """
    found: list[Path] = []
    for filename in ("docker-compose.yml", "docker-compose.yaml"):
        found += HOSTS_DIR.rglob(filename)
    return sorted(found)
|
||||
|
||||
|
||||
def parse_compose_services(compose_path: Path) -> dict:
    """Parse a compose file and return declared services with key config.

    Returns a mapping of service name -> {image, environment, ports,
    volumes, restart}. Returns {} when the file cannot be parsed or has
    no ``services`` section.
    """
    try:
        with open(compose_path) as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort: a broken compose file shouldn't abort the whole scan.
        log.warning("Failed to parse %s: %s", compose_path, e)
        return {}

    if not data or "services" not in data:
        return {}

    services = {}
    for svc_name, svc_config in data["services"].items():
        # A service declared with an empty body ("svc:") parses as None;
        # guard so the .get() calls below don't raise AttributeError.
        svc_config = svc_config or {}
        services[svc_name] = {
            "image": svc_config.get("image", ""),
            "environment": _normalize_env(svc_config.get("environment")),
            "ports": _normalize_ports(svc_config.get("ports", [])),
            "volumes": [str(v) for v in svc_config.get("volumes", [])],
            "restart": svc_config.get("restart", ""),
        }
    return services
|
||||
|
||||
|
||||
def _normalize_env(env) -> dict:
|
||||
"""Normalize environment from list or dict to dict of key names."""
|
||||
if env is None:
|
||||
return {}
|
||||
if isinstance(env, dict):
|
||||
return {k: str(v) for k, v in env.items()}
|
||||
if isinstance(env, list):
|
||||
result = {}
|
||||
for item in env:
|
||||
item = str(item)
|
||||
if "=" in item:
|
||||
k, _, v = item.partition("=")
|
||||
result[k] = v
|
||||
else:
|
||||
result[item] = ""
|
||||
return result
|
||||
return {}
|
||||
|
||||
|
||||
def _normalize_ports(ports) -> list[str]:
|
||||
"""Normalize port mappings to strings."""
|
||||
return [str(p) for p in ports] if ports else []
|
||||
|
||||
|
||||
def guess_endpoint(compose_path: Path) -> str | None:
    """Determine the Portainer endpoint for a compose file from its path.

    Scans the path components under hosts/ (e.g.
    ('synology', 'atlantis', 'arr-suite', 'docker-compose.yml')) and
    returns the first one with an endpoint mapping, or None.
    """
    relative = compose_path.relative_to(HOSTS_DIR)
    return next(
        (
            HOST_DIR_TO_ENDPOINT[component]
            for component in relative.parts
            if component in HOST_DIR_TO_ENDPOINT
        ),
        None,
    )
|
||||
|
||||
|
||||
def guess_project_name(compose_path: Path) -> str:
    """Guess the compose project name (docker-compose defaults to the
    name of the directory holding the compose file)."""
    containing_dir = compose_path.parent
    return containing_dir.name
|
||||
|
||||
|
||||
# ── container comparison ─────────────────────────────────────────────────────
|
||||
|
||||
def get_running_services(endpoint: str) -> dict:
    """Get running containers grouped by compose project and service.

    Returns a mapping of "<project>/<service>" -> {id, image, state,
    status} for each container carrying docker-compose labels.
    Returns {} if the Portainer API call fails (logged, not raised).
    """
    try:
        containers = list_containers(endpoint, all_containers=True)
    except Exception as e:
        log.warning("Failed to list containers on %s: %s", endpoint, e)
        return {}

    services = {}
    for c in containers:
        # The Docker API can report "Labels": null instead of {} — guard
        # with `or {}` so the .get() calls below don't raise.
        labels = c.get("Labels") or {}
        project = labels.get("com.docker.compose.project", "")
        service = labels.get("com.docker.compose.service", "")
        if project and service:
            key = f"{project}/{service}"
            services[key] = {
                "id": c["Id"],
                "image": c.get("Image", ""),
                "state": c.get("State", ""),
                "status": c.get("Status", ""),
            }
    return services
|
||||
|
||||
|
||||
def compare_service(declared: dict, running_info: dict, endpoint: str) -> list[dict]:
    """Compare declared compose config against running container. Returns list of drifts.

    Args:
        declared: service entry from parse_compose_services().
        running_info: entry from get_running_services() (needs "id").
        endpoint: Portainer endpoint name used for the inspect call.

    Each drift is {"field", "declared", "running"}; an inspect failure is
    itself reported as a single "inspect" drift rather than raised.
    """
    container_id = running_info["id"]

    try:
        inspection = inspect_container(endpoint, container_id)
    except Exception as e:
        log.warning("Failed to inspect container %s: %s", container_id[:12], e)
        return [{"field": "inspect", "declared": "N/A", "running": f"Error: {e}"}]

    config = inspection.get("Config", {})
    host_config = inspection.get("HostConfig", {})

    drifts = []
    drifts.extend(_image_drift(declared, config))
    drifts.extend(_env_drift(declared, config))
    drifts.extend(_restart_drift(declared, host_config))
    return drifts


def _image_drift(declared: dict, config: dict) -> list[dict]:
    """Compare declared vs running image, ignoring the registry prefix."""
    declared_image = declared.get("image", "")
    running_image = config.get("Image", "")
    if not (declared_image and running_image):
        # Nothing to compare when either side is unknown.
        return []
    # Strip registry/namespace, compare only the final "name:tag" part;
    # a missing tag means "latest" on either side.
    d_img = declared_image.split("/")[-1]
    r_img = running_image.split("/")[-1]
    d_name, _, d_tag = d_img.partition(":")
    r_name, _, r_tag = r_img.partition(":")
    if d_name != r_name or (d_tag or "latest") != (r_tag or "latest"):
        return [{
            "field": "image",
            "declared": declared_image,
            "running": running_image,
        }]
    return []


def _env_drift(declared: dict, config: dict) -> list[dict]:
    """Report compose-declared env keys missing from the running container.

    Only declared keys absent at runtime are flagged; extra runtime
    variables (PATH, HOME, image defaults, ...) are deliberately ignored.
    """
    declared_env_keys = set(declared.get("environment", {}).keys())
    running_env_keys = set()
    # "Env" may be absent or null in the inspect payload.
    for item in config.get("Env") or []:
        if "=" in item:
            key, _, _value = item.partition("=")
            running_env_keys.add(key)
    missing_keys = declared_env_keys - running_env_keys
    if missing_keys:
        return [{
            "field": "env_missing",
            "declared": ", ".join(sorted(missing_keys)),
            "running": "(not set)",
        }]
    return []


def _restart_drift(declared: dict, host_config: dict) -> list[dict]:
    """Compare declared vs running restart policy.

    Docker reports "no restart policy" as either "" or "no" depending on
    version, so both sides are canonicalized to "no" before comparing.
    (Bug fix: the previous map {"no": "", "": "no"} translated the two
    spellings in OPPOSITE directions, so declared `restart: no` against a
    running "" policy was falsely reported as drift.)
    """
    declared_restart = declared.get("restart", "")
    if not declared_restart:
        # Nothing declared -> nothing to check (matches old behavior).
        return []
    restart_policy = host_config.get("RestartPolicy", {})
    running_restart = restart_policy.get("Name", "")
    if (declared_restart or "no") != (running_restart or "no"):
        return [{
            "field": "restart_policy",
            "declared": declared_restart,
            "running": running_restart or "no",
        }]
    return []
|
||||
|
||||
|
||||
# ── report ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def build_report(all_drifts: list[dict], llm_analysis: str = "") -> tuple[str, str]:
    """Build markdown text and HTML drift report.

    Args:
        all_drifts: entries with keys endpoint/project/service/drifts,
            where each drift has field/declared/running.
        llm_analysis: optional free-text LLM analysis appended to both
            report formats.

    Returns:
        (plain_text_report, html_report) as a tuple of strings.
    """
    # Local stdlib import: escaping is only needed here and this keeps the
    # module-level import block untouched.
    import html

    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    total = sum(len(d["drifts"]) for d in all_drifts)

    text_lines = [
        f"# Config Drift Report — {now.strftime('%Y-%m-%d %H:%M %Z')}",
        "",
        f"Total drifts found: {total}",
        "",
    ]

    html_parts = [
        "<html><body>",
        "<h2>Config Drift Report</h2>",
        f"<p>{now.strftime('%Y-%m-%d %H:%M %Z')} — "
        f"{total} drifts found</p>",
        "<table border='1' cellpadding='6' cellspacing='0' style='border-collapse:collapse;'>",
        "<tr><th>Endpoint</th><th>Project/Service</th><th>Field</th>"
        "<th>Declared</th><th>Running</th></tr>",
    ]

    for entry in all_drifts:
        for drift in entry["drifts"]:
            text_lines.append(
                f"| {entry['endpoint']} | {entry['project']}/{entry['service']} "
                f"| {drift['field']} | {drift['declared']} | {drift['running']} |"
            )
            # Escape interpolated values: env keys, image names and error
            # strings may contain &, < or > and would otherwise break (or
            # inject into) the HTML email.
            html_parts.append(
                f"<tr><td>{html.escape(str(entry['endpoint']))}</td>"
                f"<td>{html.escape(str(entry['project']))}/{html.escape(str(entry['service']))}</td>"
                f"<td>{html.escape(str(drift['field']))}</td>"
                f"<td><code>{html.escape(str(drift['declared']))}</code></td>"
                f"<td><code>{html.escape(str(drift['running']))}</code></td></tr>"
            )

    html_parts.append("</table>")

    if llm_analysis:
        text_lines.extend(["", "## LLM Analysis", "", llm_analysis])
        # Model output is free text — escape it inside <pre> as well.
        html_parts.append(f"<h3>LLM Analysis</h3><pre>{html.escape(llm_analysis)}</pre>")

    html_parts.append("</body></html>")

    return "\n".join(text_lines), "\n".join(html_parts)
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """Entry point: scan compose files, diff against Portainer, report.

    With --dry-run the report is printed instead of emailed; with zero
    drifts nothing is sent at all.
    """
    parser = argparse.ArgumentParser(description="Config Drift Detector — compare compose files vs running containers")
    parser.add_argument("--dry-run", action="store_true", help="Print report without sending email")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    all_drifts, unmatched_services = _scan_for_drifts()

    total_drifts = sum(len(d["drifts"]) for d in all_drifts)
    log.info("Detected %d drifts across %d services", total_drifts, len(all_drifts))
    log.info("Unmatched compose services (not running): %d", len(unmatched_services))

    if total_drifts == 0:
        log.info("No drifts found. Nothing to report.")
        if args.dry_run:
            print("No config drifts detected.")
        return

    llm_analysis = _llm_analysis(all_drifts)
    text_report, html_report = build_report(all_drifts, llm_analysis)

    if args.dry_run:
        print(text_report)
        return

    now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    subject = f"Config Drift: {total_drifts} drifts detected — {now.strftime('%b %d')}"
    send_email(subject=subject, html_body=html_report, text_body=text_report)
    log.info("Drift report emailed")


def _scan_for_drifts() -> tuple[list[dict], list[dict]]:
    """Diff every compose service against its running container.

    Returns (all_drifts, unmatched_services): drift entries for services
    whose running config differs from the compose declaration, plus the
    compose services with no matching running container.
    """
    compose_files = find_compose_files()
    log.info("Found %d compose files under %s", len(compose_files), HOSTS_DIR)

    # Running containers are fetched lazily, once per endpoint.
    running_cache: dict[str, dict] = {}
    all_drifts: list[dict] = []
    unmatched_services: list[dict] = []

    for compose_path in compose_files:
        endpoint = guess_endpoint(compose_path)
        if not endpoint:
            log.debug("Skipping %s — no endpoint mapping", compose_path)
            continue

        project = guess_project_name(compose_path)
        services = parse_compose_services(compose_path)
        if not services:
            log.debug("No services in %s", compose_path)
            continue

        if endpoint not in running_cache:
            running_cache[endpoint] = get_running_services(endpoint)
        running = running_cache[endpoint]

        for svc_name, declared in services.items():
            key = f"{project}/{svc_name}"
            if key in running:
                drifts = compare_service(declared, running[key], endpoint)
                if drifts:
                    all_drifts.append({
                        "endpoint": endpoint,
                        "project": project,
                        "service": svc_name,
                        "compose_file": str(compose_path),
                        "drifts": drifts,
                    })
            else:
                unmatched_services.append({
                    "endpoint": endpoint,
                    "project": project,
                    "service": svc_name,
                })
                log.debug("No running container for %s on %s", key, endpoint)

    return all_drifts, unmatched_services


def _llm_analysis(all_drifts: list[dict]) -> str:
    """Ask the local LLM to rate the drifts; returns "" when unavailable."""
    if not ollama_available():
        log.info("Ollama not available, skipping LLM analysis")
        return ""

    drift_summary = "\n".join(
        f"- {d['endpoint']}/{d['project']}/{d['service']}: "
        + ", ".join(f"{x['field']} (declared={x['declared']}, running={x['running']})" for x in d["drifts"])
        for d in all_drifts
    )
    prompt = (
        "Explain these Docker config drifts and their risk level. "
        "Be concise, rate each as LOW/MEDIUM/HIGH risk:\n\n"
        + drift_summary
    )
    try:
        analysis = ollama_generate(prompt)
        log.info("LLM analysis obtained")
        return analysis
    except OllamaUnavailableError as e:
        log.warning("LLM unavailable for analysis: %s", e)
        return ""
|
||||
Reference in New Issue
Block a user