#!/usr/bin/env python3 """Gmail Organizer — classifies emails using a local LLM and applies Gmail labels.""" import argparse import email import email.header import html import imaplib import json import logging import re import sqlite3 import sys import time import urllib.request import urllib.error from datetime import datetime, timedelta from pathlib import Path import yaml LOG_FMT = "%(asctime)s %(levelname)-8s %(message)s" log = logging.getLogger("gmail-organizer") DB_PATH = Path(__file__).parent / "processed.db" DEFAULT_CONFIG = Path(__file__).parent / "config.local.yaml" # ── helpers ────────────────────────────────────────────────────────────────── def load_config(path: Path) -> dict: with open(path) as f: return yaml.safe_load(f) def init_db(db_path: Path) -> sqlite3.Connection: conn = sqlite3.connect(db_path) conn.execute(""" CREATE TABLE IF NOT EXISTS processed ( message_id TEXT PRIMARY KEY, category TEXT NOT NULL, processed_at TEXT NOT NULL ) """) conn.commit() return conn def is_processed(conn: sqlite3.Connection, message_id: str) -> bool: row = conn.execute( "SELECT 1 FROM processed WHERE message_id = ?", (message_id,) ).fetchone() return row is not None def mark_processed(conn: sqlite3.Connection, message_id: str, category: str): conn.execute( "INSERT OR REPLACE INTO processed (message_id, category, processed_at) VALUES (?, ?, ?)", (message_id, category, datetime.now(tz=__import__('zoneinfo').ZoneInfo("UTC")).isoformat()), ) conn.commit() def decode_header(raw: str | None) -> str: if not raw: return "" parts = email.header.decode_header(raw) decoded = [] for data, charset in parts: if isinstance(data, bytes): decoded.append(data.decode(charset or "utf-8", errors="replace")) else: decoded.append(data) return " ".join(decoded) def extract_text(msg: email.message.Message, max_chars: int) -> str: """Extract plain-text body from an email, falling back to stripped HTML.""" body = "" if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() if ct == "text/plain": payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" body = payload.decode(charset, errors="replace") break elif ct == "text/html" and not body: payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" raw_html = payload.decode(charset, errors="replace") body = html.unescape(re.sub(r"<[^>]+>", " ", raw_html)) else: payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or "utf-8" body = payload.decode(charset, errors="replace") if msg.get_content_type() == "text/html": body = html.unescape(re.sub(r"<[^>]+>", " ", body)) # Collapse whitespace and truncate body = re.sub(r"\s+", " ", body).strip() return body[:max_chars] # ── Gmail IMAP ─────────────────────────────────────────────────────────────── class GmailClient: def __init__(self, email_addr: str, app_password: "REDACTED_PASSWORD" self.email = email_addr self.conn = imaplib.IMAP4_SSL("imap.gmail.com") self.conn.login(email_addr, app_password) def fetch_uids(self, mailbox: str = "INBOX", search: str = "ALL", batch_size: int = 50) -> list[bytes]: self.conn.select(mailbox) _, data = self.conn.search(None, search) uids = data[0].split() # Most recent first return list(reversed(uids[-batch_size:])) def fetch_message(self, uid: bytes) -> email.message.Message: _, data = self.conn.fetch(uid, "(RFC822)") return email.message_from_bytes(data[0][1]) def get_labels(self, uid: bytes) -> list[str]: """Get existing Gmail labels (X-GM-LABELS) for a message.""" _, data = self.conn.fetch(uid, "(X-GM-LABELS)") raw = data[0].decode() if isinstance(data[0], bytes) else str(data[0]) match = re.search(r'X-GM-LABELS \(([^)]*)\)', raw) if match: return match.group(1).split() return [] def apply_label(self, uid: bytes, label: str): """Apply a Gmail label to a message. Creates the label if needed.""" # Gmail IMAP uses X-GM-LABELS for label manipulation result = self.conn.store(uid, "+X-GM-LABELS", f'("{label}")') if result[0] != "OK": # Fallback: copy to label (which creates it as a folder) try: self.conn.create(label) except imaplib.IMAP4.error: pass # Label already exists self.conn.copy(uid, label) def archive(self, uid: bytes): """Archive a message (remove from INBOX by removing \\Inbox label).""" self.conn.store(uid, "-X-GM-LABELS", '("\\\\Inbox")') def close(self): try: self.conn.close() self.conn.logout() except Exception: pass # ── Ollama LLM ─────────────────────────────────────────────────────────────── def classify_email( ollama_url: str, model: str, categories: dict, subject: str, sender: str, body_snippet: str, ) -> str: """Ask the LLM to classify an email into one of the categories.""" cat_descriptions = "\n".join( f"- **{name}**: {info['description']}" for name, info in categories.items() ) category_names = ", ".join(categories.keys()) prompt = f"""Classify this email into exactly ONE category. Reply with ONLY the category name, nothing else. Categories: {cat_descriptions} Email: From: {sender} Subject: {subject} Body: {body_snippet[:1000]} Reply with one of: {category_names}""" payload = json.dumps({ "model": model, "prompt": prompt, "stream": False, "options": { "temperature": 0.1, "num_predict": 20, }, }).encode() req = urllib.request.Request( f"{ollama_url.rstrip('/')}/api/generate", data=payload, headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, timeout=60) as resp: result = json.loads(resp.read()) except urllib.error.URLError as e: log.error("Ollama request failed: %s", e) raise raw_response = result.get("response", "").strip().lower() # Strip any thinking tags (qwen3 sometimes wraps reasoning in ...) raw_response = re.sub(r".*?", "", raw_response, flags=re.DOTALL).strip() # Extract just the category name for name in categories: if name in raw_response: return name log.warning("LLM returned unexpected category %r, defaulting to 'personal'", raw_response) return "personal" # ── main ───────────────────────────────────────────────────────────────────── def run(config_path: Path, dry_run: bool = False, reprocess: bool = False, limit: int | None = None): cfg = load_config(config_path) gmail_cfg = cfg["gmail"] ollama_cfg = cfg["ollama"] categories = cfg["categories"] proc_cfg = cfg.get("processing", {}) batch_size = limit or proc_cfg.get("batch_size", 50) max_body = proc_cfg.get("max_body_chars", 2000) dry_run = dry_run or proc_cfg.get("dry_run", False) mailbox = proc_cfg.get("mailbox", "INBOX") log.info("Connecting to Gmail as %s", gmail_cfg["email"]) client = GmailClient(gmail_cfg["email"], gmail_cfg["app_password"]) db = init_db(DB_PATH) try: uids = client.fetch_uids(mailbox=mailbox, batch_size=batch_size) log.info("Fetched %d message UIDs", len(uids)) stats = {cat: 0 for cat in categories} stats["skipped"] = 0 stats["errors"] = 0 for i, uid in enumerate(uids, 1): try: msg = client.fetch_message(uid) message_id = msg.get("Message-ID", f"uid-{uid.decode()}") subject = decode_header(msg.get("Subject")) sender = decode_header(msg.get("From")) if not reprocess and is_processed(db, message_id): stats["skipped"] += 1 continue body = extract_text(msg, max_body) log.info("[%d/%d] Classifying: %s (from: %s)", i, len(uids), subject[:60], sender[:40]) category = classify_email( ollama_cfg["url"], ollama_cfg["model"], categories, subject, sender, body, ) label = categories[category]["label"] log.info(" → %s (%s)", category, label) should_archive = categories[category].get("archive", False) if not dry_run: client.apply_label(uid, label) if should_archive: client.archive(uid) log.info(" 📥 Archived") mark_processed(db, message_id, category) else: log.info(" [DRY RUN] Would apply label: %s%s", label, " + archive" if should_archive else "") stats[category] = stats.get(category, 0) + 1 except Exception as e: log.error("Error processing UID %s: %s", uid, e) stats["errors"] += 1 continue log.info("Done! Stats: %s", json.dumps(stats, indent=2)) finally: client.close() db.close() def main(): parser = argparse.ArgumentParser(description="Gmail Organizer — LLM-powered email classification") parser.add_argument("-c", "--config", type=Path, default=DEFAULT_CONFIG, help="Path to config YAML (default: config.local.yaml)") parser.add_argument("-n", "--dry-run", action="store_true", help="Classify but don't apply labels") parser.add_argument("--reprocess", action="store_true", help="Re-classify already-processed emails") parser.add_argument("--limit", type=int, default=None, help="Override batch size") parser.add_argument("-v", "--verbose", action="store_true", help="Debug logging") args = parser.parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format=LOG_FMT, ) if not args.config.exists(): log.error("Config not found: %s", args.config) log.error("Copy config.yaml to config.local.yaml and fill in your credentials.") sys.exit(1) run(args.config, dry_run=args.dry_run, reprocess=args.reprocess, limit=args.limit) if __name__ == "__main__": main()