#!/usr/bin/env python3
|
|
"""Gmail Organizer — classifies emails using a local LLM and applies Gmail labels."""
|
|
|
|
import argparse
|
|
import email
|
|
import email.header
|
|
import html
|
|
import imaplib
|
|
import json
|
|
import logging
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import urllib.error
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import yaml
|
|
|
|
# Log-line layout used by logging.basicConfig() in main().
LOG_FMT = "%(asctime)s %(levelname)-8s %(message)s"

# Module-wide logger; its level is chosen by the -v/--verbose flag in main().
log = logging.getLogger("gmail-organizer")

# SQLite state database (processed messages + sender cache) lives next to this script.
DB_PATH = Path(__file__).parent / "processed.db"

# Default config location; overridable with -c/--config.
DEFAULT_CONFIG = Path(__file__).parent / "config.local.yaml"
|
|
|
|
|
|
# ── helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def load_config(path: Path) -> dict:
    """Load and parse the YAML configuration file at `path`.

    Uses an explicit UTF-8 encoding so the result does not depend on the
    platform's locale default.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)
|
|
|
|
|
|
def init_db(db_path: Path) -> sqlite3.Connection:
    """Open the state database, creating its schema when missing.

    Two tables are kept: `processed` records every message already
    classified, and `sender_cache` counts how consistently each sender
    maps to a single category.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS processed (
            message_id TEXT PRIMARY KEY,
            category TEXT NOT NULL,
            processed_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sender_cache (
            sender TEXT PRIMARY KEY,
            category TEXT NOT NULL,
            hit_count INTEGER DEFAULT 1,
            last_seen TEXT NOT NULL
        )
        """,
    )
    db = sqlite3.connect(db_path)
    for ddl in schema:
        db.execute(ddl)
    db.commit()
    return db
|
|
|
|
|
|
def is_processed(conn: sqlite3.Connection, message_id: str) -> bool:
    """Return True if this Message-ID was already handled in a prior run."""
    query = "SELECT 1 FROM processed WHERE message_id = ?"
    hit = conn.execute(query, (message_id,)).fetchone()
    return hit is not None
|
|
|
|
|
|
def mark_processed(conn: sqlite3.Connection, message_id: str, category: str):
    """Record (or overwrite) the classification result for a message.

    The timestamp is stored as an ISO-8601 UTC string.
    """
    stamp = datetime.now(tz=ZoneInfo("UTC")).isoformat()
    conn.execute(
        "INSERT OR REPLACE INTO processed (message_id, category, processed_at) VALUES (?, ?, ?)",
        (message_id, category, stamp),
    )
    conn.commit()
|
|
|
|
|
|
def get_cached_sender(conn: sqlite3.Connection, sender: str, min_hits: int = 3) -> str | None:
|
|
"""Return cached category if sender has been classified the same way min_hits times."""
|
|
row = conn.execute(
|
|
"SELECT category, hit_count FROM sender_cache WHERE sender = ? AND hit_count >= ?",
|
|
(sender, min_hits),
|
|
).fetchone()
|
|
return row[0] if row else None
|
|
|
|
|
|
def update_sender_cache(conn: sqlite3.Connection, sender: str, category: str):
    """Track per-sender classification consistency.

    A repeat of the same category bumps the hit counter; a different
    category resets the entry back to a count of 1.
    """
    now = datetime.now(tz=ZoneInfo("UTC")).isoformat()
    existing = conn.execute(
        "SELECT category FROM sender_cache WHERE sender = ?", (sender,)
    ).fetchone()
    same_category = existing is not None and existing[0] == category
    if same_category:
        conn.execute(
            "UPDATE sender_cache SET hit_count = hit_count + 1, last_seen = ? WHERE sender = ?",
            (now, sender),
        )
    else:
        conn.execute(
            "INSERT OR REPLACE INTO sender_cache (sender, category, hit_count, last_seen) VALUES (?, ?, 1, ?)",
            (sender, category, now),
        )
    conn.commit()
|
|
|
|
|
|
def decode_header(raw: str | None) -> str:
|
|
if not raw:
|
|
return ""
|
|
parts = email.header.decode_header(raw)
|
|
decoded = []
|
|
for data, charset in parts:
|
|
if isinstance(data, bytes):
|
|
decoded.append(data.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
decoded.append(data)
|
|
return " ".join(decoded)
|
|
|
|
|
|
def extract_email_address(from_header: str) -> str:
    """Pull the bare, lower-cased address out of a From header.

    Handles both "Name <addr>" and bare-address forms.
    """
    bracketed = re.search(r'<([^>]+)>', from_header)
    if bracketed:
        return bracketed.group(1).lower()
    return from_header.strip().lower()
|
|
|
|
|
|
def extract_text(msg: email.message.Message, max_chars: int) -> str:
|
|
"""Extract plain-text body from an email, falling back to stripped HTML."""
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
break
|
|
elif ct == "text/html" and not body:
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
raw_html = payload.decode(charset, errors="replace")
|
|
body = html.unescape(re.sub(r"<[^>]+>", " ", raw_html))
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
body = payload.decode(charset, errors="replace")
|
|
if msg.get_content_type() == "text/html":
|
|
body = html.unescape(re.sub(r"<[^>]+>", " ", body))
|
|
|
|
body = re.sub(r"\s+", " ", body).strip()
|
|
return body[:max_chars]
|
|
|
|
|
|
# ── Gmail IMAP ───────────────────────────────────────────────────────────────
|
|
|
|
class GmailClient:
    """Thin wrapper around Gmail's IMAP interface (labels, star, archive)."""

    def __init__(self, email_addr: str, app_password: str):
        # NOTE: the parameter annotation was corrupted in the previous
        # revision (redaction artifact), leaving the file unparseable.
        self.email = email_addr
        self.conn = imaplib.IMAP4_SSL("imap.gmail.com")
        self.conn.login(email_addr, app_password)

    def fetch_uids(self, mailbox: str = "INBOX", search: str = "ALL",
                   batch_size: int = 50) -> list[bytes]:
        """Return up to `batch_size` message UIDs, newest first."""
        self.conn.select(mailbox)
        _, data = self.conn.search(None, search)
        uids = data[0].split()
        # IMAP returns oldest-first: take the tail and reverse it.
        return list(reversed(uids[-batch_size:]))

    def fetch_message(self, uid: bytes) -> email.message.Message:
        """Download and parse the full RFC822 message for `uid`."""
        _, data = self.conn.fetch(uid, "(RFC822)")
        return email.message_from_bytes(data[0][1])

    def get_labels(self, uid: bytes) -> list[str]:
        """Get existing Gmail labels (X-GM-LABELS) for a message."""
        _, data = self.conn.fetch(uid, "(X-GM-LABELS)")
        raw = data[0].decode() if isinstance(data[0], bytes) else str(data[0])
        match = re.search(r'X-GM-LABELS \(([^)]*)\)', raw)
        if match:
            return match.group(1).split()
        return []

    def apply_label(self, uid: bytes, label: str):
        """Apply a Gmail label to a message. Creates the label if needed."""
        result = self.conn.store(uid, "+X-GM-LABELS", f'("{label}")')
        if result[0] != "OK":
            # Label probably doesn't exist yet: create it, then fall back
            # to copying the message into the label's mailbox.
            try:
                self.conn.create(label)
            except imaplib.IMAP4.error:
                pass  # label already exists
            self.conn.copy(uid, label)

    def archive(self, uid: bytes):
        """Archive a message (remove from INBOX)."""
        self.conn.store(uid, "-X-GM-LABELS", '("\\\\Inbox")')

    def star(self, uid: bytes):
        """Star/flag a message for priority."""
        self.conn.store(uid, "+FLAGS", "\\Flagged")

    def close(self):
        """Best-effort close and logout; never raises."""
        try:
            self.conn.close()
            self.conn.logout()
        except Exception:
            pass
|
|
|
|
|
|
# ── Ollama LLM ───────────────────────────────────────────────────────────────
|
|
|
|
def _ollama_request(ollama_url: str, payload: dict, max_retries: int = 3) -> dict:
    """Make an Ollama API request with retry and exponential backoff."""
    request = urllib.request.Request(
        f"{ollama_url.rstrip('/')}/api/generate",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    failure = None
    for attempt in range(max_retries):
        try:
            with urllib.request.urlopen(request, timeout=60) as resp:
                return json.loads(resp.read())
        except (urllib.error.URLError, TimeoutError) as exc:
            failure = exc
            # Back off 1s, 2s, 4s, ... between attempts; no sleep after the last.
            if attempt < max_retries - 1:
                delay = 2 ** attempt
                log.warning("Ollama attempt %d/%d failed: %s — retrying in %ds",
                            attempt + 1, max_retries, exc, delay)
                time.sleep(delay)
    raise failure
|
|
|
|
|
|
def classify_email(
    ollama_url: str,
    model: str,
    categories: dict,
    subject: str,
    sender: str,
    body_snippet: str,
) -> tuple[str, bool]:
    """Ask the LLM for a single category for one email.

    Returns (category, is_confident); is_confident is False when the
    model reported low confidence or named no known category.
    """
    described = "\n".join(
        f"- **{name}**: {info['description']}" for name, info in categories.items()
    )
    valid = ", ".join(categories.keys())

    prompt = f"""Classify this email into exactly ONE category.
Reply with: CATEGORY CONFIDENCE
Where confidence is high, medium, or low.

Categories:
{described}

Email:
From: {sender}
Subject: {subject}
Body: {body_snippet[:1000]}

Reply with one of [{valid}] followed by [high/medium/low]:"""

    reply = _ollama_request(ollama_url, {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 30},
    })

    answer = reply.get("response", "").strip().lower()
    # Drop any chain-of-thought block emitted by reasoning models.
    answer = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()

    confident = "low" not in answer
    for name in categories:
        if name in answer:
            return name, confident

    log.warning("LLM returned unexpected category %r, defaulting to 'personal'", answer)
    return "personal", False
|
|
|
|
|
|
def classify_batch(
    ollama_url: str,
    model: str,
    categories: dict,
    emails: list[dict],
) -> list[tuple[str, bool]]:
    """Classify multiple emails in one LLM call.

    Each dict in `emails` carries "subject", "sender" and "body" keys.
    Returns one (category, is_confident) tuple per email, in order. Any
    email whose answer line cannot be parsed from the batch reply falls
    back to an individual classify_email() call.
    """
    if len(emails) == 1:
        e = emails[0]
        return [classify_email(ollama_url, model, categories,
                               e["subject"], e["sender"], e["body"])]

    cat_descriptions = "\n".join(
        f"- **{name}**: {info['description']}" for name, info in categories.items()
    )
    category_names = ", ".join(categories.keys())

    sections = []
    for i, e in enumerate(emails, 1):
        sections.append(
            f"[Email {i}]\nFrom: {e['sender']}\nSubject: {e['subject']}\nBody: {e['body'][:500]}"
        )

    prompt = f"""Classify each email into ONE category.
Reply with one line per email: NUMBER: CATEGORY CONFIDENCE

Categories:
{cat_descriptions}

Valid: {category_names}
Confidence: high, medium, or low

{chr(10).join(sections)}

Reply:"""

    result = _ollama_request(ollama_url, {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 20 * len(emails)},
    })

    raw = result.get("response", "").strip().lower()
    # Drop any chain-of-thought block emitted by reasoning models.
    raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()

    results = []
    lines = raw.split("\n")
    for i, e in enumerate(emails):
        found = False
        for line in lines:
            # BUGFIX: anchor the email number at the start of the line.
            # The previous substring test (`str(i+1) in line`) let email 1
            # match "10: ...", "11: ...", or a "1" anywhere in the text.
            if not re.match(rf"\s*{i + 1}\s*[:.)]", line):
                continue
            confident = "low" not in line
            for name in categories:
                if name in line:
                    results.append((name, confident))
                    found = True
                    break
            if found:
                break
        if not found:
            # Line missing or unparseable — classify this email alone.
            results.append(classify_email(
                ollama_url, model, categories,
                e["subject"], e["sender"], e["body"],
            ))
    return results
|
|
|
|
|
|
# ── notifications ────────────────────────────────────────────────────────────
|
|
|
|
def send_notification(url: str, message: str, title: str = "Gmail Organizer"):
    """Send a notification via ntfy/webhook URL.

    No-op when `url` is empty. Delivery is best-effort: any failure is
    logged and swallowed so notifications never break a run.
    """
    if not url:
        return
    try:
        request = urllib.request.Request(url, data=message.encode(), headers={
            "Title": title, "Content-Type": "text/plain",
        })
        with urllib.request.urlopen(request, timeout=10):
            pass  # response body is irrelevant; we only care that it sent
    except Exception as exc:
        log.warning("Failed to send notification: %s", exc)
|
|
|
|
|
|
# ── digest ───────────────────────────────────────────────────────────────────
|
|
|
|
def generate_digest(stats: dict, details: list[dict]) -> str:
    """Build a human-readable summary of what was processed.

    Every stats key that is not a bookkeeping counter is treated as a
    category count and summed into the "Classified" total.
    """
    meta_keys = ("skipped", "errors", "cached", "low_confidence")
    classified = sum(v for k, v in stats.items() if k not in meta_keys)

    out = ["Gmail Organizer Summary", "=" * 25, ""]
    out.append(f"Classified: {classified}")
    out.append(f"Cached (sender match): {stats.get('cached', 0)}")
    out.append(f"Skipped (already done): {stats.get('skipped', 0)}")
    out.append(f"Low confidence: {stats.get('low_confidence', 0)}")
    out.append(f"Errors: {stats.get('errors', 0)}")

    if details:
        out += ["", "Details:"]
        for entry in details:
            tag = " [cached]" if entry.get("cached") else ""
            out.append(f" {entry['category']:>12} | {entry['subject'][:50]}{tag}")

    return "\n".join(out)
|
|
|
|
|
|
# ── main ─────────────────────────────────────────────────────────────────────
|
|
|
|
def _apply_result(
    client, db, uid, message_id, subject, sender_email,
    category, is_confident, categories, stats, details,
    dry_run, use_confidence, priority_cats,
):
    """Apply a classification outcome: label, archive, star, and record it.

    Mutates `stats` and `details` in place. When confidence gating is on
    and the model was unsure, nothing is applied beyond the counter bump.
    """
    if use_confidence and not is_confident:
        log.info(" -> %s (low confidence, skipping)", category)
        stats["low_confidence"] += 1
        return

    cat_cfg = categories[category]
    label = cat_cfg["label"]
    should_archive = cat_cfg.get("archive", False)
    should_star = category in priority_cats
    log.info(" -> %s (%s)", category, label)

    if dry_run:
        planned = [name for flag, name in
                   ((should_archive, "archive"), (should_star, "star")) if flag]
        log.info(" [DRY RUN] %s%s", label, (" + " + ", ".join(planned)) if planned else "")
    else:
        client.apply_label(uid, label)
        if should_archive:
            client.archive(uid)
            log.info(" archived")
        if should_star:
            client.star(uid)
            log.info(" starred")
        mark_processed(db, message_id, category)

    stats[category] = stats.get(category, 0) + 1
    details.append({"subject": subject, "category": category,
                    "sender": sender_email, "cached": False})
|
|
|
|
|
|
def run(config_path: Path, dry_run: bool = False, reprocess: bool = False,
        limit: int | None = None, digest: bool = False):
    """Main processing pass: fetch, classify, label, and report.

    Loads the YAML config, connects to Gmail and the local SQLite state
    DB, walks the newest UIDs in the configured mailbox, classifies each
    unprocessed message (sender cache first, then batch or single LLM
    call), applies results via _apply_result, and optionally sends a
    digest and/or error notification at the end.
    """
    cfg = load_config(config_path)
    gmail_cfg = cfg["gmail"]
    ollama_cfg = cfg["ollama"]
    categories = cfg["categories"]
    proc_cfg = cfg.get("processing", {})
    notify_cfg = cfg.get("notifications", {})

    # CLI flags take precedence; config supplies defaults.
    batch_size = limit or proc_cfg.get("batch_size", 50)
    max_body = proc_cfg.get("max_body_chars", 2000)
    dry_run = dry_run or proc_cfg.get("dry_run", False)
    mailbox = proc_cfg.get("mailbox", "INBOX")
    use_confidence = proc_cfg.get("confidence_threshold", True)
    sender_min_hits = proc_cfg.get("sender_cache_min_hits", 3)
    use_batch = proc_cfg.get("batch_classify", False)
    batch_sz = proc_cfg.get("batch_classify_size", 5)
    priority_cats = proc_cfg.get("priority_star", [])

    log.info("Connecting to Gmail as %s", gmail_cfg["email"])
    client = GmailClient(gmail_cfg["email"], gmail_cfg["app_password"])
    db = init_db(DB_PATH)

    try:
        uids = client.fetch_uids(mailbox=mailbox, batch_size=batch_size)
        log.info("Fetched %d message UIDs", len(uids))

        # One counter per category plus bookkeeping counters.
        stats = {cat: 0 for cat in categories}
        stats.update({"skipped": 0, "errors": 0, "cached": 0, "low_confidence": 0})
        details = []
        consecutive_errors = 0
        pending = []  # emails accumulated for the next batch LLM call

        for i, uid in enumerate(uids, 1):
            try:
                msg = client.fetch_message(uid)
                # Fall back to a UID-derived id when Message-ID is absent.
                message_id = msg.get("Message-ID", f"uid-{uid.decode()}")
                subject = decode_header(msg.get("Subject"))
                sender = decode_header(msg.get("From"))
                sender_email = extract_email_address(sender)

                if not reprocess and is_processed(db, message_id):
                    stats["skipped"] += 1
                    continue

                body = extract_text(msg, max_body)

                # Sender cache — skip LLM if sender always gets same category
                cached = get_cached_sender(db, sender_email, sender_min_hits)
                if cached and not reprocess:
                    log.info("[%d/%d] Cached: %s -> %s (from: %s)",
                             i, len(uids), subject[:50], cached, sender_email)
                    # use_confidence=False here: a cache hit is trusted as-is.
                    _apply_result(
                        client, db, uid, message_id, subject, sender_email,
                        cached, True, categories, stats, details,
                        dry_run, False, priority_cats,
                    )
                    details[-1]["cached"] = True
                    stats["cached"] += 1
                    update_sender_cache(db, sender_email, cached)
                    consecutive_errors = 0
                    continue

                # Batch mode: accumulate then classify together
                if use_batch:
                    pending.append({
                        "uid": uid, "message_id": message_id,
                        "subject": subject, "sender": sender,
                        "sender_email": sender_email, "body": body,
                    })
                    # Wait until the batch fills, unless this is the last UID.
                    if len(pending) < batch_sz and i < len(uids):
                        continue
                    log.info("Classifying batch of %d...", len(pending))
                    results = classify_batch(
                        ollama_cfg["url"], ollama_cfg["model"], categories, pending,
                    )
                    for item, (cat, conf) in zip(pending, results):
                        _apply_result(
                            client, db, item["uid"], item["message_id"],
                            item["subject"], item["sender_email"],
                            cat, conf, categories, stats, details,
                            dry_run, use_confidence, priority_cats,
                        )
                        update_sender_cache(db, item["sender_email"], cat)
                    pending = []
                    consecutive_errors = 0
                    continue

                # Single classification
                log.info("[%d/%d] Classifying: %s (from: %s)",
                         i, len(uids), subject[:60], sender[:40])
                category, is_confident = classify_email(
                    ollama_cfg["url"], ollama_cfg["model"],
                    categories, subject, sender, body,
                )
                _apply_result(
                    client, db, uid, message_id, subject, sender_email,
                    category, is_confident, categories, stats, details,
                    dry_run, use_confidence, priority_cats,
                )
                update_sender_cache(db, sender_email, category)
                consecutive_errors = 0

            except Exception as e:
                log.error("Error processing UID %s: %s", uid, e)
                stats["errors"] += 1
                consecutive_errors += 1
                # Circuit breaker: abort the pass if Gmail/Ollama appear down.
                if consecutive_errors >= 5:
                    log.error("5 consecutive errors, aborting")
                    send_notification(
                        notify_cfg.get("url"),
                        f"Gmail Organizer: 5 consecutive errors. Last: {e}",
                        title="Gmail Organizer ALERT",
                    )
                    break

        # Flush remaining batch
        if pending:
            try:
                results = classify_batch(
                    ollama_cfg["url"], ollama_cfg["model"], categories, pending,
                )
                for item, (cat, conf) in zip(pending, results):
                    _apply_result(
                        client, db, item["uid"], item["message_id"],
                        item["subject"], item["sender_email"],
                        cat, conf, categories, stats, details,
                        dry_run, use_confidence, priority_cats,
                    )
                    update_sender_cache(db, item["sender_email"], cat)
            except Exception as e:
                log.error("Batch failed: %s", e)
                stats["errors"] += len(pending)

        log.info("Done! Stats: %s", json.dumps(stats, indent=2))

        if digest or notify_cfg.get("digest"):
            digest_text = generate_digest(stats, details)
            log.info("\n%s", digest_text)
            if notify_cfg.get("url"):
                send_notification(notify_cfg["url"], digest_text)

        if stats["errors"] > 0 and notify_cfg.get("url"):
            send_notification(
                notify_cfg["url"],
                f"Completed with {stats['errors']} errors",
                title="Gmail Organizer Warning",
            )

    finally:
        client.close()
        db.close()
|
|
|
|
|
|
def main():
    """Command-line entry point: parse args, set up logging, run one pass."""
    cli = argparse.ArgumentParser(description="Gmail Organizer — LLM-powered email classification")
    cli.add_argument("-c", "--config", type=Path, default=DEFAULT_CONFIG,
                     help="Path to config YAML (default: config.local.yaml)")
    cli.add_argument("-n", "--dry-run", action="store_true",
                     help="Classify but don't apply labels")
    cli.add_argument("--reprocess", action="store_true",
                     help="Re-classify already-processed emails")
    cli.add_argument("--limit", type=int, default=None,
                     help="Override batch size")
    cli.add_argument("--digest", action="store_true",
                     help="Print classification digest summary")
    cli.add_argument("-v", "--verbose", action="store_true",
                     help="Debug logging")
    opts = cli.parse_args()

    logging.basicConfig(
        format=LOG_FMT,
        level=logging.DEBUG if opts.verbose else logging.INFO,
    )

    if not opts.config.exists():
        log.error("Config not found: %s", opts.config)
        log.error("Copy config.yaml to config.local.yaml and fill in your credentials.")
        sys.exit(1)

    run(opts.config, dry_run=opts.dry_run, reprocess=opts.reprocess,
        limit=opts.limit, digest=opts.digest)
|
|
|
|
|
|
# Script entry point — importing this module does not start a run.
if __name__ == "__main__":
    main()
|