#!/usr/bin/env python3
"""Predict disk space exhaustion using Prometheus metrics and linear regression.

Queries 30 days of filesystem availability data, fits a linear trend per
filesystem, and predicts when each volume will reach 90% usage. Sends report
via email with optional LLM summary.

Cron: 0 9 * * 0 /usr/bin/python3 /home/homelab/organized/repos/homelab/scripts/disk-predictor.py
"""
import argparse
import logging
import sys
import time
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email, send_ntfy
from lib.prometheus import prom_query, prom_query_range

# ── config ───────────────────────────────────────────────────────────────────
AVAIL_QUERY = 'node_filesystem_avail_bytes{fstype!~"tmpfs|devtmpfs|overlay"}'
SIZE_QUERY = 'node_filesystem_size_bytes{fstype!~"tmpfs|devtmpfs|overlay"}'
RANGE_DAYS = 30   # history window fed to the regression
STEP = "1d"       # one sample per day
WARN_DAYS = 30    # warn when a volume is predicted to hit 90% within this many days
NTFY_TOPIC = "REDACTED_NTFY_TOPIC"

log = logging.getLogger("disk-predictor")


# ── linear regression (no numpy) ──────────────────────────────────────────────
def linear_regression(xs: list[float], ys: list[float]) -> tuple[float, float]:
    """Least-squares linear regression. Returns (slope, intercept).

    slope = (n*sum(xy) - sum(x)*sum(y)) / (n*sum(x^2) - (sum(x))^2)
    """
    n = len(xs)
    if n < 2:
        # Not enough points for a trend: flat line through the only sample (or 0).
        return 0.0, ys[0] if ys else 0.0
    sx = sum(xs)
    sy = sum(ys)
    sxy = sum(x * y for x, y in zip(xs, ys))
    sx2 = sum(x * x for x in xs)
    denom = n * sx2 - sx * sx
    if abs(denom) < 1e-12:
        # Degenerate x values (all identical): flat line at the mean of ys.
        return 0.0, sy / n
    slope = (n * sxy - sx * sy) / denom
    intercept = (sy - slope * sx) / n
    return slope, intercept


def predict_days_to_threshold(
    timestamps: list[float],
    avail_values: list[float],
    total_bytes: float,
    threshold_pct: float = 0.10,
) -> float | None:
    """Predict days until available space drops to threshold_pct of total.

    Returns None if inputs are unusable or the trend is flat/increasing
    (volume never fills). Returns a negative number when the fitted trend
    says the volume is already past the threshold.
    """
    if not timestamps or not avail_values or total_bytes <= 0:
        return None
    threshold_bytes = total_bytes * threshold_pct
    # Normalize timestamps to days from start
    t0 = timestamps[0]
    xs = [(t - t0) / 86400.0 for t in timestamps]
    ys = avail_values
    slope, intercept = linear_regression(xs, ys)
    if slope >= 0:
        # Disk is stable or growing — won't fill
        return None
    # Solve: slope * days + intercept = threshold_bytes
    # days = (threshold_bytes - intercept) / slope
    current_day = xs[-1]
    # Use the fitted value at "now" (not the raw last sample) so one noisy
    # final point doesn't dominate the prediction.
    current_avail = slope * current_day + intercept
    days_from_now = (threshold_bytes - current_avail) / slope
    return days_from_now


# ── data fetching ─────────────────────────────────────────────────────────────
def make_fs_key(metric: dict) -> str:
    """Create a unique key (host|mount|device) from metric labels."""
    m = metric.get("metric", {})
    instance = m.get("instance", "unknown")
    host = instance.split(":")[0]  # strip the exporter port from "host:port"
    mount = m.get("mountpoint", "unknown")
    device = m.get("device", "unknown")
    return f"{host}|{mount}|{device}"


def fetch_data() -> list[dict]:
    """Fetch range and instant data from Prometheus, compute predictions.

    Returns one summary dict per filesystem, sorted most-urgent first;
    filesystems with no fill prediction ("never") sort to the end.
    """
    now = int(time.time())
    start = now - RANGE_DAYS * 86400
    log.info("Querying Prometheus for %d-day range data", RANGE_DAYS)
    range_results = prom_query_range(
        query=AVAIL_QUERY,
        start=str(start),
        end=str(now),
        step=STEP,
    )
    log.info("Querying Prometheus for current filesystem sizes")
    size_results = prom_query(SIZE_QUERY)

    # Build size lookup keyed by host|mount|device
    size_map = {}
    for r in size_results:
        key = make_fs_key(r)
        val = r.get("value", [None, "0"])
        size_map[key] = float(val[1])

    # Process each filesystem
    filesystems = []
    for r in range_results:
        key = make_fs_key(r)
        m = r.get("metric", {})
        instance = m.get("instance", "unknown")
        host = instance.split(":")[0]
        mount = m.get("mountpoint", "unknown")
        values = r.get("values", [])
        if len(values) < 2:
            log.debug("Skipping %s (insufficient data points: %d)", key, len(values))
            continue
        timestamps = [float(v[0]) for v in values]
        avail_values = [float(v[1]) for v in values]
        total_bytes = size_map.get(key, 0)
        if total_bytes <= 0:
            log.debug("Skipping %s (no size data)", key)
            continue
        current_avail = avail_values[-1]
        current_used_pct = (1.0 - current_avail / total_bytes) * 100.0
        days = predict_days_to_threshold(timestamps, avail_values, total_bytes)
        # Determine trend; ±1 MB/day dead zone counts as "stable"
        slope, _ = linear_regression(
            [(t - timestamps[0]) / 86400.0 for t in timestamps],
            avail_values,
        )
        if slope < -1e6:
            trend = "decreasing"
        elif slope > 1e6:
            trend = "increasing"
        else:
            trend = "stable"
        # Bytes per day consumption rate (only meaningful when shrinking)
        rate_gb_day = abs(slope) / (1024 ** 3) if slope < 0 else 0
        filesystems.append({
            "host": host,
            "mount": mount,
            "total_gb": total_bytes / (1024 ** 3),
            "avail_gb": current_avail / (1024 ** 3),
            "used_pct": current_used_pct,
            "days_to_90": days,
            "trend": trend,
            "rate_gb_day": rate_gb_day,
            "data_points": len(values),
        })

    # BUGFIX: the original key `(d is not None, d or 9999)` sorted filesystems
    # WITHOUT a prediction to the top (False < True) and misfiled a 0-day
    # prediction via the falsy `or 9999`. Sort most-urgent predictions first,
    # "never" entries last.
    def _urgency(fs: dict) -> tuple[bool, float]:
        d = fs["days_to_90"]
        return (d is None, d if d is not None else 0.0)

    filesystems.sort(key=_urgency)
    return filesystems


# ── report ────────────────────────────────────────────────────────────────────
def build_report(filesystems: list[dict]) -> str:
    """Build a plain-text report table.

    Appends a WARNING section listing filesystems predicted to reach 90%
    usage within WARN_DAYS days.
    """
    lines = []
    lines.append("Disk Space Prediction Report")
    lines.append("=" * 90)
    lines.append("")
    header = f"{'Host':<20} {'Mount':<25} {'Used%':>6} {'Days to 90%':>12} {'Trend':<12} {'Rate GB/day':>11}"
    lines.append(header)
    lines.append("-" * 90)
    warnings = []
    for fs in filesystems:
        days_str = f"{fs['days_to_90']:.0f}" if fs["days_to_90"] is not None else "never"
        rate_str = f"{fs['rate_gb_day']:.2f}" if fs["rate_gb_day"] > 0 else "-"
        line = f"{fs['host']:<20} {fs['mount']:<25} {fs['used_pct']:>5.1f}% {days_str:>12} {fs['trend']:<12} {rate_str:>11}"
        lines.append(line)
        if fs["days_to_90"] is not None and fs["days_to_90"] <= WARN_DAYS:
            warnings.append(fs)
    lines.append("")
    lines.append(f"Total filesystems: {len(filesystems)}")
    if warnings:
        lines.append(f"WARNING: {len(warnings)} filesystem(s) predicted to reach 90% within {WARN_DAYS} days!")
        for w in warnings:
            lines.append(f" - {w['host']}:{w['mount']} ({w['used_pct']:.1f}% used, ~{w['days_to_90']:.0f} days)")
    return "\n".join(lines)


# ── main ──────────────────────────────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser(description="Predict disk space exhaustion from Prometheus metrics")
    parser.add_argument("--dry-run", action="store_true", help="Print report without sending notifications")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    log.info("Fetching disk metrics from Prometheus")
    try:
        filesystems = fetch_data()
    except Exception as e:
        log.error("Failed to fetch Prometheus data: %s", e)
        if not args.dry_run:
            send_email(
                subject="Disk Predictor: FETCH ERROR",
                text_body=f"Failed to query Prometheus:\n{e}",
            )
        # BUGFIX: exit non-zero so cron/monitoring sees the failure
        # (previously returned 0 on fetch errors).
        sys.exit(1)

    report = build_report(filesystems)
    log.info("Report:\n%s", report)

    # LLM summary (best-effort: fall back to the raw report on any Ollama issue)
    llm_summary = None
    if ollama_available():
        log.info("Ollama available, requesting prediction summary")
        prompt = (
            "Summarize these disk predictions. Highlight any volumes "
            "expected to fill within 30 days.\n\n"
            f"```\n{report}\n```"
        )
        try:
            llm_summary = ollama_generate(prompt)
            log.info("LLM summary received (%d chars)", len(llm_summary))
        except OllamaUnavailableError as e:
            log.warning("Ollama became unavailable: %s", e)
    else:
        log.info("Ollama not available, sending raw report")

    # Build final report
    final_report = report
    if llm_summary:
        final_report += "\n\nLLM Analysis:\n" + "-" * 40 + "\n" + llm_summary

    warnings = [fs for fs in filesystems if fs["days_to_90"] is not None and fs["days_to_90"] <= WARN_DAYS]
    subject = "Disk Predictor: WARNING" if warnings else "Disk Predictor: OK"

    if args.dry_run:
        print(final_report)
        return

    try:
        send_email(subject=subject, text_body=final_report)
        log.info("Email sent: %s", subject)
    except Exception as e:
        log.error("Failed to send email: %s", e)
    # Warnings already included in the email report above


if __name__ == "__main__":
    main()