Files
homelab-optimized/scripts/disk-predictor.py
Gitea Mirror Bot 71e08c7c58
Some checks failed
Documentation / Build Docusaurus (push) Failing after 4m59s
Documentation / Deploy to GitHub Pages (push) Has been skipped
Sanitized mirror from private repository - 2026-04-06 02:44:02 UTC
2026-04-06 02:44:02 +00:00

289 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""Predict disk space exhaustion using Prometheus metrics and linear regression.
Queries 30 days of filesystem availability data, fits a linear trend per
filesystem, and predicts when each volume will reach 90% usage. Sends
report via email with optional LLM summary.
Cron: 0 9 * * 0 /usr/bin/python3 /home/homelab/organized/repos/homelab/scripts/disk-predictor.py
"""
import argparse
import logging
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email, send_ntfy
from lib.prometheus import prom_query, prom_query_range
# ── config ───────────────────────────────────────────────────────────────────
# PromQL selectors; pseudo-filesystems (tmpfs/devtmpfs/overlay) are excluded
# so only real volumes are analyzed.
AVAIL_QUERY = 'node_filesystem_avail_bytes{fstype!~"tmpfs|devtmpfs|overlay"}'
SIZE_QUERY = 'node_filesystem_size_bytes{fstype!~"tmpfs|devtmpfs|overlay"}'
RANGE_DAYS = 30  # how much history to fit the linear trend on
STEP = "1d"  # one sample per day from query_range
WARN_DAYS = 30  # warn when a volume is predicted to hit 90% within this many days
# NOTE(review): NTFY_TOPIC and the imported send_ntfy are not referenced
# anywhere in this file — confirm whether ntfy alerting was intended here.
NTFY_TOPIC = "REDACTED_NTFY_TOPIC"
log = logging.getLogger("disk-predictor")
# ── linear regression (no numpy) ────────────────────────────────────────────
def linear_regression(xs: list[float], ys: list[float]) -> tuple[float, float]:
    """Fit y = slope*x + intercept by ordinary least squares (pure Python).

    Returns (slope, intercept). With fewer than two points the slope is 0.0
    and the intercept is the lone y value (or 0.0 when there is no data).
    A degenerate x spread (all x equal) yields slope 0.0 with the mean of y.
    """
    count = len(xs)
    if count < 2:
        return 0.0, ys[0] if ys else 0.0
    sum_x = sum(xs)
    sum_y = sum(ys)
    cross = sum(a * b for a, b in zip(xs, ys))
    squares = sum(a * a for a in xs)
    denominator = count * squares - sum_x * sum_x
    if abs(denominator) < 1e-12:
        # All x values coincide: slope is undefined, fall back to mean of y.
        return 0.0, sum_y / count
    slope = (count * cross - sum_x * sum_y) / denominator
    return slope, (sum_y - slope * sum_x) / count
def predict_days_to_threshold(
timestamps: list[float],
avail_values: list[float],
total_bytes: float,
threshold_pct: float = 0.10,
) -> float | None:
"""Predict days until available space drops to threshold_pct of total.
Returns None if trend is flat/increasing (never fills) or already below threshold.
Returns negative if already past threshold.
"""
if not timestamps or not avail_values or total_bytes <= 0:
return None
threshold_bytes = total_bytes * threshold_pct
# Normalize timestamps to days from start
t0 = timestamps[0]
xs = [(t - t0) / 86400.0 for t in timestamps]
ys = avail_values
slope, intercept = linear_regression(xs, ys)
if slope >= 0:
# Disk is stable or growing — won't fill
return None
# Solve: slope * days + intercept = threshold_bytes
# days = (threshold_bytes - intercept) / slope
current_day = xs[-1]
current_avail = slope * current_day + intercept
days_from_now = (threshold_bytes - current_avail) / slope
return days_from_now
# ── data fetching ────────────────────────────────────────────────────────────
def make_fs_key(metric: dict) -> str:
    """Build a stable "host|mount|device" identifier from a Prometheus result.

    Missing labels fall back to "unknown"; the :port suffix on the scrape
    instance is stripped so the key matches across exporters.
    """
    labels = metric.get("metric", {})
    hostname = labels.get("instance", "unknown").split(":")[0]
    return "|".join((
        hostname,
        labels.get("mountpoint", "unknown"),
        labels.get("device", "unknown"),
    ))
def fetch_data() -> list[dict]:
    """Query Prometheus and build per-filesystem prediction records.

    Pulls RANGE_DAYS of availability samples plus an instant snapshot of
    filesystem sizes, fits a linear trend per filesystem, and returns a list
    of dicts with keys: host, mount, total_gb, avail_gb, used_pct,
    days_to_90 (float | None), trend, rate_gb_day, data_points.

    Raises: whatever prom_query / prom_query_range raise on query failure
    (handled by the caller).
    """
    now = int(time.time())
    start = now - RANGE_DAYS * 86400
    log.info("Querying Prometheus for %d-day range data", RANGE_DAYS)
    range_results = prom_query_range(
        query=AVAIL_QUERY,
        start=str(start),
        end=str(now),
        step=STEP,
    )
    log.info("Querying Prometheus for current filesystem sizes")
    size_results = prom_query(SIZE_QUERY)
    # Build size lookup keyed by host|mount|device so each range series can
    # be matched to its instant size sample.
    size_map = {}
    for r in size_results:
        key = make_fs_key(r)
        val = r.get("value", [None, "0"])
        size_map[key] = float(val[1])
    # Process each filesystem series from the range query.
    filesystems = []
    for r in range_results:
        key = make_fs_key(r)
        m = r.get("metric", {})
        instance = m.get("instance", "unknown")
        host = instance.split(":")[0]  # strip the :port scrape suffix
        mount = m.get("mountpoint", "unknown")
        values = r.get("values", [])
        if len(values) < 2:
            # A trend needs at least two samples.
            log.debug("Skipping %s (insufficient data points: %d)", key, len(values))
            continue
        timestamps = [float(v[0]) for v in values]
        avail_values = [float(v[1]) for v in values]
        total_bytes = size_map.get(key, 0)
        if total_bytes <= 0:
            # No matching size sample — used% would be undefined.
            log.debug("Skipping %s (no size data)", key)
            continue
        current_avail = avail_values[-1]
        current_used_pct = (1.0 - current_avail / total_bytes) * 100.0
        days = predict_days_to_threshold(timestamps, avail_values, total_bytes)
        # Determine trend from the regression slope (bytes of avail per day).
        slope, _ = linear_regression(
            [(t - timestamps[0]) / 86400.0 for t in timestamps],
            avail_values,
        )
        # ~1 MB/day dead band so tiny fluctuations read as "stable".
        if slope < -1e6:
            trend = "decreasing"
        elif slope > 1e6:
            trend = "increasing"
        else:
            trend = "stable"
        # Bytes-per-day consumption rate, reported only when space is shrinking.
        rate_gb_day = abs(slope) / (1024 ** 3) if slope < 0 else 0
        filesystems.append({
            "host": host,
            "mount": mount,
            "total_gb": total_bytes / (1024 ** 3),
            "avail_gb": current_avail / (1024 ** 3),
            "used_pct": current_used_pct,
            "days_to_90": days,
            "trend": trend,
            "rate_gb_day": rate_gb_day,
            "data_points": len(values),
        })
    # NOTE(review): this key sorts filesystems WITHOUT a prediction first
    # (False < True), then predicted ones by ascending days — i.e. the most
    # urgent volumes end up LAST in the report. Also `or 9999` would
    # reclassify an exact 0.0 prediction as 9999. Confirm this ordering is
    # intentional.
    filesystems.sort(key=lambda f: (f["days_to_90"] is not None, f["days_to_90"] or 9999))
    return filesystems
# ── report ───────────────────────────────────────────────────────────────────
def build_report(filesystems: list[dict]) -> str:
    """Render the prediction table as plain text, with a warning summary.

    One row per filesystem, followed by a count and — when any volume is
    predicted to reach 90% within WARN_DAYS — a warning section listing them.
    """
    out: list[str] = [
        "Disk Space Prediction Report",
        "=" * 90,
        "",
        f"{'Host':<20} {'Mount':<25} {'Used%':>6} {'Days to 90%':>12} {'Trend':<12} {'Rate GB/day':>11}",
        "-" * 90,
    ]
    for entry in filesystems:
        eta = entry["days_to_90"]
        eta_text = "never" if eta is None else f"{eta:.0f}"
        burn = entry["rate_gb_day"]
        burn_text = f"{burn:.2f}" if burn > 0 else "-"
        out.append(
            f"{entry['host']:<20} {entry['mount']:<25} {entry['used_pct']:>5.1f}% "
            f"{eta_text:>12} {entry['trend']:<12} {burn_text:>11}"
        )
    at_risk = [
        entry for entry in filesystems
        if entry["days_to_90"] is not None and entry["days_to_90"] <= WARN_DAYS
    ]
    out.append("")
    out.append(f"Total filesystems: {len(filesystems)}")
    if at_risk:
        out.append(f"WARNING: {len(at_risk)} filesystem(s) predicted to reach 90% within {WARN_DAYS} days!")
        for entry in at_risk:
            out.append(f" - {entry['host']}:{entry['mount']} ({entry['used_pct']:.1f}% used, ~{entry['days_to_90']:.0f} days)")
    return "\n".join(out)
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: fetch metrics, build the report, and notify by email."""
    cli = argparse.ArgumentParser(description="Predict disk space exhaustion from Prometheus metrics")
    cli.add_argument("--dry-run", action="store_true", help="Print report without sending notifications")
    cli.add_argument("--verbose", action="store_true", help="Enable debug logging")
    opts = cli.parse_args()
    logging.basicConfig(
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        level=logging.DEBUG if opts.verbose else logging.INFO,
    )
    log.info("Fetching disk metrics from Prometheus")
    try:
        filesystems = fetch_data()
    except Exception as e:
        # Alert on query failure (suppressed in dry-run mode), then bail out.
        log.error("Failed to fetch Prometheus data: %s", e)
        if not opts.dry_run:
            send_email(
                subject="Disk Predictor: FETCH ERROR",
                text_body=f"Failed to query Prometheus:\n{e}",
            )
        return
    report = build_report(filesystems)
    log.info("Report:\n%s", report)
    # Optional LLM-written summary of the raw table.
    llm_summary = None
    if not ollama_available():
        log.info("Ollama not available, sending raw report")
    else:
        log.info("Ollama available, requesting prediction summary")
        prompt = (
            "Summarize these disk predictions. Highlight any volumes "
            "expected to fill within 30 days.\n\n"
            f"```\n{report}\n```"
        )
        try:
            llm_summary = ollama_generate(prompt)
            log.info("LLM summary received (%d chars)", len(llm_summary))
        except OllamaUnavailableError as e:
            # Availability can flip between the probe and the generate call.
            log.warning("Ollama became unavailable: %s", e)
    final_report = report
    if llm_summary:
        final_report += "\n\nLLM Analysis:\n" + "-" * 40 + "\n" + llm_summary
    urgent = [fs for fs in filesystems if fs["days_to_90"] is not None and fs["days_to_90"] <= WARN_DAYS]
    subject = "Disk Predictor: WARNING" if urgent else "Disk Predictor: OK"
    if opts.dry_run:
        print(final_report)
        return
    try:
        send_email(subject=subject, text_body=final_report)
        log.info("Email sent: %s", subject)
    except Exception as e:
        log.error("Failed to send email: %s", e)
    # Warnings already included in the email report above


if __name__ == "__main__":
    main()