Sanitized mirror from private repository - 2026-04-16 07:19:56 UTC
Some checks failed
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Documentation / Build Docusaurus (push) Has been cancelled

This commit is contained in:
Gitea Mirror Bot
2026-04-16 07:19:56 +00:00
commit d72af152e3
1418 changed files with 359968 additions and 0 deletions

1
scripts/lib/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Shared utilities for homelab automation scripts

38
scripts/lib/gitea.py Normal file
View File

@@ -0,0 +1,38 @@
"""Gitea API client."""
import json
import logging
import urllib.request
import urllib.error
# Module-level logger, named after this module per stdlib convention.
log = logging.getLogger(__name__)
# Base URL of the self-hosted Gitea instance.
GITEA_URL = "https://git.vish.gg"
# API token (value scrubbed in this sanitized mirror).
GITEA_TOKEN = "REDACTED_TOKEN" # pragma: allowlist secret
# Default "owner/repo" slug used by the helpers below.
DEFAULT_REPO = "vish/homelab"
def gitea_api(method: str, path: str, data: dict | None = None,
              url: str = GITEA_URL, token: str = GITEA_TOKEN) -> dict | list:
    """Make an authenticated Gitea API request and return the parsed JSON.

    Args:
        method: HTTP method ("GET", "POST", ...).
        path: API path relative to /api/v1/ (may include a query string).
        data: JSON-serializable request body, or None for no body.
        url: Base Gitea URL.
        token: API token sent in the Authorization header.

    Returns:
        The decoded JSON response (dict or list).

    Raises:
        urllib.error.HTTPError: on non-2xx responses.
        urllib.error.URLError: on connection failures.
    """
    full_url = f"{url.rstrip('/')}/api/v1/{path.lstrip('/')}"
    # `is not None` rather than truthiness: an explicitly-passed empty dict {}
    # must still be serialized and sent as a body, not silently dropped.
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(full_url, data=body, method=method, headers={
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
def get_commits_since(since: str, repo: str = DEFAULT_REPO) -> list[dict]:
    """Get up to 50 commits on main since an ISO timestamp.

    Args:
        since: ISO-8601 timestamp. It is URL-encoded here because timezone
            offsets contain "+" and ":", which would be mangled (e.g. "+"
            decoded as a space) if interpolated into the query string raw.
        repo: "owner/repo" slug.
    """
    import urllib.parse  # local import: module-level imports don't include it
    query = urllib.parse.urlencode({"sha": "main", "limit": 50, "since": since})
    return gitea_api("GET", f"repos/{repo}/commits?{query}")
def create_release(tag: str, title: str, body: str, repo: str = DEFAULT_REPO) -> dict:
    """Publish a Gitea release for *tag* (never a draft or prerelease)."""
    payload = {
        "tag_name": tag,
        "name": title,
        "body": body,
        "draft": False,
        "prerelease": False,
    }
    return gitea_api("POST", f"repos/{repo}/releases", data=payload)

95
scripts/lib/notify.py Normal file
View File

@@ -0,0 +1,95 @@
"""Notification helpers — ntfy and SMTP via Proton Bridge."""
import imaplib
import logging
import smtplib
import ssl
import urllib.request
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from zoneinfo import ZoneInfo
log = logging.getLogger(__name__)
# Proton Bridge SMTP, listening on localhost only.
SMTP_HOST = "127.0.0.1"
SMTP_PORT = 1025
SMTP_USER = "admin@thevish.io"
# Bridge password (value scrubbed in this sanitized mirror).
SMTP_PASS = "REDACTED_PASSWORD" # pragma: allowlist secret
# Default recipient for send_email().
DEFAULT_TO = "admin@thevish.io"
# Proton Bridge IMAP, used to file sent digests into a folder.
IMAP_HOST = "127.0.0.1"
IMAP_PORT = 1143
# IMAP folder that _file_to_digests() appends copies into.
DIGEST_FOLDER = "Folders/Digests"
def send_ntfy(topic: str, title: str, message: str, priority: str = "default",
              base_url: str = "https://ntfy.sh"):
    """Publish a push notification to an ntfy topic.

    Best-effort: any failure is logged at warning level, never raised.
    """
    endpoint = f"{base_url.rstrip('/')}/{topic}"
    headers = {
        "Title": title,
        "Priority": priority,
        "Content-Type": "text/plain",
    }
    try:
        request = urllib.request.Request(endpoint, data=message.encode(), headers=headers)
        with urllib.request.urlopen(request, timeout=10):
            pass
        log.info("ntfy sent: %s", title)
    except Exception as e:
        log.warning("ntfy failed: %s", e)
def _file_to_digests(msg_bytes: bytes):
    """File a copy of the message into the Digests folder via Proton Bridge IMAP.

    Connects with STARTTLS (certificate verification disabled: the bridge
    presents a self-signed cert on localhost), creates the target folder on
    first use, then appends the raw message bytes.

    Args:
        msg_bytes: Full RFC822 message, e.g. ``msg.as_bytes()``.
    """
    ctx = ssl.create_default_context()
    # Proton Bridge on 127.0.0.1 uses a self-signed certificate.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    imap = imaplib.IMAP4(IMAP_HOST, IMAP_PORT)
    try:
        imap.starttls(ctx)
        imap.login(SMTP_USER, SMTP_PASS)
        status, folders = imap.list()
        folder_exists = any(DIGEST_FOLDER.encode() in f for f in (folders or []))
        if not folder_exists:
            imap.create(DIGEST_FOLDER)
            log.info("Created IMAP folder: %s", DIGEST_FOLDER)
        now = imaplib.Time2Internaldate(datetime.now(tz=ZoneInfo("UTC")))
        imap.append(DIGEST_FOLDER, None, now, msg_bytes)
    finally:
        # Previously logout() was only reached on success, leaking the IMAP
        # connection whenever login/list/append raised. Always close it.
        imap.logout()
    log.info("Filed message to %s folder", DIGEST_FOLDER)
def send_email(subject: str, html_body: str = "", text_body: str = "",
               to: str = DEFAULT_TO, from_addr: str = SMTP_USER):
    """Send email via Proton Bridge SMTP on localhost, then file into Digests folder.

    Filing the copy is best-effort: IMAP failures are logged, not raised.
    """
    message = MIMEMultipart("alternative")
    message["Subject"] = subject
    message["From"] = from_addr
    message["To"] = to
    stamp = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
    message["Date"] = stamp.strftime("%a, %d %b %Y %H:%M:%S %z")
    # Plain part first, HTML last: MIME clients prefer the final alternative.
    for content, subtype in ((text_body, "plain"), (html_body, "html")):
        if content:
            message.attach(MIMEText(content, subtype))
    tls_ctx = ssl.create_default_context()
    # Proton Bridge presents a self-signed certificate on localhost.
    tls_ctx.check_hostname = False
    tls_ctx.verify_mode = ssl.CERT_NONE
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        server.starttls(context=tls_ctx)
        server.login(SMTP_USER, SMTP_PASS)
        server.send_message(message)
    log.info("Email sent: %s -> %s", subject, to)
    try:
        _file_to_digests(message.as_bytes())
    except Exception as e:
        log.warning("Failed to file to Digests folder: %s", e)

91
scripts/lib/ollama.py Normal file
View File

@@ -0,0 +1,91 @@
"""Ollama LLM client with retry and response cleaning."""
import json
import logging
import re
import time
import urllib.request
import urllib.error
log = logging.getLogger(__name__)
# Ollama server on the homelab network (non-standard port).
DEFAULT_URL = "http://192.168.0.145:31434"
DEFAULT_MODEL = "qwen3-coder:latest"


class OllamaUnavailableError(Exception):
    """Raised when the Ollama server cannot be reached after all retries."""


def ollama_available(url: str = DEFAULT_URL) -> bool:
    """Quick health check — GET /api/tags; True iff the server answers."""
    endpoint = f"{url.rstrip('/')}/api/tags"
    try:
        with urllib.request.urlopen(urllib.request.Request(endpoint), timeout=5):
            return True
    except Exception:
        # Any failure (refused, DNS, timeout, HTTP error) means "unavailable".
        return False
# Module-level rate limiter state: timestamp of the last generate call.
_last_call_time = 0.0
MIN_CALL_INTERVAL = 2.0  # seconds between calls to avoid overwhelming Ollama


def ollama_generate(
    prompt: str,
    model: str = DEFAULT_MODEL,
    url: str = DEFAULT_URL,
    max_retries: int = 3,
    timeout: int = 120,
    temperature: float = 0.3,
    num_predict: int = 4000,  # Needs headroom for thinking + response with qwen3:32b
) -> str:
    """Generate text from Ollama with retry + backoff. Returns cleaned response.

    Raises:
        OllamaUnavailableError: when every attempt fails with a network error.
    """
    global _last_call_time
    # Space calls at least MIN_CALL_INTERVAL seconds apart.
    since_last = time.time() - _last_call_time
    if since_last < MIN_CALL_INTERVAL:
        time.sleep(MIN_CALL_INTERVAL - since_last)
    _last_call_time = time.time()

    def _scrub(text: str) -> str:
        # Remove any leaked <think>...</think> spans and surrounding whitespace.
        return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

    # /api/chat (unlike /api/generate) separates "thinking" from final content.
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": temperature, "num_predict": num_predict},
    }
    req = urllib.request.Request(
        f"{url.rstrip('/')}/api/chat",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    last_error = None
    for attempt in range(max_retries):
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                result = json.loads(resp.read())
            msg = result.get("message", {})
            content = msg.get("content", "").strip()
            thinking = msg.get("thinking", "").strip()
            if content:
                # Content carries the actual answer; scrub leaked think tags.
                return _scrub(content)
            if thinking:
                # Only thinking present: the model exhausted its token budget
                # before answering; return the reasoning text as-is.
                return thinking
            # Fallback to the legacy /api/generate-style response field.
            return _scrub(result.get("response", "").strip())
        except (urllib.error.URLError, TimeoutError, OSError) as e:
            last_error = e
            if attempt < max_retries - 1:
                wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                log.warning("Ollama attempt %d/%d failed: %s — retrying in %ds",
                            attempt + 1, max_retries, e, wait)
                time.sleep(wait)
    raise OllamaUnavailableError(f"Ollama unavailable after {max_retries} attempts: {last_error}")

71
scripts/lib/portainer.py Normal file
View File

@@ -0,0 +1,71 @@
"""Portainer API client."""
import json
import logging
import urllib.request
import urllib.error
log = logging.getLogger(__name__)
# Portainer instance (Tailscale address — presumably; verify against network docs).
PORTAINER_URL = "http://100.83.230.112:10000"
# API key (value scrubbed in this sanitized mirror).
PORTAINER_KEY = "REDACTED_PORTAINER_TOKEN" # pragma: allowlist secret
# Friendly host name -> Portainer endpoint ID. Helpers accept either the
# name (looked up here) or a raw endpoint ID passed through unchanged.
ENDPOINTS = {
    "atlantis": 2,
    "calypso": 443397,
    "nuc": 443398,
    "homelab": 443399,
    "rpi5": 443395,
}
def portainer_api(method: str, path: str, data: dict | None = None,
                  url: str = PORTAINER_URL, key: str = PORTAINER_KEY) -> dict | list:
    """Make an authenticated Portainer API request and return the parsed JSON.

    Args:
        method: HTTP method ("GET", "POST", ...).
        path: API path relative to /api/ (may include a query string).
        data: JSON-serializable request body, or None for no body.
        url: Base Portainer URL.
        key: API key sent in the X-API-Key header.

    Raises:
        urllib.error.HTTPError: on non-2xx responses.
        urllib.error.URLError: on connection failures.
    """
    full_url = f"{url.rstrip('/')}/api/{path.lstrip('/')}"
    # `is not None` rather than truthiness: an explicitly-passed empty dict {}
    # must still be serialized and sent as a body, not silently dropped.
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(full_url, data=body, method=method, headers={
        "X-API-Key": key,
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
def list_containers(endpoint: str, all_containers: bool = True) -> list[dict]:
    """List containers on an endpoint (stopped ones included by default)."""
    eid = ENDPOINTS.get(endpoint, endpoint)
    flag = "true" if all_containers else "false"
    return portainer_api("GET", f"endpoints/{eid}/docker/containers/json?all={flag}")
def get_container_logs(endpoint: str, container_id: str, tail: int = 100) -> str:
    """Get container logs, demultiplexing Docker's stream framing.

    Non-TTY containers return a multiplexed stream of frames, each preceded
    by an 8-byte header whose last 4 bytes are a big-endian payload *length*
    — frames are not newline-delimited. The previous newline-split approach
    corrupted frames containing newlines and silently dropped lines of 8
    bytes or fewer; TTY containers (raw, unframed output) were mangled too.

    Args:
        endpoint: Friendly endpoint name (looked up in ENDPOINTS) or raw ID.
        container_id: Docker container ID or name.
        tail: Number of trailing log lines to request.
    """
    eid = ENDPOINTS.get(endpoint, endpoint)
    url = f"{PORTAINER_URL}/api/endpoints/{eid}/docker/containers/{container_id}/logs?stdout=true&stderr=true&tail={tail}"
    req = urllib.request.Request(url, headers={"X-API-Key": PORTAINER_KEY})
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read()
    return _demux_docker_logs(raw)


def _demux_docker_logs(raw: bytes) -> str:
    """Strip Docker multiplexed-stream headers from a raw log payload."""
    # Header layout: [stream_type, 0, 0, 0, len_be32] with stream_type in
    # {0: stdin, 1: stdout, 2: stderr}. Absence of this signature means the
    # container ran with a TTY and the output is raw text.
    if len(raw) >= 8 and raw[0] in (0, 1, 2) and raw[1:4] == b"\x00\x00\x00":
        chunks = []
        offset = 0
        while offset + 8 <= len(raw):
            size = int.from_bytes(raw[offset + 4:offset + 8], "big")
            chunks.append(raw[offset + 8:offset + 8 + size])
            offset += 8 + size
        text = b"".join(chunks).decode("utf-8", errors="replace")
    else:
        text = raw.decode("utf-8", errors="replace")
    return text.rstrip("\n")
def restart_container(endpoint: str, container_id: str) -> bool:
    """Restart a container. Returns True on success."""
    eid = ENDPOINTS.get(endpoint, endpoint)
    try:
        portainer_api("POST", f"endpoints/{eid}/docker/containers/{container_id}/restart")
    except urllib.error.HTTPError as e:
        # Surface the failure in the log but let callers branch on the bool.
        log.error("Restart failed for %s: %s", container_id, e)
        return False
    return True
def inspect_container(endpoint: str, container_id: str) -> dict:
    """Inspect a container for full config."""
    eid = ENDPOINTS.get(endpoint, endpoint)
    path = f"endpoints/{eid}/docker/containers/{container_id}/json"
    return portainer_api("GET", path)

37
scripts/lib/prometheus.py Normal file
View File

@@ -0,0 +1,37 @@
"""Prometheus query client."""
import json
import logging
import urllib.request
import urllib.parse
log = logging.getLogger(__name__)
# Prometheus server on the homelab network (standard port 9090).
PROMETHEUS_URL = "http://192.168.0.210:9090"
def prom_query(query: str, url: str = PROMETHEUS_URL) -> list[dict]:
    """Instant PromQL query. Returns list of result dicts.

    Raises:
        RuntimeError: when Prometheus reports a non-success status.
    """
    encoded = urllib.parse.urlencode({"query": query})
    request = urllib.request.Request(f"{url.rstrip('/')}/api/v1/query?{encoded}")
    with urllib.request.urlopen(request, timeout=30) as resp:
        payload = json.loads(resp.read())
    if payload.get("status") != "success":
        raise RuntimeError(f"Prometheus query failed: {payload}")
    return payload["data"]["result"]
def prom_query_range(query: str, start: str, end: str, step: str,
                     url: str = PROMETHEUS_URL) -> list[dict]:
    """Range PromQL query. start/end as Unix timestamps or RFC3339, step as duration string.

    Raises:
        RuntimeError: when Prometheus reports a non-success status.
    """
    encoded = urllib.parse.urlencode(
        {"query": query, "start": start, "end": end, "step": step}
    )
    request = urllib.request.Request(f"{url.rstrip('/')}/api/v1/query_range?{encoded}")
    with urllib.request.urlopen(request, timeout=60) as resp:
        payload = json.loads(resp.read())
    if payload.get("status") != "success":
        raise RuntimeError(f"Prometheus range query failed: {payload}")
    return payload["data"]["result"]