"""Notification helpers — ntfy and IMAP via Proton Bridge.""" import imaplib import logging import re import ssl import urllib.request from datetime import datetime from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from html import escape from zoneinfo import ZoneInfo log = logging.getLogger(__name__) SMTP_USER = "admin@thevish.io" SMTP_PASS = "REDACTED_PASSWORD" # pragma: allowlist secret DEFAULT_TO = "admin@thevish.io" IMAP_HOST = "127.0.0.1" IMAP_PORT = 1143 DIGEST_FOLDER = "Folders/Digests" # Map subject prefixes to source labels and emoji indicators _SOURCE_MAP = { "Backup": ("Backup Validator", "#e74c3c", "\u2601\ufe0f"), "Disk Predictor": ("Disk Predictor", "#e67e22", "\U0001f4be"), "Config Drift": ("Config Drift", "#9b59b6", "\u2699\ufe0f"), "[Homelab]": ("Stack Monitor", "#3498db", "\U0001f433"), "Receipt Tracker": ("Receipt Tracker", "#27ae60", "\U0001f9fe"), "Subscription": ("Subscription Auditor", "#f39c12", "\U0001f4b3"), "Email Digest": ("Email Digest", "#2980b9", "\U0001f4e8"), } def _detect_source(subject: str) -> tuple[str, str, str]: """Detect which script sent this based on the subject line.""" for prefix, (label, color, icon) in _SOURCE_MAP.items(): if prefix in subject: return label, color, icon return "Homelab Automation", "#7f8c8d", "\U0001f916" def _detect_status(subject: str) -> tuple[str, str]: """Detect status from subject keywords.""" subject_lower = subject.lower() if any(w in subject_lower for w in ["error", "fail", "issues found", "unsafe", "warning"]): return "Issue Detected", "#e74c3c" if any(w in subject_lower for w in ["ok", "restarted", "new"]): return "OK", "#27ae60" return "Report", "#7f8c8d" def _wrap_html(subject: str, body_html: str) -> str: """Wrap content in a styled HTML email template.""" source_label, source_color, icon = _detect_source(subject) status_label, status_color = _detect_status(subject) now = datetime.now(tz=ZoneInfo("America/Los_Angeles")) timestamp = now.strftime("%b %d, %Y 
at %I:%M %p %Z") return f"""\
{escaped}'
def _ntfy_header_value(value: str) -> str:
    """Return *value* in a form that is safe to send as an HTTP header.

    ASCII text is returned unchanged.  Anything else is wrapped as a single
    RFC 2047 base64 encoded-word (``=?UTF-8?B?...?=``), which ntfy decodes
    on receipt.  Without this, http.client tries to encode the header as
    Latin-1 and raises UnicodeEncodeError for emoji and most non-Western text.
    """
    import base64

    if value.isascii():
        return value
    encoded = base64.b64encode(value.encode("utf-8")).decode("ascii")
    return f"=?UTF-8?B?{encoded}?="


def send_ntfy(topic: str, title: str, message: str, priority: str = "default",
              base_url: str = "https://ntfy.sh"):
    """Send a push notification via ntfy.

    Delivery is best-effort: any failure is logged as a warning and never
    raised, so a notification problem cannot break the calling script.

    Args:
        topic: ntfy topic name; appended to ``base_url`` as the URL path.
        title: Notification title, sent in the ``Title`` header.  Non-ASCII
            titles (e.g. with emoji) are RFC 2047 encoded.
        message: Notification body, sent UTF-8 encoded as the request body.
        priority: Value for the ``Priority`` header.
        base_url: Root URL of the ntfy server; trailing slashes are ignored.
    """
    url = f"{base_url.rstrip('/')}/{topic}"
    try:
        headers = {
            "Title": _ntfy_header_value(title),
            "Priority": _ntfy_header_value(priority),
            "Content-Type": "text/plain",
        }
        # Supplying a body makes urllib issue a POST.
        req = urllib.request.Request(url, data=message.encode(), headers=headers)
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:
        # Deliberately broad: notifications must never take the caller down.
        log.warning("ntfy failed: %s", e)
    else:
        log.info("ntfy sent: %s", title)
def send_email(subject: str, html_body: str = "", text_body: str = "",
               to: str = DEFAULT_TO, from_addr: str = SMTP_USER):
    """Place an email directly into the Digests IMAP folder via Proton Bridge.

    The message is built as ``multipart/alternative``: the plain-text part
    (if any) followed by a styled HTML part produced by ``_wrap_html``.  When
    only ``text_body`` is given it is converted with ``_text_to_html`` first.
    Nothing is sent over SMTP; the message is APPENDed to ``DIGEST_FOLDER``
    with the ``\\Seen`` flag, creating the folder if it is missing.

    Args:
        subject: Subject line; also passed to ``_wrap_html``.
        html_body: HTML fragment to wrap in the template.
        text_body: Plain-text alternative (and HTML fallback source).
        to: Value for the ``To`` header.
        from_addr: Value for the ``From`` header.

    Raises:
        imaplib.IMAP4.error: If login fails or the server rejects the APPEND.
        OSError: If the connection to the bridge cannot be established.
    """
    from email.utils import format_datetime

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = to
    # format_datetime always emits English day/month names (RFC 5322);
    # strftime("%a ... %b") would follow the process locale.
    msg["Date"] = format_datetime(datetime.now(tz=ZoneInfo("America/Los_Angeles")))

    if text_body:
        msg.attach(MIMEText(text_body, "plain"))

    # Build enhanced HTML: wrap provided HTML or convert plain text
    if html_body:
        wrapped = _wrap_html(subject, html_body)
    elif text_body:
        wrapped = _wrap_html(subject, _text_to_html(text_body))
    else:
        wrapped = ""
    if wrapped:
        msg.attach(MIMEText(wrapped, "html"))

    # Certificate verification is switched off for the STARTTLS upgrade.
    # NOTE(review): presumably because the local bridge presents a
    # self-signed certificate -- confirm before pointing IMAP_HOST elsewhere.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    # The context manager logs out even when a step below raises, so a failed
    # login or append no longer leaks the connection.
    with imaplib.IMAP4(IMAP_HOST, IMAP_PORT) as imap:
        imap.starttls(ctx)
        imap.login(SMTP_USER, SMTP_PASS)

        # Create folder if it doesn't exist.  LIST may yield [None] or
        # non-bytes entries, so only bytes lines are inspected.
        _, folders = imap.list()
        needle = DIGEST_FOLDER.encode()
        folder_exists = any(
            isinstance(entry, bytes) and needle in entry
            for entry in (folders or [])
        )
        if not folder_exists:
            created, _ = imap.create(DIGEST_FOLDER)
            if created == "OK":
                log.info("Created IMAP folder: %s", DIGEST_FOLDER)
            else:
                log.warning("Could not create IMAP folder: %s", DIGEST_FOLDER)

        internal_date = imaplib.Time2Internaldate(datetime.now(tz=ZoneInfo("UTC")))
        result, detail = imap.append(
            DIGEST_FOLDER, "(\\Seen)", internal_date, msg.as_bytes()
        )
        # imaplib returns a "NO" response instead of raising; surface it
        # so a rejected message is not reported as filed.
        if result != "OK":
            raise imaplib.IMAP4.error(
                f"APPEND to {DIGEST_FOLDER} failed: {result} {detail!r}"
            )

    log.info("Email filed to Digests: %s", subject)