Sanitized mirror from private repository - 2026-04-05 11:04:10 UTC
This commit is contained in:
4
scripts/gmail-organizer/.gitignore
vendored
Normal file
4
scripts/gmail-organizer/.gitignore
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
config.local.yaml
|
||||
processed.db
|
||||
__pycache__/
|
||||
*.pyc
|
||||
58
scripts/gmail-organizer/config.yaml
Normal file
58
scripts/gmail-organizer/config.yaml
Normal file
@@ -0,0 +1,58 @@
|
||||
# Gmail Organizer Configuration
|
||||
# Copy this to config.local.yaml and fill in your credentials
|
||||
|
||||
gmail:
|
||||
email: "your.email@gmail.com"
|
||||
  app_password: "xxxx xxxx xxxx xxxx"  # 16-char app password from Google  # pragma: allowlist secret
|
||||
|
||||
ollama:
|
||||
url: "http://192.168.0.145:31434"
|
||||
model: "qwen3-coder:latest"
|
||||
|
||||
# Categories and their Gmail labels
|
||||
# The LLM will classify each email into one of these
|
||||
categories:
|
||||
receipts:
|
||||
label: "AutoOrg/Receipts"
|
||||
description: "Purchase confirmations, invoices, payment receipts, order updates"
|
||||
archive: false # keep in inbox — you may need to act on these
|
||||
newsletters:
|
||||
label: "AutoOrg/Newsletters"
|
||||
description: "Mailing lists, digests, blog updates, promotional content from subscriptions"
|
||||
archive: true # auto-archive out of inbox
|
||||
work:
|
||||
label: "AutoOrg/Work"
|
||||
description: "Professional correspondence, meeting invites, project updates, work tools"
|
||||
archive: false
|
||||
accounts:
|
||||
label: "AutoOrg/Accounts"
|
||||
description: "Security alerts, password resets, 2FA notifications, account verification, login alerts from services"
|
||||
archive: true # auto-archive — check label if needed
|
||||
spam:
|
||||
label: "AutoOrg/Spam"
|
||||
description: "Unsolicited marketing, phishing attempts, junk mail that bypassed filters"
|
||||
archive: true # auto-archive junk
|
||||
personal:
|
||||
label: "AutoOrg/Personal"
|
||||
description: "Friends, family, personal accounts, non-work non-commercial emails"
|
||||
archive: false
|
||||
|
||||
# Processing settings
|
||||
processing:
|
||||
batch_size: 50 # Emails per run
|
||||
max_body_chars: 2000 # Truncate body to save tokens
|
||||
skip_already_labeled: true
|
||||
dry_run: false # Set true to preview without applying labels
|
||||
process_read: true # Also process already-read emails
|
||||
mailbox: "INBOX" # IMAP mailbox to process
|
||||
confidence_threshold: true # Skip emails where LLM confidence is "low"
|
||||
sender_cache_min_hits: 3 # After N identical classifications, skip LLM for this sender
|
||||
batch_classify: false # Classify multiple emails in one LLM call (faster, less accurate)
|
||||
batch_classify_size: 5 # How many emails per batch call
|
||||
priority_star: # Categories that get starred for attention
|
||||
- accounts
|
||||
|
||||
# Notifications (optional) — ntfy URL or similar webhook
|
||||
# notifications:
|
||||
# url: "https://ntfy.sh/REDACTED_TOPIC"
|
||||
# digest: true # Send summary after each run
|
||||
607
scripts/gmail-organizer/gmail_organizer.py
Normal file
607
scripts/gmail-organizer/gmail_organizer.py
Normal file
@@ -0,0 +1,607 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Gmail Organizer — classifies emails using a local LLM and applies Gmail labels."""
|
||||
|
||||
import argparse
|
||||
import email
|
||||
import email.header
|
||||
import html
|
||||
import imaplib
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import yaml
|
||||
|
||||
LOG_FMT = "%(asctime)s %(levelname)-8s %(message)s"
|
||||
log = logging.getLogger("gmail-organizer")
|
||||
|
||||
DB_PATH = Path(__file__).parent / "processed.db"
|
||||
DEFAULT_CONFIG = Path(__file__).parent / "config.local.yaml"
|
||||
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def load_config(path: Path) -> dict:
    """Parse the YAML configuration file at *path* and return it as a dict."""
    with open(path) as fh:
        return yaml.safe_load(fh)
|
||||
|
||||
|
||||
def init_db(db_path: Path) -> sqlite3.Connection:
    """Open (or create) the tracking database and ensure its schema exists.

    Two tables:
      * processed    — one row per handled message, keyed by Message-ID.
      * sender_cache — per-sender classification streaks used to bypass the LLM.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS processed (
            message_id TEXT PRIMARY KEY,
            category TEXT NOT NULL,
            processed_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sender_cache (
            sender TEXT PRIMARY KEY,
            category TEXT NOT NULL,
            hit_count INTEGER DEFAULT 1,
            last_seen TEXT NOT NULL
        )
        """,
    )
    conn = sqlite3.connect(db_path)
    for statement in schema_statements:
        conn.execute(statement)
    conn.commit()
    return conn
|
||||
|
||||
|
||||
def is_processed(conn: sqlite3.Connection, message_id: str) -> bool:
    """Return True if *message_id* already has a row in the processed table."""
    cur = conn.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    )
    return cur.fetchone() is not None
|
||||
|
||||
|
||||
def mark_processed(conn: sqlite3.Connection, message_id: str, category: str):
    """Record (or overwrite) the classification for *message_id*, stamped in UTC."""
    timestamp = datetime.now(tz=ZoneInfo("UTC")).isoformat()
    conn.execute(
        "INSERT OR REPLACE INTO processed (message_id, category, processed_at) VALUES (?, ?, ?)",
        (message_id, category, timestamp),
    )
    conn.commit()
|
||||
|
||||
|
||||
def get_cached_sender(conn: sqlite3.Connection, sender: str, min_hits: int = 3) -> str | None:
|
||||
"""Return cached category if sender has been classified the same way min_hits times."""
|
||||
row = conn.execute(
|
||||
"SELECT category, hit_count FROM sender_cache WHERE sender = ? AND hit_count >= ?",
|
||||
(sender, min_hits),
|
||||
).fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
|
||||
def update_sender_cache(conn: sqlite3.Connection, sender: str, category: str):
    """Bump the sender's streak when the category repeats; otherwise restart it at 1."""
    now = datetime.now(tz=ZoneInfo("UTC")).isoformat()
    previous = conn.execute(
        "SELECT category FROM sender_cache WHERE sender = ?", (sender,)
    ).fetchone()
    same_category = previous is not None and previous[0] == category
    if same_category:
        conn.execute(
            "UPDATE sender_cache SET hit_count = hit_count + 1, last_seen = ? WHERE sender = ?",
            (now, sender),
        )
    else:
        # New sender, or the sender switched categories — the streak resets.
        conn.execute(
            "INSERT OR REPLACE INTO sender_cache (sender, category, hit_count, last_seen) VALUES (?, ?, 1, ?)",
            (sender, category, now),
        )
    conn.commit()
|
||||
|
||||
|
||||
def decode_header(raw: str | None) -> str:
|
||||
if not raw:
|
||||
return ""
|
||||
parts = email.header.decode_header(raw)
|
||||
decoded = []
|
||||
for data, charset in parts:
|
||||
if isinstance(data, bytes):
|
||||
decoded.append(data.decode(charset or "utf-8", errors="replace"))
|
||||
else:
|
||||
decoded.append(data)
|
||||
return " ".join(decoded)
|
||||
|
||||
|
||||
def extract_email_address(from_header: str) -> str:
    """Pull the bare lowercase address out of a From header like 'Name <a@b.com>'."""
    angle_bracketed = re.search(r'<([^>]+)>', from_header)
    if angle_bracketed:
        return angle_bracketed.group(1).lower()
    # No display-name form — the header is (hopefully) just the address.
    return from_header.strip().lower()
|
||||
|
||||
|
||||
def extract_text(msg: email.message.Message, max_chars: int) -> str:
    """Extract plain-text body from an email, falling back to stripped HTML.

    Plain-text parts win; an HTML part is only used while no body has been
    found yet (tags stripped, entities unescaped). Whitespace is collapsed
    and the result truncated to *max_chars*.
    """
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                continue
            if content_type == "text/html" and body:
                continue  # HTML is only a fallback until plain text appears
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            text = payload.decode(part.get_content_charset() or "utf-8",
                                  errors="replace")
            if content_type == "text/plain":
                body = text
                break  # plain text found — stop scanning
            body = html.unescape(re.sub(r"<[^>]+>", " ", text))
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            body = payload.decode(msg.get_content_charset() or "utf-8",
                                  errors="replace")
            if msg.get_content_type() == "text/html":
                body = html.unescape(re.sub(r"<[^>]+>", " ", body))

    return re.sub(r"\s+", " ", body).strip()[:max_chars]
|
||||
|
||||
|
||||
# ── Gmail IMAP ───────────────────────────────────────────────────────────────
|
||||
|
||||
class GmailClient:
    """Thin wrapper around Gmail's IMAP interface.

    Uses Gmail's IMAP extensions (X-GM-LABELS) for label manipulation.
    Requires an app password — not the account password.
    """

    def __init__(self, email_addr: str, app_password: str):
        # NOTE: signature restored — the sanitized mirror had mangled the
        # app_password annotation into invalid syntax.
        self.email = email_addr
        self.conn = imaplib.IMAP4_SSL("imap.gmail.com")
        self.conn.login(email_addr, app_password)

    def fetch_uids(self, mailbox: str = "INBOX", search: str = "ALL",
                   batch_size: int = 50) -> list[bytes]:
        """Return up to *batch_size* message UIDs from *mailbox*, newest first."""
        self.conn.select(mailbox)
        _, data = self.conn.search(None, search)
        uids = data[0].split()
        # IMAP returns oldest-first; keep the newest slice, reversed.
        return list(reversed(uids[-batch_size:]))

    def fetch_message(self, uid: bytes) -> email.message.Message:
        """Download a full message without marking it read."""
        # BODY.PEEK[] avoids setting the \Seen flag.
        _, data = self.conn.fetch(uid, "(BODY.PEEK[])")
        return email.message_from_bytes(data[0][1])

    def get_labels(self, uid: bytes) -> list[str]:
        """Get existing Gmail labels (X-GM-LABELS) for a message."""
        _, data = self.conn.fetch(uid, "(X-GM-LABELS)")
        raw = data[0].decode() if isinstance(data[0], bytes) else str(data[0])
        match = re.search(r'X-GM-LABELS \(([^)]*)\)', raw)
        if match:
            return match.group(1).split()
        return []

    def apply_label(self, uid: bytes, label: str):
        """Apply a Gmail label to a message. Creates the label if needed."""
        result = self.conn.store(uid, "+X-GM-LABELS", f'("{label}")')
        if result[0] != "OK":
            # Label probably doesn't exist yet: create it, then copy the
            # message into it (in Gmail IMAP, copying == labeling).
            try:
                self.conn.create(label)
            except imaplib.IMAP4.error:
                pass  # already exists — fine
            self.conn.copy(uid, label)

    def archive(self, uid: bytes):
        """Archive a message (remove from INBOX)."""
        # Sends the literal label \\Inbox, as Gmail's extension expects.
        self.conn.store(uid, "-X-GM-LABELS", '("\\\\Inbox")')

    def star(self, uid: bytes):
        """Star/flag a message for priority."""
        self.conn.store(uid, "+FLAGS", "\\Flagged")

    def close(self):
        """Best-effort close/logout; errors during teardown are ignored."""
        try:
            self.conn.close()
            self.conn.logout()
        except Exception:
            pass
|
||||
|
||||
|
||||
# ── Ollama LLM ───────────────────────────────────────────────────────────────
|
||||
|
||||
def _ollama_request(ollama_url: str, payload: dict, max_retries: int = 3) -> dict:
    """Make an Ollama API request with retry and exponential backoff."""
    request = urllib.request.Request(
        f"{ollama_url.rstrip('/')}/api/generate",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    last_error = None
    for attempt in range(max_retries):
        try:
            with urllib.request.urlopen(request, timeout=60) as resp:
                return json.loads(resp.read())
        except (urllib.error.URLError, TimeoutError) as exc:
            last_error = exc
            is_final_attempt = attempt == max_retries - 1
            if not is_final_attempt:
                wait = 2 ** attempt  # 1s, 2s, 4s, ...
                log.warning("Ollama attempt %d/%d failed: %s — retrying in %ds",
                            attempt + 1, max_retries, exc, wait)
                time.sleep(wait)
    # All attempts exhausted — surface the last transport error.
    raise last_error
|
||||
|
||||
|
||||
def classify_email(
    ollama_url: str,
    model: str,
    categories: dict,
    subject: str,
    sender: str,
    body_snippet: str,
) -> tuple[str, bool]:
    """Classify an email. Returns (category, is_confident)."""
    cat_descriptions = "\n".join(
        f"- **{name}**: {info['description']}" for name, info in categories.items()
    )
    category_names = ", ".join(categories.keys())

    prompt = f"""Classify this email into exactly ONE category.
Reply with: CATEGORY CONFIDENCE
Where confidence is high, medium, or low.

Categories:
{cat_descriptions}

Email:
From: {sender}
Subject: {subject}
Body: {body_snippet[:1000]}

Reply with one of [{category_names}] followed by [high/medium/low]:"""

    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 30},
    }
    response = _ollama_request(ollama_url, payload)

    # Strip any chain-of-thought block, then scan the answer for a category.
    answer = response.get("response", "").strip().lower()
    answer = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
    is_confident = "low" not in answer

    matched = next((name for name in categories if name in answer), None)
    if matched is not None:
        return matched, is_confident

    log.warning("LLM returned unexpected category %r, defaulting to 'personal'", answer)
    return "personal", False
|
||||
|
||||
|
||||
def classify_batch(
    ollama_url: str,
    model: str,
    categories: dict,
    emails: list[dict],
) -> list[tuple[str, bool]]:
    """Classify multiple emails in one LLM call.

    Each email dict needs "subject", "sender", and "body" keys. Any email
    whose line cannot be parsed out of the batched response falls back to
    individual classification via classify_email().
    """
    if len(emails) == 1:
        e = emails[0]
        return [classify_email(ollama_url, model, categories,
                               e["subject"], e["sender"], e["body"])]

    cat_descriptions = "\n".join(
        f"- **{name}**: {info['description']}" for name, info in categories.items()
    )
    category_names = ", ".join(categories.keys())

    sections = []
    for i, e in enumerate(emails, 1):
        sections.append(
            f"[Email {i}]\nFrom: {e['sender']}\nSubject: {e['subject']}\nBody: {e['body'][:500]}"
        )

    prompt = f"""Classify each email into ONE category.
Reply with one line per email: NUMBER: CATEGORY CONFIDENCE

Categories:
{cat_descriptions}

Valid: {category_names}
Confidence: high, medium, or low

{chr(10).join(sections)}

Reply:"""

    result = _ollama_request(ollama_url, {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 20 * len(emails)},
    })

    raw = result.get("response", "").strip().lower()
    raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()

    # Map response lines to email numbers by anchoring the number at the
    # START of the line. (The previous substring check `str(i+1) in line`
    # mis-assigned lines: "1" also matched "10:", "11:", or any digit
    # appearing elsewhere in the line.)
    line_by_number: dict[int, str] = {}
    for line in raw.split("\n"):
        m = re.match(r"\s*\[?(?:email\s*)?(\d+)\]?\s*[:.\-]", line)
        if m:
            line_by_number.setdefault(int(m.group(1)), line)

    results = []
    for i, e in enumerate(emails, 1):
        line = line_by_number.get(i)
        category = None
        if line is not None:
            category = next((name for name in categories if name in line), None)
        if category is not None:
            results.append((category, "low" not in line))
        else:
            # Missing or unparseable line — classify this one individually.
            results.append(classify_email(
                ollama_url, model, categories,
                e["subject"], e["sender"], e["body"],
            ))
    return results
|
||||
|
||||
|
||||
# ── notifications ────────────────────────────────────────────────────────────
|
||||
|
||||
def send_notification(url: str, message: str, title: str = "Gmail Organizer"):
    """Send a notification via ntfy/webhook URL."""
    if not url:
        return  # notifications not configured
    headers = {"Title": title, "Content-Type": "text/plain"}
    try:
        req = urllib.request.Request(url, data=message.encode(), headers=headers)
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as exc:
        # Notifications are best-effort — never let them break a run.
        log.warning("Failed to send notification: %s", exc)
|
||||
|
||||
|
||||
# ── digest ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def generate_digest(stats: dict, details: list[dict]) -> str:
    """Build a human-readable summary of a processing run.

    *stats* mixes per-category counts with bookkeeping keys; only the
    category keys count toward "Classified".
    """
    meta_keys = ("skipped", "errors", "cached", "low_confidence")
    classified = sum(count for key, count in stats.items() if key not in meta_keys)

    out = [
        "Gmail Organizer Summary",
        "=" * 25,
        "",
        f"Classified: {classified}",
        f"Cached (sender match): {stats.get('cached', 0)}",
        f"Skipped (already done): {stats.get('skipped', 0)}",
        f"Low confidence: {stats.get('low_confidence', 0)}",
        f"Errors: {stats.get('errors', 0)}",
    ]
    if details:
        out += ["", "Details:"]
        for entry in details:
            tag = " [cached]" if entry.get("cached") else ""
            out.append(f"  {entry['category']:>12} | {entry['subject'][:50]}{tag}")
    return "\n".join(out)
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _apply_result(
    client, db, uid, message_id, subject, sender_email,
    category, is_confident, categories, stats, details,
    dry_run, use_confidence, priority_cats,
):
    """Apply a classification: label, optionally archive/star, and record it.

    Low-confidence results are skipped entirely (email left untouched and
    not marked processed) when use_confidence is on.
    """
    if use_confidence and not is_confident:
        log.info("  -> %s (low confidence, skipping)", category)
        stats["low_confidence"] += 1
        return

    cat_cfg = categories[category]
    label = cat_cfg["label"]
    should_archive = cat_cfg.get("archive", False)
    should_star = category in priority_cats
    log.info("  -> %s (%s)", category, label)

    if dry_run:
        planned = [name for name, flag
                   in (("archive", should_archive), ("star", should_star)) if flag]
        log.info("  [DRY RUN] %s%s", label, (" + " + ", ".join(planned)) if planned else "")
    else:
        client.apply_label(uid, label)
        if should_archive:
            client.archive(uid)
            log.info("    archived")
        if should_star:
            client.star(uid)
            log.info("    starred")
        # Only real (non-dry-run) actions are recorded as processed.
        mark_processed(db, message_id, category)

    stats[category] = stats.get(category, 0) + 1
    details.append({"subject": subject, "category": category,
                    "sender": sender_email, "cached": False})
|
||||
|
||||
|
||||
def run(config_path: Path, dry_run: bool = False, reprocess: bool = False,
        limit: int | None = None, digest: bool = False):
    """Top-level processing loop: fetch, classify, label, and report.

    Args:
        config_path: YAML config with gmail/ollama/categories/processing keys.
        dry_run:     Preview classifications without touching the mailbox
                     (ORed with the config's processing.dry_run).
        reprocess:   Re-classify emails already in the processed table and
                     bypass the sender cache.
        limit:       Override for processing.batch_size.
        digest:      Force printing/sending the summary digest.
    """
    cfg = load_config(config_path)
    gmail_cfg = cfg["gmail"]
    ollama_cfg = cfg["ollama"]
    categories = cfg["categories"]
    proc_cfg = cfg.get("processing", {})
    notify_cfg = cfg.get("notifications", {})

    # CLI flags take precedence over config values.
    batch_size = limit or proc_cfg.get("batch_size", 50)
    max_body = proc_cfg.get("max_body_chars", 2000)
    dry_run = dry_run or proc_cfg.get("dry_run", False)
    mailbox = proc_cfg.get("mailbox", "INBOX")
    use_confidence = proc_cfg.get("confidence_threshold", True)
    sender_min_hits = proc_cfg.get("sender_cache_min_hits", 3)
    use_batch = proc_cfg.get("batch_classify", False)
    batch_sz = proc_cfg.get("batch_classify_size", 5)
    priority_cats = proc_cfg.get("priority_star", [])

    log.info("Connecting to Gmail as %s", gmail_cfg["email"])
    client = GmailClient(gmail_cfg["email"], gmail_cfg["app_password"])
    db = init_db(DB_PATH)

    try:
        uids = client.fetch_uids(mailbox=mailbox, batch_size=batch_size)
        log.info("Fetched %d message UIDs", len(uids))

        # stats mixes per-category counters with bookkeeping keys; both are
        # read by generate_digest().
        stats = {cat: 0 for cat in categories}
        stats.update({"skipped": 0, "errors": 0, "cached": 0, "low_confidence": 0})
        details = []
        consecutive_errors = 0
        pending = []  # accumulator for batch_classify mode

        for i, uid in enumerate(uids, 1):
            try:
                msg = client.fetch_message(uid)
                # Fall back to a UID-derived key when Message-ID is absent.
                message_id = msg.get("Message-ID", f"uid-{uid.decode()}")
                subject = decode_header(msg.get("Subject"))
                sender = decode_header(msg.get("From"))
                sender_email = extract_email_address(sender)

                if not reprocess and is_processed(db, message_id):
                    stats["skipped"] += 1
                    continue

                body = extract_text(msg, max_body)

                # Sender cache — skip LLM if sender always gets same category
                cached = get_cached_sender(db, sender_email, sender_min_hits)
                if cached and not reprocess:
                    log.info("[%d/%d] Cached: %s -> %s (from: %s)",
                             i, len(uids), subject[:50], cached, sender_email)
                    # Cached results are treated as confident (True) and the
                    # confidence gate is disabled (False) for this call.
                    _apply_result(
                        client, db, uid, message_id, subject, sender_email,
                        cached, True, categories, stats, details,
                        dry_run, False, priority_cats,
                    )
                    details[-1]["cached"] = True
                    stats["cached"] += 1
                    update_sender_cache(db, sender_email, cached)
                    consecutive_errors = 0
                    continue

                # Batch mode: accumulate then classify together
                if use_batch:
                    pending.append({
                        "uid": uid, "message_id": message_id,
                        "subject": subject, "sender": sender,
                        "sender_email": sender_email, "body": body,
                    })
                    # Flush when the batch is full or this is the last UID.
                    if len(pending) < batch_sz and i < len(uids):
                        continue
                    log.info("Classifying batch of %d...", len(pending))
                    results = classify_batch(
                        ollama_cfg["url"], ollama_cfg["model"], categories, pending,
                    )
                    for item, (cat, conf) in zip(pending, results):
                        _apply_result(
                            client, db, item["uid"], item["message_id"],
                            item["subject"], item["sender_email"],
                            cat, conf, categories, stats, details,
                            dry_run, use_confidence, priority_cats,
                        )
                        update_sender_cache(db, item["sender_email"], cat)
                    pending = []
                    consecutive_errors = 0
                    continue

                # Single classification
                log.info("[%d/%d] Classifying: %s (from: %s)",
                         i, len(uids), subject[:60], sender[:40])
                category, is_confident = classify_email(
                    ollama_cfg["url"], ollama_cfg["model"],
                    categories, subject, sender, body,
                )
                _apply_result(
                    client, db, uid, message_id, subject, sender_email,
                    category, is_confident, categories, stats, details,
                    dry_run, use_confidence, priority_cats,
                )
                update_sender_cache(db, sender_email, category)
                consecutive_errors = 0

            except Exception as e:
                # Per-message errors are counted but don't abort the run,
                # unless they come 5 in a row (likely systemic: IMAP down,
                # Ollama unreachable, etc.).
                log.error("Error processing UID %s: %s", uid, e)
                stats["errors"] += 1
                consecutive_errors += 1
                if consecutive_errors >= 5:
                    log.error("5 consecutive errors, aborting")
                    send_notification(
                        notify_cfg.get("url"),
                        f"Gmail Organizer: 5 consecutive errors. Last: {e}",
                        title="Gmail Organizer ALERT",
                    )
                    break

        # Flush remaining batch
        if pending:
            try:
                results = classify_batch(
                    ollama_cfg["url"], ollama_cfg["model"], categories, pending,
                )
                for item, (cat, conf) in zip(pending, results):
                    _apply_result(
                        client, db, item["uid"], item["message_id"],
                        item["subject"], item["sender_email"],
                        cat, conf, categories, stats, details,
                        dry_run, use_confidence, priority_cats,
                    )
                    update_sender_cache(db, item["sender_email"], cat)
            except Exception as e:
                log.error("Batch failed: %s", e)
                stats["errors"] += len(pending)

        log.info("Done! Stats: %s", json.dumps(stats, indent=2))

        if digest or notify_cfg.get("digest"):
            digest_text = generate_digest(stats, details)
            log.info("\n%s", digest_text)
            if notify_cfg.get("url"):
                send_notification(notify_cfg["url"], digest_text)

        # Error notification is independent of the digest setting.
        if stats["errors"] > 0 and notify_cfg.get("url"):
            send_notification(
                notify_cfg["url"],
                f"Completed with {stats['errors']} errors",
                title="Gmail Organizer Warning",
            )

    finally:
        client.close()
        db.close()
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments, configure logging, and start a run."""
    parser = argparse.ArgumentParser(
        description="Gmail Organizer — LLM-powered email classification")
    flag_specs = [
        (("-c", "--config"), dict(type=Path, default=DEFAULT_CONFIG,
                                  help="Path to config YAML (default: config.local.yaml)")),
        (("-n", "--dry-run"), dict(action="store_true",
                                   help="Classify but don't apply labels")),
        (("--reprocess",), dict(action="store_true",
                                help="Re-classify already-processed emails")),
        (("--limit",), dict(type=int, default=None,
                            help="Override batch size")),
        (("--digest",), dict(action="store_true",
                             help="Print classification digest summary")),
        (("-v", "--verbose"), dict(action="store_true",
                                   help="Debug logging")),
    ]
    for flags, kwargs in flag_specs:
        parser.add_argument(*flags, **kwargs)

    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format=LOG_FMT,
    )

    if not args.config.exists():
        log.error("Config not found: %s", args.config)
        log.error("Copy config.yaml to config.local.yaml and fill in your credentials.")
        sys.exit(1)

    run(args.config, dry_run=args.dry_run, reprocess=args.reprocess,
        limit=args.limit, digest=args.digest)
|
||||
|
||||
|
||||
# Script entry point — only runs when executed directly, not on import.
if __name__ == "__main__":
    main()
|
||||
1
scripts/gmail-organizer/requirements.txt
Normal file
1
scripts/gmail-organizer/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
pyyaml>=6.0
|
||||
Reference in New Issue
Block a user