Sanitized mirror from private repository - 2026-04-20 01:32:01 UTC
This commit is contained in:
1
scripts/lib/__init__.py
Normal file
1
scripts/lib/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Shared utilities for homelab automation scripts
|
||||
38
scripts/lib/gitea.py
Normal file
38
scripts/lib/gitea.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Gitea API client."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
GITEA_URL = "https://git.vish.gg"
|
||||
GITEA_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret
|
||||
DEFAULT_REPO = "vish/homelab"
|
||||
|
||||
|
||||
def gitea_api(method: str, path: str, data: dict | None = None,
|
||||
url: str = GITEA_URL, token: str = GITEA_TOKEN) -> dict | list:
|
||||
"""Make a Gitea API request."""
|
||||
full_url = f"{url.rstrip('/')}/api/v1/{path.lstrip('/')}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(full_url, data=body, method=method, headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read())
|
||||
|
||||
|
||||
def get_commits_since(since: str, repo: str = DEFAULT_REPO) -> list[dict]:
|
||||
"""Get commits since an ISO timestamp."""
|
||||
return gitea_api("GET", f"repos/{repo}/commits?sha=main&limit=50&since={since}")
|
||||
|
||||
|
||||
def create_release(tag: str, title: str, body: str, repo: str = DEFAULT_REPO) -> dict:
|
||||
"""Create a Gitea release."""
|
||||
return gitea_api("POST", f"repos/{repo}/releases", data={
|
||||
"tag_name": tag, "name": title, "body": body, "draft": False, "prerelease": False,
|
||||
})
|
||||
196
scripts/lib/notify.py
Normal file
196
scripts/lib/notify.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""Notification helpers — ntfy and IMAP via Proton Bridge."""
|
||||
|
||||
import imaplib
|
||||
import logging
|
||||
import re
|
||||
import ssl
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from html import escape
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SMTP_USER = "admin@thevish.io"
|
||||
SMTP_PASS = "REDACTED_PASSWORD" # pragma: allowlist secret
|
||||
DEFAULT_TO = "admin@thevish.io"
|
||||
|
||||
IMAP_HOST = "127.0.0.1"
|
||||
IMAP_PORT = 1143
|
||||
DIGEST_FOLDER = "Folders/Digests"
|
||||
|
||||
# Map subject prefixes to source labels and emoji indicators
|
||||
_SOURCE_MAP = {
|
||||
"Backup": ("Backup Validator", "#e74c3c", "\u2601\ufe0f"),
|
||||
"Disk Predictor": ("Disk Predictor", "#e67e22", "\U0001f4be"),
|
||||
"Config Drift": ("Config Drift", "#9b59b6", "\u2699\ufe0f"),
|
||||
"[Homelab]": ("Stack Monitor", "#3498db", "\U0001f433"),
|
||||
"Receipt Tracker": ("Receipt Tracker", "#27ae60", "\U0001f9fe"),
|
||||
"Subscription": ("Subscription Auditor", "#f39c12", "\U0001f4b3"),
|
||||
"Email Digest": ("Email Digest", "#2980b9", "\U0001f4e8"),
|
||||
}
|
||||
|
||||
|
||||
def _detect_source(subject: str) -> tuple[str, str, str]:
|
||||
"""Detect which script sent this based on the subject line."""
|
||||
for prefix, (label, color, icon) in _SOURCE_MAP.items():
|
||||
if prefix in subject:
|
||||
return label, color, icon
|
||||
return "Homelab Automation", "#7f8c8d", "\U0001f916"
|
||||
|
||||
|
||||
def _detect_status(subject: str) -> tuple[str, str]:
|
||||
"""Detect status from subject keywords."""
|
||||
subject_lower = subject.lower()
|
||||
if any(w in subject_lower for w in ["error", "fail", "issues found", "unsafe", "warning"]):
|
||||
return "Issue Detected", "#e74c3c"
|
||||
if any(w in subject_lower for w in ["ok", "restarted", "new"]):
|
||||
return "OK", "#27ae60"
|
||||
return "Report", "#7f8c8d"
|
||||
|
||||
|
||||
def _wrap_html(subject: str, body_html: str) -> str:
|
||||
"""Wrap content in a styled HTML email template."""
|
||||
source_label, source_color, icon = _detect_source(subject)
|
||||
status_label, status_color = _detect_status(subject)
|
||||
now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
|
||||
timestamp = now.strftime("%b %d, %Y at %I:%M %p %Z")
|
||||
|
||||
return f"""\
|
||||
<html>
|
||||
<body style="margin:0;padding:0;background:#f5f5f5;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;">
|
||||
<table width="100%" cellpadding="0" cellspacing="0" style="background:#f5f5f5;padding:20px 0;">
|
||||
<tr><td align="center">
|
||||
<table width="600" cellpadding="0" cellspacing="0" style="background:#fff;border-radius:8px;overflow:hidden;box-shadow:0 2px 8px rgba(0,0,0,0.08);">
|
||||
|
||||
<!-- Header -->
|
||||
<tr><td style="background:{source_color};padding:16px 24px;">
|
||||
<span style="font-size:20px;margin-right:8px;">{icon}</span>
|
||||
<span style="color:#fff;font-size:16px;font-weight:600;">{source_label}</span>
|
||||
<span style="float:right;color:rgba(255,255,255,0.8);font-size:12px;line-height:28px;">{timestamp}</span>
|
||||
</td></tr>
|
||||
|
||||
<!-- Status bar -->
|
||||
<tr><td style="padding:12px 24px;background:{status_color}15;border-bottom:1px solid #eee;">
|
||||
<span style="color:{status_color};font-weight:600;font-size:13px;">\u25cf {status_label}</span>
|
||||
<span style="color:#666;font-size:13px;margin-left:12px;">{escape(subject)}</span>
|
||||
</td></tr>
|
||||
|
||||
<!-- Body -->
|
||||
<tr><td style="padding:20px 24px;color:#333;font-size:14px;line-height:1.6;">
|
||||
{body_html}
|
||||
</td></tr>
|
||||
|
||||
<!-- Footer -->
|
||||
<tr><td style="padding:12px 24px;background:#fafafa;border-top:1px solid #eee;text-align:center;">
|
||||
<span style="color:#999;font-size:11px;">Homelab Automation \u2022 homelab-vm \u2022 {timestamp}</span>
|
||||
</td></tr>
|
||||
|
||||
</table>
|
||||
</td></tr>
|
||||
</table>
|
||||
</body>
|
||||
</html>"""
|
||||
|
||||
|
||||
def _text_to_html(text: str) -> str:
|
||||
"""Convert plain text email body to formatted HTML."""
|
||||
escaped = escape(text)
|
||||
|
||||
# Bold section headers (lines ending with colon or lines of dashes)
|
||||
escaped = re.sub(
|
||||
r'^(.*?:)\s*$',
|
||||
r'<strong>\1</strong>',
|
||||
escaped,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
# Style lines starting with " -" or " *" as list items
|
||||
escaped = re.sub(
|
||||
r'^(\s+[-*])\s+(.+)$',
|
||||
r'<span style="color:#555;">\1</span> \2',
|
||||
escaped,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
# Highlight WARNING/ERROR/FAIL keywords
|
||||
escaped = re.sub(
|
||||
r'\b(ERROR|FAIL(?:ED)?|WARNING)\b',
|
||||
r'<span style="color:#e74c3c;font-weight:600;">\1</span>',
|
||||
escaped,
|
||||
)
|
||||
|
||||
# Highlight OK/SUCCESS/PASS keywords
|
||||
escaped = re.sub(
|
||||
r'\b(OK|SUCCESS|PASS(?:ED)?)\b',
|
||||
r'<span style="color:#27ae60;font-weight:600;">\1</span>',
|
||||
escaped,
|
||||
)
|
||||
|
||||
return f'<pre style="white-space:pre-wrap;word-wrap:break-word;font-family:\'SF Mono\',Monaco,Consolas,monospace;font-size:13px;margin:0;">{escaped}</pre>'
|
||||
|
||||
|
||||
def send_ntfy(topic: str, title: str, message: str, priority: str = "default",
|
||||
base_url: str = "https://ntfy.sh"):
|
||||
"""Send a push notification via ntfy."""
|
||||
url = f"{base_url.rstrip('/')}/{topic}"
|
||||
try:
|
||||
req = urllib.request.Request(url, data=message.encode(), headers={
|
||||
"Title": title,
|
||||
"Priority": priority,
|
||||
"Content-Type": "text/plain",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=10):
|
||||
pass
|
||||
log.info("ntfy sent: %s", title)
|
||||
except Exception as e:
|
||||
log.warning("ntfy failed: %s", e)
|
||||
|
||||
|
||||
def send_email(subject: str, html_body: str = "", text_body: str = "",
|
||||
to: str = DEFAULT_TO, from_addr: str = SMTP_USER):
|
||||
"""Place email directly into Digests IMAP folder via Proton Bridge."""
|
||||
msg = MIMEMultipart("alternative")
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = from_addr
|
||||
msg["To"] = to
|
||||
msg["Date"] = datetime.now(tz=ZoneInfo("America/Los_Angeles")).strftime(
|
||||
"%a, %d %b %Y %H:%M:%S %z"
|
||||
)
|
||||
|
||||
if text_body:
|
||||
msg.attach(MIMEText(text_body, "plain"))
|
||||
|
||||
# Build enhanced HTML: wrap provided HTML or convert plain text
|
||||
if html_body:
|
||||
wrapped = _wrap_html(subject, html_body)
|
||||
elif text_body:
|
||||
wrapped = _wrap_html(subject, _text_to_html(text_body))
|
||||
else:
|
||||
wrapped = ""
|
||||
|
||||
if wrapped:
|
||||
msg.attach(MIMEText(wrapped, "html"))
|
||||
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
imap = imaplib.IMAP4(IMAP_HOST, IMAP_PORT)
|
||||
imap.starttls(ctx)
|
||||
imap.login(SMTP_USER, SMTP_PASS)
|
||||
|
||||
# Create folder if it doesn't exist
|
||||
status, folders = imap.list()
|
||||
folder_exists = any(DIGEST_FOLDER.encode() in f for f in (folders or []))
|
||||
if not folder_exists:
|
||||
imap.create(DIGEST_FOLDER)
|
||||
log.info("Created IMAP folder: %s", DIGEST_FOLDER)
|
||||
|
||||
now = imaplib.Time2Internaldate(datetime.now(tz=ZoneInfo("UTC")))
|
||||
imap.append(DIGEST_FOLDER, "(\\Seen)", now, msg.as_bytes())
|
||||
imap.logout()
|
||||
|
||||
log.info("Email filed to Digests: %s", subject)
|
||||
91
scripts/lib/ollama.py
Normal file
91
scripts/lib/ollama.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Ollama LLM client with retry and response cleaning."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_URL = "http://192.168.0.145:31434"
|
||||
DEFAULT_MODEL = "qwen3-coder:latest"
|
||||
|
||||
|
||||
class OllamaUnavailableError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def ollama_available(url: str = DEFAULT_URL) -> bool:
|
||||
"""Quick health check — GET /api/tags."""
|
||||
try:
|
||||
req = urllib.request.Request(f"{url.rstrip('/')}/api/tags")
|
||||
with urllib.request.urlopen(req, timeout=5):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
_last_call_time = 0.0
|
||||
MIN_CALL_INTERVAL = 2.0 # seconds between calls to avoid overwhelming Ollama
|
||||
|
||||
|
||||
def ollama_generate(
|
||||
prompt: str,
|
||||
model: str = DEFAULT_MODEL,
|
||||
url: str = DEFAULT_URL,
|
||||
max_retries: int = 3,
|
||||
timeout: int = 120,
|
||||
temperature: float = 0.3,
|
||||
num_predict: int = 4000, # Needs headroom for thinking + response with qwen3:32b
|
||||
) -> str:
|
||||
"""Generate text from Ollama with retry + backoff. Returns cleaned response."""
|
||||
global _last_call_time
|
||||
elapsed = time.time() - _last_call_time
|
||||
if elapsed < MIN_CALL_INTERVAL:
|
||||
time.sleep(MIN_CALL_INTERVAL - elapsed)
|
||||
_last_call_time = time.time()
|
||||
|
||||
# Use /api/chat which properly separates thinking from content
|
||||
data = json.dumps({
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": prompt}],
|
||||
"stream": False,
|
||||
"options": {"temperature": temperature, "num_predict": num_predict},
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{url.rstrip('/')}/api/chat",
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
last_error = None
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
result = json.loads(resp.read())
|
||||
msg = result.get("message", {})
|
||||
content = msg.get("content", "").strip()
|
||||
thinking = msg.get("thinking", "").strip()
|
||||
# Content has the actual answer; thinking has the reasoning
|
||||
# If content exists, use it (strip any leaked think tags)
|
||||
if content:
|
||||
return re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
|
||||
# If only thinking exists, the model ran out of tokens before answering
|
||||
# Try to extract the answer from the end of the thinking text
|
||||
if thinking:
|
||||
# Look for category/keyword answers in the last 200 chars of thinking
|
||||
return thinking
|
||||
# Fallback to legacy response field
|
||||
raw = result.get("response", "").strip()
|
||||
return re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
|
||||
except (urllib.error.URLError, TimeoutError, OSError) as e:
|
||||
last_error = e
|
||||
if attempt < max_retries - 1:
|
||||
wait = 2 ** attempt
|
||||
log.warning("Ollama attempt %d/%d failed: %s — retrying in %ds",
|
||||
attempt + 1, max_retries, e, wait)
|
||||
time.sleep(wait)
|
||||
|
||||
raise OllamaUnavailableError(f"Ollama unavailable after {max_retries} attempts: {last_error}")
|
||||
71
scripts/lib/portainer.py
Normal file
71
scripts/lib/portainer.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Portainer API client."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
PORTAINER_URL = "http://100.83.230.112:10000"
|
||||
PORTAINER_KEY = "REDACTED_PORTAINER_TOKEN" # pragma: allowlist secret
|
||||
|
||||
ENDPOINTS = {
|
||||
"atlantis": 2,
|
||||
"calypso": 443397,
|
||||
"nuc": 443398,
|
||||
"homelab": 443399,
|
||||
"rpi5": 443395,
|
||||
}
|
||||
|
||||
|
||||
def portainer_api(method: str, path: str, data: dict | None = None,
|
||||
url: str = PORTAINER_URL, key: str = PORTAINER_KEY) -> dict | list:
|
||||
"""Make a Portainer API request."""
|
||||
full_url = f"{url.rstrip('/')}/api/{path.lstrip('/')}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(full_url, data=body, method=method, headers={
|
||||
"X-API-Key": key,
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read())
|
||||
|
||||
|
||||
def list_containers(endpoint: str, all_containers: bool = True) -> list[dict]:
|
||||
"""List containers on an endpoint."""
|
||||
eid = ENDPOINTS.get(endpoint, endpoint)
|
||||
params = "all=true" if all_containers else "all=false"
|
||||
return portainer_api("GET", f"endpoints/{eid}/docker/containers/json?{params}")
|
||||
|
||||
|
||||
def get_container_logs(endpoint: str, container_id: str, tail: int = 100) -> str:
|
||||
"""Get container logs."""
|
||||
eid = ENDPOINTS.get(endpoint, endpoint)
|
||||
url = f"{PORTAINER_URL}/api/endpoints/{eid}/docker/containers/{container_id}/logs?stdout=true&stderr=true&tail={tail}"
|
||||
req = urllib.request.Request(url, headers={"X-API-Key": PORTAINER_KEY})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
raw = resp.read()
|
||||
# Strip Docker log prefix bytes (8-byte header per line)
|
||||
lines = []
|
||||
for line in raw.split(b"\n"):
|
||||
if len(line) > 8:
|
||||
lines.append(line[8:].decode("utf-8", errors="replace"))
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def restart_container(endpoint: str, container_id: str) -> bool:
|
||||
"""Restart a container. Returns True on success."""
|
||||
eid = ENDPOINTS.get(endpoint, endpoint)
|
||||
try:
|
||||
portainer_api("POST", f"endpoints/{eid}/docker/containers/{container_id}/restart")
|
||||
return True
|
||||
except urllib.error.HTTPError as e:
|
||||
log.error("Restart failed for %s: %s", container_id, e)
|
||||
return False
|
||||
|
||||
|
||||
def inspect_container(endpoint: str, container_id: str) -> dict:
|
||||
"""Inspect a container for full config."""
|
||||
eid = ENDPOINTS.get(endpoint, endpoint)
|
||||
return portainer_api("GET", f"endpoints/{eid}/docker/containers/{container_id}/json")
|
||||
37
scripts/lib/prometheus.py
Normal file
37
scripts/lib/prometheus.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Prometheus query client."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
PROMETHEUS_URL = "http://192.168.0.210:9090"
|
||||
|
||||
|
||||
def prom_query(query: str, url: str = PROMETHEUS_URL) -> list[dict]:
|
||||
"""Instant PromQL query. Returns list of result dicts."""
|
||||
params = urllib.parse.urlencode({"query": query})
|
||||
req_url = f"{url.rstrip('/')}/api/v1/query?{params}"
|
||||
req = urllib.request.Request(req_url)
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
data = json.loads(resp.read())
|
||||
if data.get("status") != "success":
|
||||
raise RuntimeError(f"Prometheus query failed: {data}")
|
||||
return data["data"]["result"]
|
||||
|
||||
|
||||
def prom_query_range(query: str, start: str, end: str, step: str,
|
||||
url: str = PROMETHEUS_URL) -> list[dict]:
|
||||
"""Range PromQL query. start/end as Unix timestamps or RFC3339, step as duration string."""
|
||||
params = urllib.parse.urlencode({
|
||||
"query": query, "start": start, "end": end, "step": step,
|
||||
})
|
||||
req_url = f"{url.rstrip('/')}/api/v1/query_range?{params}"
|
||||
req = urllib.request.Request(req_url)
|
||||
with urllib.request.urlopen(req, timeout=60) as resp:
|
||||
data = json.loads(resp.read())
|
||||
if data.get("status") != "success":
|
||||
raise RuntimeError(f"Prometheus range query failed: {data}")
|
||||
return data["data"]["result"]
|
||||
Reference in New Issue
Block a user