"""Prometheus query client.""" import json import logging import urllib.request import urllib.parse log = logging.getLogger(__name__) PROMETHEUS_URL = "http://192.168.0.210:9090" def prom_query(query: str, url: str = PROMETHEUS_URL) -> list[dict]: """Instant PromQL query. Returns list of result dicts.""" params = urllib.parse.urlencode({"query": query}) req_url = f"{url.rstrip('/')}/api/v1/query?{params}" req = urllib.request.Request(req_url) with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read()) if data.get("status") != "success": raise RuntimeError(f"Prometheus query failed: {data}") return data["data"]["result"] def prom_query_range(query: str, start: str, end: str, step: str, url: str = PROMETHEUS_URL) -> list[dict]: """Range PromQL query. start/end as Unix timestamps or RFC3339, step as duration string.""" params = urllib.parse.urlencode({ "query": query, "start": start, "end": end, "step": step, }) req_url = f"{url.rstrip('/')}/api/v1/query_range?{params}" req = urllib.request.Request(req_url) with urllib.request.urlopen(req, timeout=60) as resp: data = json.loads(resp.read()) if data.get("status") != "success": raise RuntimeError(f"Prometheus range query failed: {data}") return data["data"]["result"]