Files
homelab-optimized/scripts/email-digest.py
Gitea Mirror Bot 5b52908426
Some checks failed
Documentation / Build Docusaurus (push) Has started running
Documentation / Deploy to GitHub Pages (push) Has been cancelled
Sanitized mirror from private repository - 2026-04-19 08:25:34 UTC
2026-04-19 08:25:34 +00:00

204 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""Daily digest of email organizer activity — files summary into Digests IMAP folder."""
import html
import sqlite3
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
sys.path.insert(0, str(Path(__file__).parent))
from lib.notify import send_email # noqa: E402
# ── config ───────────────────────────────────────────────────────────────────
# Accounts covered by the digest. Each entry carries the label shown in the
# report ("name") and the path of that account's organizer database ("db"),
# which lives in a sibling folder of this script.
ACCOUNTS = [
    {"name": address, "db": Path(__file__).parent / folder / "processed.db"}
    for address, folder in (
        ("lzbellina92@gmail.com", "gmail-organizer"),
        ("your-email@example.com", "gmail-organizer-dvish"),
        ("admin@thevish.io", "proton-organizer"),
    )
]
# ── gather stats ─────────────────────────────────────────────────────────────
def get_stats(db_path: Path, since: str) -> dict:
    """Summarize rows of the ``processed`` table recorded at or after *since*.

    Args:
        db_path: Path of an organizer's SQLite database.
        since: Lower bound for ``processed_at`` (compared with ``>=`` inside
            SQLite). NOTE(review): main() passes a UTC ISO-8601 string; this
            presumably matches the format the organizers store — confirm.

    Returns:
        A dict with three keys:
          * "total": number of matching rows,
          * "categories": mapping of category -> row count,
          * "items": one ``{"category", "time"}`` dict per row, newest first.
        An all-empty summary is returned when the database file does not
        exist or does not (yet) contain a ``processed`` table.

    Raises:
        sqlite3.Error: for any database failure other than a missing table.
    """
    empty = {"total": 0, "categories": {}, "items": []}
    if not db_path.exists():
        return empty

    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT message_id, category, processed_at FROM processed WHERE processed_at >= ? ORDER BY processed_at DESC",
            (since,),
        ).fetchall()
    except sqlite3.OperationalError as exc:
        # A database that was created but never initialised is treated like
        # a missing one so a single fresh account cannot abort the digest.
        # Anything else (locked file, corruption, ...) is a real problem and
        # must stay visible to the caller.
        if "no such table" not in str(exc):
            raise
        return empty
    finally:
        # Always release the handle, including when the query raises.
        conn.close()

    categories = defaultdict(int)
    items = []
    for _msg_id, category, ts in rows:
        categories[category] += 1
        items.append({"category": category, "time": ts})
    return {
        "total": len(rows),
        "categories": dict(categories),
        "items": items,
    }
def get_sender_cache_stats(db_path: Path) -> int:
    """Return the number of rows in the ``sender_cache`` table.

    Yields 0 when the database file is absent or when the count query
    raises ``sqlite3.OperationalError`` (for instance because the table
    does not exist).
    """
    if db_path.exists():
        conn = sqlite3.connect(db_path)
        try:
            first = conn.execute("SELECT COUNT(*) FROM sender_cache").fetchone()
        except sqlite3.OperationalError:
            known = 0
        else:
            known = first[0] if first else 0
        finally:
            conn.close()
        return known
    return 0
# ── format ───────────────────────────────────────────────────────────────────
# Category colors for the HTML bar charts. Categories that are not listed
# here are drawn in a neutral gray by build_html.
_CAT_COLORS = {
    "receipts": "#27ae60",
    "newsletters": "#3498db",
    "finance": "#f39c12",
    "accounts": "#e67e22",
    "spam": "#e74c3c",
    "personal": "#9b59b6",
    "work": "#2c3e50",
}


def build_html(account_stats: list[dict], hours: int) -> str:
    """Build the inner HTML content of the digest.

    The fragment is intended to be wrapped by the notify.py template, so no
    ``<html>``/``<body>`` scaffolding is produced here.

    Account names and category labels are HTML-escaped before they are
    interpolated: category labels come out of the organizer databases, and a
    stray ``<`` or ``&`` would otherwise corrupt the markup.

    Args:
        account_stats: One dict per account with the keys "name", "stats"
            (a dict holding "total" and a "categories" label -> count
            mapping) and, optionally, "sender_cache".
        hours: Size of the look-back window, shown in the intro line.

    Returns:
        The HTML fragments joined with newlines.
    """
    total_all = sum(a["stats"]["total"] for a in account_stats)
    parts = [
        f'<p style="margin:0 0 16px 0;color:#555;">Last <strong>{hours} hours</strong> &mdash; '
        f'<strong>{total_all}</strong> emails classified across {len(account_stats)} accounts.</p>',
    ]
    if total_all == 0:
        parts.append('<p style="color:#999;font-style:italic;">No new emails were classified in this period.</p>')
        return "\n".join(parts)

    for a in account_stats:
        stats = a["stats"]
        if stats["total"] == 0:
            # Accounts without activity are omitted from the report.
            continue

        # Account header
        name = html.escape(str(a["name"]))
        parts.append(
            f'<div style="margin:16px 0 8px 0;padding:8px 12px;background:#f8f9fa;border-radius:4px;border-left:3px solid #3498db;">'
            f'<strong>{name}</strong>'
            f'<span style="float:right;color:#666;">{stats["total"]} classified</span>'
            f'</div>'
        )

        # Category table with colored bars, busiest category first. Bars are
        # scaled against the largest category of this account.
        parts.append(
            '<table width="100%" cellpadding="0" cellspacing="0" style="margin:0 0 8px 0;">'
        )
        # default=0 keeps an inconsistent summary (total > 0 but no
        # categories) from raising ValueError; the table is simply empty.
        max_count = max(stats["categories"].values(), default=0)
        for cat, count in sorted(stats["categories"].items(), key=lambda x: -x[1]):
            bar_pct = int((count / max_count) * 100) if max_count else 0
            color = _CAT_COLORS.get(cat, "#95a5a6")
            # str() first: a NULL category arrives here as None.
            label = html.escape(str(cat))
            parts.append(
                f'<tr>'
                f'<td style="padding:4px 8px;width:120px;font-size:13px;color:#555;">{label}</td>'
                f'<td style="padding:4px 0;">'
                f'<div style="background:#f0f0f0;border-radius:3px;overflow:hidden;">'
                f'<div style="background:{color};height:20px;width:{bar_pct}%;border-radius:3px;min-width:24px;text-align:center;">'
                f'<span style="color:#fff;font-size:11px;font-weight:600;line-height:20px;">{count}</span>'
                f'</div></div></td>'
                f'</tr>'
            )
        parts.append('</table>')

        # Only shown when the count is present and non-zero.
        if a.get("sender_cache"):
            parts.append(
                f'<p style="margin:4px 0 12px 0;color:#aaa;font-size:11px;">'
                f'\U0001f9e0 Sender cache: {a["sender_cache"]} known senders</p>'
            )
    return "\n".join(parts)
def build_text(account_stats: list[dict], hours: int) -> str:
    """Build the plain-text body of the digest.

    Args:
        account_stats: One dict per account with the keys "name", "stats"
            (a dict holding "total" and a "categories" label -> count
            mapping) and, optionally, "sender_cache".
        hours: Size of the look-back window, shown in the heading.

    Returns:
        The report lines joined with newlines. Accounts whose total is 0
        are left out; if every account is at 0 a single "nothing new" line
        follows the heading instead.
    """
    total_all = sum(a["stats"]["total"] for a in account_stats)
    lines = [
        f"Email Organizer Digest — Last {hours} hours",
        f"Total: {total_all} emails classified across {len(account_stats)} accounts",
        "",
    ]
    if total_all == 0:
        lines.append("No new emails were classified in this period.")
        return "\n".join(lines)

    for a in account_stats:
        stats = a["stats"]
        if stats["total"] == 0:
            continue
        lines.append(f"--- {a['name']} ({stats['total']}) ---")
        # Busiest category first.
        for cat, count in sorted(stats["categories"].items(), key=lambda x: -x[1]):
            # !s converts before padding: a NULL category arrives as None,
            # and None rejects the ">15" format spec with a TypeError.
            lines.append(f" {cat!s:>15}: {count}")
        # Only shown when the count is present and non-zero.
        if a.get("sender_cache"):
            lines.append(f" sender cache: {a['sender_cache']} known senders")
        # Blank separator after each account section.
        lines.append("")
    return "\n".join(lines)
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: assemble the digest and print or send it.

    Options:
        --hours N   look-back window in hours (default 24)
        --dry-run   print the plain-text digest instead of sending it

    For every entry in ACCOUNTS the classification summary since the start
    of the window and the sender-cache size are collected. Both bodies are
    rendered, then either the text body is printed (dry run) or subject,
    HTML body and text body are handed to send_email.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Daily email organizer digest")
    cli.add_argument("--hours", type=int, default=24, help="Look back N hours (default: 24)")
    cli.add_argument("--dry-run", action="store_true", help="Print digest without sending")
    opts = cli.parse_args()

    # The window start is computed in UTC and passed on as an ISO string.
    window_start = datetime.now(tz=ZoneInfo("UTC")) - timedelta(hours=opts.hours)
    since = window_start.isoformat()

    account_stats = [
        {
            "name": acct["name"],
            "stats": get_stats(acct["db"], since),
            "sender_cache": get_sender_cache_stats(acct["db"]),
        }
        for acct in ACCOUNTS
    ]

    total = sum(entry["stats"]["total"] for entry in account_stats)
    # The date in the subject line uses Pacific time, not UTC.
    date_label = datetime.now(tz=ZoneInfo("America/Los_Angeles")).strftime("%b %d")
    subject = f"Email Digest: {total} classified — {date_label}"

    html_body = build_html(account_stats, opts.hours)
    text_body = build_text(account_stats, opts.hours)

    if opts.dry_run:
        print(text_body)
    else:
        send_email(subject=subject, html_body=html_body, text_body=text_body)


if __name__ == "__main__":
    main()