Sanitized mirror from private repository - 2026-04-01 03:48:17 UTC
This commit is contained in:
4
scripts/gmail-organizer/.gitignore
vendored
Normal file
4
scripts/gmail-organizer/.gitignore
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
config.local.yaml
|
||||
processed.db
|
||||
__pycache__/
|
||||
*.pyc
|
||||
47
scripts/gmail-organizer/config.yaml
Normal file
47
scripts/gmail-organizer/config.yaml
Normal file
@@ -0,0 +1,47 @@
|
||||
# Gmail Organizer Configuration
|
||||
# Copy this to config.local.yaml and fill in your credentials
|
||||
|
||||
gmail:
|
||||
email: "your.email@gmail.com"
|
||||
app_password: "xxxx xxxx xxxx xxxx"  # 16-char app password from Google  # pragma: allowlist secret
|
||||
|
||||
ollama:
|
||||
url: "https://a5be22681.vishinator.olares.com"
|
||||
model: "qwen3-coder:latest"
|
||||
|
||||
# Categories and their Gmail labels
|
||||
# The LLM will classify each email into one of these
|
||||
categories:
|
||||
receipts:
|
||||
label: "AutoOrg/Receipts"
|
||||
description: "Purchase confirmations, invoices, payment receipts, order updates"
|
||||
archive: false # keep in inbox — you may need to act on these
|
||||
newsletters:
|
||||
label: "AutoOrg/Newsletters"
|
||||
description: "Mailing lists, digests, blog updates, promotional content from subscriptions"
|
||||
archive: true # auto-archive out of inbox
|
||||
work:
|
||||
label: "AutoOrg/Work"
|
||||
description: "Professional correspondence, meeting invites, project updates, work tools"
|
||||
archive: false
|
||||
accounts:
|
||||
label: "AutoOrg/Accounts"
|
||||
description: "Security alerts, password resets, 2FA notifications, account verification, login alerts from services"
|
||||
archive: true # auto-archive — check label if needed
|
||||
spam:
|
||||
label: "AutoOrg/Spam"
|
||||
description: "Unsolicited marketing, phishing attempts, junk mail that bypassed filters"
|
||||
archive: true # auto-archive junk
|
||||
personal:
|
||||
label: "AutoOrg/Personal"
|
||||
description: "Friends, family, personal accounts, non-work non-commercial emails"
|
||||
archive: false
|
||||
|
||||
# Processing settings
|
||||
processing:
|
||||
batch_size: 50 # Emails per run
|
||||
max_body_chars: 2000 # Truncate body to save tokens
|
||||
skip_already_labeled: true
|
||||
dry_run: false # Set true to preview without applying labels
|
||||
process_read: true # Also process already-read emails
|
||||
mailbox: "INBOX" # IMAP mailbox to process
|
||||
332
scripts/gmail-organizer/gmail_organizer.py
Normal file
332
scripts/gmail-organizer/gmail_organizer.py
Normal file
@@ -0,0 +1,332 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Gmail Organizer — classifies emails using a local LLM and applies Gmail labels."""
|
||||
|
||||
import argparse
|
||||
import email
|
||||
import email.header
|
||||
import html
|
||||
import imaplib
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
# Log line layout: timestamp, level padded to 8 chars, then the message.
LOG_FMT = "%(asctime)s %(levelname)-8s %(message)s"
log = logging.getLogger("gmail-organizer")

# SQLite file (next to this script) tracking which Message-IDs were classified.
DB_PATH = Path(__file__).parent / "processed.db"
# Default config location; config.local.yaml is the user's private copy of config.yaml.
DEFAULT_CONFIG = Path(__file__).parent / "config.local.yaml"
|
||||
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def load_config(path: Path) -> dict:
    """Read the YAML configuration at *path* and return it as a dict."""
    with path.open() as fh:
        return yaml.safe_load(fh)
|
||||
|
||||
|
||||
def init_db(db_path: Path) -> sqlite3.Connection:
    """Open (creating if needed) the tracking database and ensure its schema exists."""
    conn = sqlite3.connect(db_path)
    schema = """
        CREATE TABLE IF NOT EXISTS processed (
            message_id TEXT PRIMARY KEY,
            category TEXT NOT NULL,
            processed_at TEXT NOT NULL
        )
    """
    conn.execute(schema)
    conn.commit()
    return conn
|
||||
|
||||
|
||||
def is_processed(conn: sqlite3.Connection, message_id: str) -> bool:
    """Return True if *message_id* was already classified and recorded in the DB."""
    cursor = conn.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    )
    return cursor.fetchone() is not None
|
||||
|
||||
|
||||
def mark_processed(conn: sqlite3.Connection, message_id: str, category: str) -> None:
    """Record (or overwrite) the classification for *message_id* with a UTC timestamp.

    Uses INSERT OR REPLACE so a --reprocess run updates the stored category
    instead of failing on the PRIMARY KEY.
    """
    # stdlib UTC tzinfo; replaces the original __import__('zoneinfo') inline hack
    # (same "+00:00"-suffixed ISO output, no dynamic import).
    from datetime import timezone

    conn.execute(
        "INSERT OR REPLACE INTO processed (message_id, category, processed_at) VALUES (?, ?, ?)",
        (message_id, category, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
|
||||
|
||||
|
||||
def decode_header(raw: str | None) -> str:
|
||||
if not raw:
|
||||
return ""
|
||||
parts = email.header.decode_header(raw)
|
||||
decoded = []
|
||||
for data, charset in parts:
|
||||
if isinstance(data, bytes):
|
||||
decoded.append(data.decode(charset or "utf-8", errors="replace"))
|
||||
else:
|
||||
decoded.append(data)
|
||||
return " ".join(decoded)
|
||||
|
||||
|
||||
def extract_text(msg: email.message.Message, max_chars: int) -> str:
|
||||
"""Extract plain-text body from an email, falling back to stripped HTML."""
|
||||
body = ""
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
ct = part.get_content_type()
|
||||
if ct == "text/plain":
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload:
|
||||
charset = part.get_content_charset() or "utf-8"
|
||||
body = payload.decode(charset, errors="replace")
|
||||
break
|
||||
elif ct == "text/html" and not body:
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload:
|
||||
charset = part.get_content_charset() or "utf-8"
|
||||
raw_html = payload.decode(charset, errors="replace")
|
||||
body = html.unescape(re.sub(r"<[^>]+>", " ", raw_html))
|
||||
else:
|
||||
payload = msg.get_payload(decode=True)
|
||||
if payload:
|
||||
charset = msg.get_content_charset() or "utf-8"
|
||||
body = payload.decode(charset, errors="replace")
|
||||
if msg.get_content_type() == "text/html":
|
||||
body = html.unescape(re.sub(r"<[^>]+>", " ", body))
|
||||
|
||||
# Collapse whitespace and truncate
|
||||
body = re.sub(r"\s+", " ", body).strip()
|
||||
return body[:max_chars]
|
||||
|
||||
|
||||
# ── Gmail IMAP ───────────────────────────────────────────────────────────────
|
||||
|
||||
class GmailClient:
    """Thin wrapper over Gmail's IMAP interface.

    Labels are manipulated through Gmail's X-GM-LABELS IMAP extension rather
    than folder copies, so one message can carry several labels.
    """

    # NOTE(review): the original __init__ signature was mangled by the
    # repository sanitizer ("REDACTED_PASSWORD" in place of the annotation);
    # restored here as the obvious `app_password: str`.
    def __init__(self, email_addr: str, app_password: str) -> None:
        """Connect to imap.gmail.com over SSL and log in with an app password."""
        self.email = email_addr
        self.conn = imaplib.IMAP4_SSL("imap.gmail.com")
        self.conn.login(email_addr, app_password)

    def fetch_uids(self, mailbox: str = "INBOX", search: str = "ALL",
                   batch_size: int = 50) -> list[bytes]:
        """Return up to *batch_size* message UIDs from *mailbox*, newest first."""
        self.conn.select(mailbox)
        _, data = self.conn.search(None, search)
        uids = data[0].split()
        # IMAP returns oldest→newest; take the tail and reverse for most recent first
        return list(reversed(uids[-batch_size:]))

    def fetch_message(self, uid: bytes) -> email.message.Message:
        """Download and parse the full RFC 822 message for *uid*."""
        _, data = self.conn.fetch(uid, "(RFC822)")
        return email.message_from_bytes(data[0][1])

    def get_labels(self, uid: bytes) -> list[str]:
        """Get existing Gmail labels (X-GM-LABELS) for a message."""
        _, data = self.conn.fetch(uid, "(X-GM-LABELS)")
        raw = data[0].decode() if isinstance(data[0], bytes) else str(data[0])
        match = re.search(r'X-GM-LABELS \(([^)]*)\)', raw)
        if match:
            # NOTE(review): split() breaks on quoted labels containing spaces —
            # acceptable here since this result is informational only.
            return match.group(1).split()
        return []

    def apply_label(self, uid: bytes, label: str) -> None:
        """Apply a Gmail label to a message. Creates the label if needed."""
        # Gmail IMAP uses X-GM-LABELS for label manipulation
        result = self.conn.store(uid, "+X-GM-LABELS", f'("{label}")')
        if result[0] != "OK":
            # Fallback: copy to label (which creates it as a folder)
            try:
                self.conn.create(label)
            except imaplib.IMAP4.error:
                pass  # Label already exists
            self.conn.copy(uid, label)

    def archive(self, uid: bytes) -> None:
        """Archive a message (remove from INBOX by removing \\Inbox label)."""
        self.conn.store(uid, "-X-GM-LABELS", '("\\\\Inbox")')

    def close(self) -> None:
        """Best-effort teardown: close the selected mailbox, then log out."""
        try:
            self.conn.close()
            self.conn.logout()
        except Exception:
            pass  # deliberate: shutdown failures are never worth crashing over
|
||||
|
||||
|
||||
# ── Ollama LLM ───────────────────────────────────────────────────────────────
|
||||
|
||||
def classify_email(
    ollama_url: str,
    model: str,
    categories: dict,
    subject: str,
    sender: str,
    body_snippet: str,
) -> str:
    """Ask the LLM to classify an email into one of the categories.

    Returns a key of *categories*; falls back to "personal" when the model's
    reply matches nothing. Raises URLError when Ollama is unreachable.
    """
    cat_descriptions = "\n".join(
        f"- **{name}**: {info['description']}" for name, info in categories.items()
    )
    category_names = ", ".join(categories.keys())

    prompt = f"""Classify this email into exactly ONE category. Reply with ONLY the category name, nothing else.

Categories:
{cat_descriptions}

Email:
From: {sender}
Subject: {subject}
Body: {body_snippet[:1000]}

Reply with one of: {category_names}"""

    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        # Low temperature + tiny completion budget: we only want a category word.
        "options": {
            "temperature": 0.1,
            "num_predict": 20,
        },
    }
    endpoint = f"{ollama_url.rstrip('/')}/api/generate"
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(request_body).encode(),
        headers={"Content-Type": "application/json"},
    )

    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            result = json.loads(resp.read())
    except urllib.error.URLError as e:
        log.error("Ollama request failed: %s", e)
        raise

    answer = result.get("response", "").strip().lower()
    # Strip any thinking tags (qwen3 sometimes wraps reasoning in <think>...</think>)
    answer = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
    # Accept the first known category name that appears anywhere in the reply.
    for name in categories:
        if name in answer:
            return name

    log.warning("LLM returned unexpected category %r, defaulting to 'personal'", answer)
    return "personal"
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def run(config_path: Path, dry_run: bool = False, reprocess: bool = False,
        limit: int | None = None):
    """Process one batch of Gmail messages: classify each via the LLM, apply labels.

    Args:
        config_path: YAML config with gmail/ollama/categories/processing sections.
        dry_run: preview classifications without touching Gmail (ORed with config).
        reprocess: re-classify messages already recorded in the tracking DB.
        limit: overrides processing.batch_size when given.
    """
    cfg = load_config(config_path)
    gmail_cfg = cfg["gmail"]
    ollama_cfg = cfg["ollama"]
    categories = cfg["categories"]
    proc_cfg = cfg.get("processing", {})

    # CLI options take precedence over config values where both are set.
    batch_size = limit or proc_cfg.get("batch_size", 50)
    max_body = proc_cfg.get("max_body_chars", 2000)
    dry_run = dry_run or proc_cfg.get("dry_run", False)
    mailbox = proc_cfg.get("mailbox", "INBOX")

    log.info("Connecting to Gmail as %s", gmail_cfg["email"])
    client = GmailClient(gmail_cfg["email"], gmail_cfg["app_password"])
    db = init_db(DB_PATH)

    try:
        uids = client.fetch_uids(mailbox=mailbox, batch_size=batch_size)
        log.info("Fetched %d message UIDs", len(uids))

        # Per-category counters, plus skipped/errors bookkeeping.
        stats = {cat: 0 for cat in categories}
        stats["skipped"] = 0
        stats["errors"] = 0

        for i, uid in enumerate(uids, 1):
            try:
                msg = client.fetch_message(uid)
                # Fall back to a UID-derived key when the message lacks a Message-ID.
                message_id = msg.get("Message-ID", f"uid-{uid.decode()}")
                subject = decode_header(msg.get("Subject"))
                sender = decode_header(msg.get("From"))

                # Skip messages we have already handled, unless --reprocess was given.
                if not reprocess and is_processed(db, message_id):
                    stats["skipped"] += 1
                    continue

                body = extract_text(msg, max_body)
                log.info("[%d/%d] Classifying: %s (from: %s)",
                         i, len(uids), subject[:60], sender[:40])

                category = classify_email(
                    ollama_cfg["url"],
                    ollama_cfg["model"],
                    categories,
                    subject,
                    sender,
                    body,
                )
                label = categories[category]["label"]
                log.info(" → %s (%s)", category, label)

                should_archive = categories[category].get("archive", False)

                if not dry_run:
                    client.apply_label(uid, label)
                    if should_archive:
                        client.archive(uid)
                        log.info(" 📥 Archived")
                    # Record only after labels were actually applied, so a failed
                    # run leaves the message eligible for the next attempt.
                    mark_processed(db, message_id, category)
                else:
                    log.info(" [DRY RUN] Would apply label: %s%s", label,
                             " + archive" if should_archive else "")

                stats[category] = stats.get(category, 0) + 1

            except Exception as e:
                # Per-message failures are tallied and logged; the batch continues.
                log.error("Error processing UID %s: %s", uid, e)
                stats["errors"] += 1
                continue

        log.info("Done! Stats: %s", json.dumps(stats, indent=2))

    finally:
        client.close()
        db.close()
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments, configure logging, and run one batch."""
    parser = argparse.ArgumentParser(
        description="Gmail Organizer — LLM-powered email classification"
    )
    parser.add_argument(
        "-c", "--config", type=Path, default=DEFAULT_CONFIG,
        help="Path to config YAML (default: config.local.yaml)",
    )
    parser.add_argument(
        "-n", "--dry-run", action="store_true",
        help="Classify but don't apply labels",
    )
    parser.add_argument(
        "--reprocess", action="store_true",
        help="Re-classify already-processed emails",
    )
    parser.add_argument(
        "--limit", type=int, default=None,
        help="Override batch size",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true",
        help="Debug logging",
    )
    args = parser.parse_args()

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level, format=LOG_FMT)

    if not args.config.exists():
        log.error("Config not found: %s", args.config)
        log.error("Copy config.yaml to config.local.yaml and fill in your credentials.")
        sys.exit(1)

    run(args.config, dry_run=args.dry_run, reprocess=args.reprocess, limit=args.limit)


if __name__ == "__main__":
    main()
|
||||
1
scripts/gmail-organizer/requirements.txt
Normal file
1
scripts/gmail-organizer/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
pyyaml>=6.0
|
||||
Reference in New Issue
Block a user