Files
homelab-optimized/scripts/lib/prometheus.py
Gitea Mirror Bot d6eb5dcb1e
Some checks failed
Documentation / Build Docusaurus (push) Failing after 18m8s
Documentation / Deploy to GitHub Pages (push) Has been skipped
Sanitized mirror from private repository - 2026-04-19 07:39:14 UTC
2026-04-19 07:39:14 +00:00

38 lines
1.4 KiB
Python

"""Prometheus query client."""
import json
import logging
import urllib.error
import urllib.parse
import urllib.request
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
# Default Prometheus base URL; the query functions below use it as the
# default value of their `url` parameter.
PROMETHEUS_URL = "http://192.168.0.210:9090"
def prom_query(query: str, url: str = PROMETHEUS_URL) -> list[dict]:
    """Run an instant PromQL query and return its result list.

    Args:
        query: PromQL expression, sent URL-encoded as the ``query`` parameter.
        url: Base URL of the Prometheus server; trailing slashes are stripped
            before ``/api/v1/query`` is appended.

    Returns:
        The ``data.result`` value of the decoded JSON response.

    Raises:
        RuntimeError: The response was decoded but its ``status`` field is
            not ``"success"``.
        urllib.error.HTTPError: The server answered with an HTTP error
            status. The response body is logged before the exception is
            re-raised; that read consumes the body.
    """
    params = urllib.parse.urlencode({"query": query})
    req_url = f"{url.rstrip('/')}/api/v1/query?{params}"
    req = urllib.request.Request(req_url)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as err:
        # urlopen raises on 4xx/5xx, and Prometheus reports rejected or failed
        # queries with such codes plus a JSON error body. Without this handler
        # that explanation never reaches the status check below and is lost.
        try:
            detail = err.read().decode("utf-8", errors="replace")
        except Exception:  # diagnostics are best-effort; never mask the HTTP error
            detail = "<response body unavailable>"
        log.error("Prometheus query %r failed: HTTP %s: %s", query, err.code, detail)
        raise
    if data.get("status") != "success":
        raise RuntimeError(f"Prometheus query failed: {data}")
    return data["data"]["result"]
def prom_query_range(query: str, start: str, end: str, step: str,
                     url: str = PROMETHEUS_URL) -> list[dict]:
    """Run a range PromQL query and return its result list.

    Args:
        query: PromQL expression, sent URL-encoded as the ``query`` parameter.
        start: Range start, as a Unix timestamp or RFC3339 string.
        end: Range end, as a Unix timestamp or RFC3339 string.
        step: Query resolution step, as a duration string.
        url: Base URL of the Prometheus server; trailing slashes are stripped
            before ``/api/v1/query_range`` is appended.

    Returns:
        The ``data.result`` value of the decoded JSON response.

    Raises:
        RuntimeError: The response was decoded but its ``status`` field is
            not ``"success"``.
        urllib.error.HTTPError: The server answered with an HTTP error
            status. The response body is logged before the exception is
            re-raised; that read consumes the body.
    """
    params = urllib.parse.urlencode({
        "query": query, "start": start, "end": end, "step": step,
    })
    req_url = f"{url.rstrip('/')}/api/v1/query_range?{params}"
    req = urllib.request.Request(req_url)
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as err:
        # urlopen raises on 4xx/5xx, and Prometheus reports rejected or failed
        # queries with such codes plus a JSON error body. Without this handler
        # that explanation never reaches the status check below and is lost.
        try:
            detail = err.read().decode("utf-8", errors="replace")
        except Exception:  # diagnostics are best-effort; never mask the HTTP error
            detail = "<response body unavailable>"
        log.error(
            "Prometheus range query %r failed: HTTP %s: %s", query, err.code, detail
        )
        raise
    if data.get("status") != "success":
        raise RuntimeError(f"Prometheus range query failed: {data}")
    return data["data"]["result"]