Files
homelab-optimized/scripts/gmail-organizer/gmail_organizer.py
Gitea Mirror Bot 19b90cee4d
Some checks failed
Documentation / Build Docusaurus (push) Failing after 5m1s
Documentation / Deploy to GitHub Pages (push) Has been skipped
Sanitized mirror from private repository - 2026-03-31 23:50:30 UTC
2026-03-31 23:50:30 +00:00

333 lines
12 KiB
Python

#!/usr/bin/env python3
"""Gmail Organizer — classifies emails using a local LLM and applies Gmail labels."""
import argparse
import email
import email.header
import email.message
import html
import imaplib
import json
import logging
import re
import sqlite3
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path

import yaml
# Log line layout: timestamp, level name padded to 8 columns, then the message.
LOG_FMT = "%(asctime)s %(levelname)-8s %(message)s"
# Logger shared by every function in this script.
log = logging.getLogger("gmail-organizer")
# SQLite file that records already-classified messages; lives next to this script.
DB_PATH = Path(__file__).parent / "processed.db"
# Config file used when -c/--config is not given; lives next to this script.
DEFAULT_CONFIG = Path(__file__).parent / "config.local.yaml"
# ── helpers ──────────────────────────────────────────────────────────────────
def load_config(path: Path) -> dict:
    """Load the YAML configuration file at *path*.

    The file is always read as UTF-8 so that labels and descriptions with
    non-ASCII characters load the same way on every platform, instead of
    depending on the locale's default encoding.

    Returns:
        The parsed mapping.  An empty document yields ``{}`` rather than
        ``None`` so the declared return type holds.

    Raises:
        OSError: if the file cannot be opened.
        yaml.YAMLError: if the content is not valid YAML.
    """
    with open(path, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        return yaml.safe_load(f) or {}
def init_db(db_path: Path) -> sqlite3.Connection:
    """Open the tracking database at *db_path*, creating its table if missing.

    Returns the open connection; closing it is the caller's responsibility.
    """
    create_table = (
        "\n"
        "CREATE TABLE IF NOT EXISTS processed (\n"
        "message_id TEXT PRIMARY KEY,\n"
        "category TEXT NOT NULL,\n"
        "processed_at TEXT NOT NULL\n"
        ")\n"
    )
    connection = sqlite3.connect(db_path)
    connection.execute(create_table)
    connection.commit()
    return connection
def is_processed(conn: sqlite3.Connection, message_id: str) -> bool:
    """Return True when *message_id* already has a row in ``processed``."""
    cursor = conn.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    )
    found = cursor.fetchone()
    return found is not None
def mark_processed(conn: sqlite3.Connection, message_id: str, category: str):
    """Record that *message_id* was classified as *category*.

    An existing row for the same message is replaced, so re-processing a
    message updates its category and timestamp.  The timestamp is the current
    time in UTC, stored as an ISO-8601 string with a ``+00:00`` offset.  The
    change is committed immediately.
    """
    # timezone.utc needs no tz database, unlike ZoneInfo("UTC"), and produces
    # the same "+00:00" suffix in isoformat().
    processed_at = datetime.now(tz=timezone.utc).isoformat()
    conn.execute(
        "INSERT OR REPLACE INTO processed (message_id, category, processed_at) VALUES (?, ?, ?)",
        (message_id, category, processed_at),
    )
    conn.commit()
def decode_header(raw: str | None) -> str:
if not raw:
return ""
parts = email.header.decode_header(raw)
decoded = []
for data, charset in parts:
if isinstance(data, bytes):
decoded.append(data.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(data)
return " ".join(decoded)
def _decode_payload(part) -> str:
    """Return the transfer-decoded payload of *part* as text ("" if none)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Declared charset is unknown to Python; decode as UTF-8 rather than
        # failing the whole message.
        return payload.decode("utf-8", errors="replace")


def _html_to_text(markup: str) -> str:
    """Reduce HTML to rough plain text: drop tags and unescape entities."""
    # Remove <script>/<style> elements together with their contents first;
    # otherwise CSS/JS source would survive tag stripping and crowd out the
    # visible text.
    markup = re.sub(r"(?is)<(script|style)\b.*?</\1\s*>", " ", markup)
    return html.unescape(re.sub(r"<[^>]+>", " ", markup))


def extract_text(msg: email.message.Message, max_chars: int) -> str:
    """Extract plain-text body from an email, falling back to stripped HTML.

    For multipart messages the first non-empty ``text/plain`` part wins; a
    ``text/html`` part is used only when no plain part supplies text.
    Non-multipart messages use their single payload, stripped of markup when
    the content type is ``text/html``.

    Args:
        msg: The parsed message.
        max_chars: Maximum length of the returned string.

    Returns:
        The body with runs of whitespace collapsed to single spaces, trimmed,
        and truncated to *max_chars* characters ("" when no text is found).
    """
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            if ctype == "text/plain":
                text = _decode_payload(part)
                if text:
                    body = text
                    break  # plain text is preferred; stop looking
            elif ctype == "text/html" and not body:
                text = _decode_payload(part)
                if text:
                    # Keep scanning: a later text/plain part still overrides.
                    body = _html_to_text(text)
    else:
        body = _decode_payload(msg)
        if msg.get_content_type() == "text/html":
            body = _html_to_text(body)
    # Collapse whitespace and truncate
    body = re.sub(r"\s+", " ", body).strip()
    return body[:max_chars]
# ── Gmail IMAP ───────────────────────────────────────────────────────────────
class GmailClient:
    """Small wrapper around an authenticated IMAP connection to Gmail.

    Messages are addressed by IMAP UID: ``fetch_uids`` returns UIDs and every
    other method issues the ``UID`` form of its command, so the identifiers
    stay valid even if sequence numbers shift.  Labels are manipulated through
    Gmail's ``X-GM-LABELS`` attribute.
    """

    def __init__(self, email_addr: str, app_password: str):
        """Connect to imap.gmail.com over TLS and log in.

        Raises:
            imaplib.IMAP4.error: if the server rejects the credentials.
        """
        self.email = email_addr
        self.conn = imaplib.IMAP4_SSL("imap.gmail.com")
        try:
            self.conn.login(email_addr, app_password)
        except Exception:
            # Do not leave the socket open when authentication fails.
            self.conn.shutdown()
            raise

    @staticmethod
    def _quote(value: str) -> str:
        """Return *value* as an IMAP quoted string (``\\`` and ``"`` escaped)."""
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def fetch_uids(self, mailbox: str = "INBOX", search: str = "ALL",
                   batch_size: int = 50) -> list[bytes]:
        """Select *mailbox* and return up to *batch_size* matching UIDs.

        Args:
            mailbox: Mailbox to select; it stays selected for later calls.
            search: IMAP search criteria.
            batch_size: Maximum number of UIDs to return; zero or a negative
                value yields an empty list.

        Returns:
            The last *batch_size* UIDs of the search result, most recent first.

        Raises:
            imaplib.IMAP4.error: if the mailbox cannot be selected.
        """
        status, detail = self.conn.select(mailbox)
        if status != "OK":
            raise imaplib.IMAP4.error(f"cannot select mailbox {mailbox!r}: {detail}")
        if batch_size <= 0:
            # uids[-0:] would be the *whole* list, not an empty one.
            return []
        _, data = self.conn.uid("SEARCH", None, search)
        uids = data[0].split() if data and data[0] else []
        # Most recent first
        return uids[-batch_size:][::-1]

    def fetch_message(self, uid: bytes) -> email.message.Message:
        """Download and parse the full message with the given UID.

        Raises:
            imaplib.IMAP4.error: if the server returns no data for *uid*.
        """
        # NOTE(review): fetching RFC822 makes the server set the \Seen flag;
        # "(BODY.PEEK[])" would leave unread mail unread — confirm which
        # behaviour is wanted before changing it.
        _, data = self.conn.uid("FETCH", uid, "(RFC822)")
        if not data or not isinstance(data[0], tuple):
            raise imaplib.IMAP4.error(f"message UID {uid!r} could not be fetched")
        return email.message_from_bytes(data[0][1])

    def get_labels(self, uid: bytes) -> list[str]:
        """Get existing Gmail labels (X-GM-LABELS) for a message.

        Returns the labels as they appear in the server response: a label the
        server sent quoted (for example one containing spaces) is returned as
        a single item, quotes included.  Returns ``[]`` when the response has
        no label list.
        """
        _, data = self.conn.uid("FETCH", uid, "(X-GM-LABELS)")
        raw = data[0].decode() if isinstance(data[0], bytes) else str(data[0])
        match = re.search(r'X-GM-LABELS \(([^)]*)\)', raw)
        if not match:
            return []
        # Tokenise so that a quoted label with spaces is not split apart.
        return re.findall(r'"(?:[^"\\]|\\.)*"|[^\s"]+', match.group(1))

    def apply_label(self, uid: bytes, label: str):
        """Apply a Gmail label to a message. Creates the label if needed."""
        quoted = self._quote(label)
        # Gmail IMAP uses X-GM-LABELS for label manipulation
        result = self.conn.uid("STORE", uid, "+X-GM-LABELS", f"({quoted})")
        if result[0] != "OK":
            # Fallback: copy to label (which creates it as a folder)
            try:
                self.conn.create(quoted)
            except imaplib.IMAP4.error:
                pass  # Label already exists
            self.conn.uid("COPY", uid, quoted)

    def archive(self, uid: bytes):
        """Archive a message (remove from INBOX by removing \\Inbox label)."""
        self.conn.uid("STORE", uid, "-X-GM-LABELS", f"({self._quote(chr(92) + 'Inbox')})")

    def close(self):
        """Close the selected mailbox and log out, ignoring any failure.

        The two steps are attempted independently so that a failing CLOSE
        (for example when no mailbox was ever selected) does not prevent the
        LOGOUT that actually releases the connection.
        """
        for step in (self.conn.close, self.conn.logout):
            try:
                step()
            except Exception:
                pass  # best-effort shutdown
# ── Ollama LLM ───────────────────────────────────────────────────────────────
def classify_email(
    ollama_url: str,
    model: str,
    categories: dict,
    subject: str,
    sender: str,
    body_snippet: str,
) -> str:
    """Ask the LLM to classify an email into one of the categories.

    Args:
        ollama_url: Base URL of the Ollama server; ``/api/generate`` is
            appended to it.
        model: Name of the model to query.
        categories: Mapping of category name to a dict that has at least a
            ``description`` entry.
        subject: Decoded subject line.
        sender: Decoded From header.
        body_snippet: Body text; only its first 1000 characters are sent.

    Returns:
        One of the keys of *categories*, spelled exactly as configured.  When
        the reply names no known category, ``"personal"`` is returned if that
        category is configured.

    Raises:
        OSError: (including ``urllib.error.URLError`` and timeouts) when the
            request fails; the failure is logged first.
        ValueError: when the reply is unusable and there is no ``personal``
            category to fall back to.
    """
    cat_descriptions = "\n".join(
        f"- **{name}**: {info['description']}" for name, info in categories.items()
    )
    category_names = ", ".join(categories.keys())
    prompt = f"""Classify this email into exactly ONE category. Reply with ONLY the category name, nothing else.
Categories:
{cat_descriptions}
Email:
From: {sender}
Subject: {subject}
Body: {body_snippet[:1000]}
Reply with one of: {category_names}"""
    payload = json.dumps({
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {
            "temperature": 0.1,
            "num_predict": 20,
        },
    }).encode()
    req = urllib.request.Request(
        f"{ollama_url.rstrip('/')}/api/generate",
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            result = json.loads(resp.read())
    except OSError as e:
        # URLError is an OSError; catching the base class also logs socket
        # timeouts raised while the response is being read.
        log.error("Ollama request failed: %s", e)
        raise
    raw_response = (result.get("response") or "").strip().lower()
    # Strip any thinking tags (qwen3 sometimes wraps reasoning in
    # <think>...</think>).  With num_predict=20 the closing tag may be cut
    # off, so an unterminated block is removed up to the end of the reply.
    raw_response = re.sub(
        r"<think>.*?(?:</think>|\Z)", "", raw_response, flags=re.DOTALL
    ).strip()

    # The reply was lower-cased, so compare against lower-cased names while
    # returning the configured spelling.  Try the strictest match first so a
    # category that is a substring of another ("work" / "network") does not
    # win by accident.
    lowered = [(name, str(name).lower()) for name in categories]
    for name, needle in lowered:
        if needle == raw_response:
            return name
    for name, needle in lowered:
        if re.search(rf"(?<!\w){re.escape(needle)}(?!\w)", raw_response):
            return name
    for name, needle in lowered:
        if needle in raw_response:
            return name

    if "personal" in categories:
        log.warning("LLM returned unexpected category %r, defaulting to 'personal'", raw_response)
        return "personal"
    raise ValueError(
        f"LLM returned unexpected category {raw_response!r} and no 'personal' "
        "category is configured to fall back to"
    )
# ── main ─────────────────────────────────────────────────────────────────────
def run(config_path: Path, dry_run: bool = False, reprocess: bool = False,
        limit: int | None = None):
    """Classify a batch of messages and label them in Gmail.

    Reads the ``gmail``, ``ollama`` and ``categories`` sections of the config
    (plus the optional ``processing`` section), fetches the most recent
    messages of the configured mailbox, asks the LLM for a category for each
    one not yet recorded in the tracking database, and applies the category's
    label (archiving the message when the category says so).

    Args:
        config_path: YAML configuration file.
        dry_run: Classify and log only; nothing is changed in Gmail and
            nothing is recorded.  Also enabled by ``processing.dry_run``.
        reprocess: Classify messages even if they were recorded before.
        limit: Overrides ``processing.batch_size`` when given (and non-zero).

    A failure on one message is logged and counted, and processing continues
    with the next one; a dropped IMAP connection ends the batch early.
    """
    cfg = load_config(config_path)
    gmail_cfg = cfg["gmail"]
    ollama_cfg = cfg["ollama"]
    categories = cfg["categories"]
    # "processing:" with no value parses as None, so do not rely on the
    # .get() default alone.
    proc_cfg = cfg.get("processing") or {}
    batch_size = limit or proc_cfg.get("batch_size", 50)
    max_body = proc_cfg.get("max_body_chars", 2000)
    dry_run = dry_run or proc_cfg.get("dry_run", False)
    mailbox = proc_cfg.get("mailbox", "INBOX")
    log.info("Connecting to Gmail as %s", gmail_cfg["email"])
    client = GmailClient(gmail_cfg["email"], gmail_cfg["app_password"])
    try:
        # Opened inside the try so the IMAP session is still closed if the
        # database cannot be opened.
        db = init_db(DB_PATH)
        try:
            uids = client.fetch_uids(mailbox=mailbox, batch_size=batch_size)
            log.info("Fetched %d message UIDs", len(uids))
            stats = {cat: 0 for cat in categories}
            stats["skipped"] = 0
            stats["errors"] = 0
            for i, uid in enumerate(uids, 1):
                try:
                    msg = client.fetch_message(uid)
                    # A missing *or empty* Message-ID falls back to the UID;
                    # an empty key would make unrelated messages collide.
                    message_id = (
                        str(msg.get("Message-ID") or "").strip()
                        or f"uid-{uid.decode()}"
                    )
                    subject = decode_header(msg.get("Subject"))
                    sender = decode_header(msg.get("From"))
                    if not reprocess and is_processed(db, message_id):
                        stats["skipped"] += 1
                        continue
                    body = extract_text(msg, max_body)
                    log.info("[%d/%d] Classifying: %s (from: %s)",
                             i, len(uids), subject[:60], sender[:40])
                    category = classify_email(
                        ollama_cfg["url"],
                        ollama_cfg["model"],
                        categories,
                        subject,
                        sender,
                        body,
                    )
                    label = categories[category]["label"]
                    log.info("%s (%s)", category, label)
                    should_archive = categories[category].get("archive", False)
                    if not dry_run:
                        client.apply_label(uid, label)
                        if should_archive:
                            client.archive(uid)
                            log.info(" 📥 Archived")
                        mark_processed(db, message_id, category)
                    else:
                        log.info(" [DRY RUN] Would apply label: %s%s", label,
                                 " + archive" if should_archive else "")
                    stats[category] = stats.get(category, 0) + 1
                except imaplib.IMAP4.abort as e:
                    # The connection itself is gone; every remaining message
                    # would fail the same way, so stop here.
                    log.error("IMAP connection lost at UID %s: %s", uid, e)
                    stats["errors"] += 1
                    break
                except Exception as e:
                    log.error("Error processing UID %s: %s", uid, e)
                    # Full traceback only in verbose mode.
                    log.debug("Traceback for UID %s", uid, exc_info=True)
                    stats["errors"] += 1
                    continue
            log.info("Done! Stats: %s", json.dumps(stats, indent=2))
        finally:
            db.close()
    finally:
        client.close()
def main():
    """Command-line entry point: parse options, set up logging, start a run.

    Exits with status 1 when the configuration file does not exist.
    """
    cli = argparse.ArgumentParser(description="Gmail Organizer — LLM-powered email classification")
    cli.add_argument(
        "-c", "--config", type=Path, default=DEFAULT_CONFIG,
        help="Path to config YAML (default: config.local.yaml)",
    )
    cli.add_argument(
        "-n", "--dry-run", action="store_true",
        help="Classify but don't apply labels",
    )
    cli.add_argument(
        "--reprocess", action="store_true",
        help="Re-classify already-processed emails",
    )
    cli.add_argument(
        "--limit", type=int, default=None,
        help="Override batch size",
    )
    cli.add_argument(
        "-v", "--verbose", action="store_true",
        help="Debug logging",
    )
    opts = cli.parse_args()

    level = logging.DEBUG if opts.verbose else logging.INFO
    logging.basicConfig(level=level, format=LOG_FMT)

    config_file = opts.config
    if config_file.exists():
        run(config_file, dry_run=opts.dry_run, reprocess=opts.reprocess, limit=opts.limit)
        return

    # Missing config: tell the user how to create one, then fail.
    log.error("Config not found: %s", config_file)
    log.error("Copy config.yaml to config.local.yaml and fill in your credentials.")
    sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()