#!/usr/bin/env python3
"""Feature 15 — Receipt Tracker.

Extracts structured data from receipt emails using LLM and appends to CSV.
Reads processed.db files from email organizers, fetches email content via
IMAP, sends to Ollama for extraction, and writes results to data/expenses.csv.

Cron: 0 10 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/receipt-tracker.py
"""
import argparse
import csv
import email
import imaplib
import json
import logging
import os
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import yaml

sys.path.insert(0, str(Path(__file__).parent))
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email

log = logging.getLogger(__name__)

REPO_ROOT = Path("/home/homelab/organized/repos/homelab")
SCRIPTS_DIR = REPO_ROOT / "scripts"
DATA_DIR = REPO_ROOT / "data"
CSV_PATH = DATA_DIR / "expenses.csv"
TRACKER_DB = SCRIPTS_DIR / "receipt-tracker.db"

CSV_COLUMNS = ["date", "vendor", "amount", "currency", "order_number", "email_account", "message_id"]

# Each account pairs an organizer's processed.db (source of receipt
# message IDs) with the IMAP credentials needed to fetch the bodies.
ACCOUNTS = [
    {
        "name": "lzbellina92@gmail.com",
        "processed_db": SCRIPTS_DIR / "gmail-organizer" / "processed.db",
        "config": SCRIPTS_DIR / "gmail-organizer" / "config.local.yaml",
        "type": "gmail",
    },
    {
        "name": "your-email@example.com",
        "processed_db": SCRIPTS_DIR / "gmail-organizer-dvish" / "processed.db",
        "config": SCRIPTS_DIR / "gmail-organizer-dvish" / "config.local.yaml",
        "type": "gmail",
    },
    {
        "name": "admin@thevish.io",
        "processed_db": SCRIPTS_DIR / "proton-organizer" / "processed.db",
        "config": SCRIPTS_DIR / "proton-organizer" / "config.local.yaml",
        "type": "proton",
    },
]

LLM_PROMPT = (
    'Extract from this receipt email: vendor, amount (number only), '
    'currency (USD/EUR/etc), date (YYYY-MM-DD), order_number. '
    'Reply ONLY as JSON: {"vendor": "...", "amount": "...", "currency": "...", '
    '"date": "...", "order_number": "..."}\n\n'
)


# ── tracker DB ───────────────────────────────────────────────────────────────

def init_tracker_db():
    """Create the tracker SQLite database if it doesn't exist."""
    conn = sqlite3.connect(TRACKER_DB)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS tracked (
            message_id TEXT PRIMARY KEY,
            extracted_at TEXT
        )
    """)
    conn.commit()
    conn.close()


def is_tracked(message_id: str) -> bool:
    """Check if a message_id has already been processed."""
    conn = sqlite3.connect(TRACKER_DB)
    row = conn.execute(
        "SELECT 1 FROM tracked WHERE message_id = ?", (message_id,)
    ).fetchone()
    conn.close()
    return row is not None


def mark_tracked(message_id: str):
    """Mark a message_id as tracked (idempotent via INSERT OR IGNORE)."""
    conn = sqlite3.connect(TRACKER_DB)
    conn.execute(
        "INSERT OR IGNORE INTO tracked (message_id, extracted_at) VALUES (?, ?)",
        (message_id, datetime.now(tz=ZoneInfo("UTC")).isoformat()),
    )
    conn.commit()
    conn.close()


# ── processed.db queries ─────────────────────────────────────────────────────

def get_receipt_message_ids(db_path: Path) -> list[str]:
    """Get message IDs categorized as receipts from a processed.db.

    Returns an empty list (with a warning) when the DB file is missing.
    """
    if not db_path.exists():
        log.warning("Processed DB not found: %s", db_path)
        return []
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        "SELECT message_id FROM processed WHERE category = 'receipts'"
    ).fetchall()
    conn.close()
    return [row[0] for row in rows]


# ── IMAP email fetching ──────────────────────────────────────────────────────

def load_imap_credentials(config_path: Path, account_type: str) -> dict:
    """Load IMAP credentials from config.local.yaml.

    Returns a dict with host/port/user/password/ssl keys, or an empty dict
    when the config file is missing or the account type is unknown.
    """
    if not config_path.exists():
        log.warning("Config file not found: %s", config_path)
        return {}
    with open(config_path) as f:
        cfg = yaml.safe_load(f)
    if account_type == "gmail":
        gmail = cfg.get("gmail", {})
        return {
            "host": "imap.gmail.com",
            "port": 993,
            "user": gmail.get("email", ""),
            "password": gmail.get("app_password", ""),
            "ssl": True,
        }
    elif account_type == "proton":
        # Proton goes through a local bridge — plain IMAP, no SSL.
        proton = cfg.get("proton", {})
        return {
            "host": proton.get("host", "127.0.0.1"),
            "port": proton.get("port", 1143),
            "user": proton.get("email", ""),
            "password": proton.get("bridge_password", ""),
            "ssl": False,
        }
    return {}


def fetch_email_body(creds: dict, message_id: str) -> str | None:
    """Connect via IMAP and fetch email body for the given Message-ID.

    Tries a fixed list of likely mailboxes; returns the extracted body of
    the first match, or None when not found / on any IMAP error.

    Fix: logout moved to a finally block so the connection is closed even
    when an exception is raised mid-search (the original leaked it).
    """
    if not creds:
        return None
    imap = None
    try:
        if creds.get("ssl"):
            imap = imaplib.IMAP4_SSL(creds["host"], creds["port"])
        else:
            imap = imaplib.IMAP4(creds["host"], creds["port"])
        imap.login(creds["user"], creds["password"])
        # Search across all mail
        for mailbox in ["INBOX", "[Gmail]/All Mail", "All Mail", "INBOX.All",
                        "Folders/AutoOrg-Receipts"]:
            try:
                status, _ = imap.select(f'"{mailbox}"', readonly=True)
                if status != "OK":
                    continue
            except Exception:
                # Mailbox may not exist on this server — try the next one.
                continue
            # Search by Message-ID header
            search_criteria = f'(HEADER Message-ID "{message_id}")'
            status, data = imap.search(None, search_criteria)
            if status != "OK" or not data[0]:
                continue
            msg_nums = data[0].split()
            if not msg_nums:
                continue
            # Fetch the first matching message
            status, msg_data = imap.fetch(msg_nums[0], "(RFC822)")
            if status != "OK":
                continue
            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)
            return _extract_body(msg)
        log.debug("Message-ID %s not found in any mailbox", message_id)
        return None
    except Exception as e:
        log.warning("IMAP fetch failed for %s: %s", message_id, e)
        return None
    finally:
        if imap is not None:
            try:
                imap.logout()
            except Exception:
                pass


def _extract_body(msg: email.message.Message) -> str:
    """Extract text body from email message, truncated to 3000 chars.

    Prefers text/plain parts; falls back to text/html only when no plain
    text has been collected. A From/Date/Subject header is prepended so the
    LLM has context for vendor and date.
    """
    body_parts = []
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/plain":
                try:
                    payload = part.get_payload(decode=True)
                    if payload:
                        body_parts.append(payload.decode("utf-8", errors="replace"))
                except Exception:
                    pass
            elif content_type == "text/html" and not body_parts:
                try:
                    payload = part.get_payload(decode=True)
                    if payload:
                        body_parts.append(payload.decode("utf-8", errors="replace"))
                except Exception:
                    pass
    else:
        try:
            payload = msg.get_payload(decode=True)
            if payload:
                body_parts.append(payload.decode("utf-8", errors="replace"))
        except Exception:
            pass
    text = "\n".join(body_parts)
    # Include subject for context
    subject = msg.get("Subject", "")
    sender = msg.get("From", "")
    date = msg.get("Date", "")
    header = f"From: {sender}\nDate: {date}\nSubject: {subject}\n\n"
    return (header + text)[:3000]


# ── LLM extraction ───────────────────────────────────────────────────────────

def extract_receipt_data(email_body: str) -> dict | None:
    """Send email body to LLM and parse structured receipt data.

    Returns a dict with vendor/amount/currency/date/order_number (all
    strings), or None when the LLM is unavailable, the response is not
    valid JSON, or required fields are missing.
    """
    prompt = LLM_PROMPT + email_body
    try:
        response = ollama_generate(prompt, temperature=0.1, num_predict=500)
    except OllamaUnavailableError as e:
        log.warning("LLM unavailable: %s", e)
        return None
    # Try to parse JSON from response
    try:
        # Handle cases where LLM wraps in markdown code blocks
        cleaned = response.strip()
        if cleaned.startswith("```"):
            lines = cleaned.split("\n")
            lines = [l for l in lines if not l.startswith("```")]
            cleaned = "\n".join(lines).strip()
        data = json.loads(cleaned)
        # Validate required fields
        required = {"vendor", "amount", "currency", "date"}
        if not all(k in data for k in required):
            log.warning("LLM response missing required fields: %s", data)
            return None
        return {
            "vendor": str(data.get("vendor", "")).strip(),
            "amount": str(data.get("amount", "")).strip(),
            "currency": str(data.get("currency", "USD")).strip(),
            "date": str(data.get("date", "")).strip(),
            "order_number": str(data.get("order_number", "")).strip(),
        }
    except (json.JSONDecodeError, ValueError) as e:
        log.warning("Failed to parse LLM response as JSON: %s — response: %s",
                    e, response[:200])
        return None


# ── CSV output ───────────────────────────────────────────────────────────────

def init_csv():
    """Create data directory and CSV with headers if they don't exist."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if not CSV_PATH.exists():
        with open(CSV_PATH, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
            writer.writeheader()
        log.info("Created %s", CSV_PATH)


def append_csv(row: dict):
    """Append a row to the expenses CSV."""
    with open(CSV_PATH, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
        writer.writerow(row)


# ── main ─────────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(
        description="Receipt Tracker — extract structured data from receipt emails")
    parser.add_argument("--dry-run", action="store_true",
                        help="Extract and display but don't write CSV or send email")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    if not ollama_available():
        log.error("Ollama is not available. Cannot extract receipt data without LLM.")
        sys.exit(1)

    init_tracker_db()
    init_csv()

    new_receipts = []
    extracted_count = 0
    failed_count = 0

    for account in ACCOUNTS:
        account_name = account["name"]
        log.info("Processing account: %s", account_name)

        # Get receipt message IDs not yet tracked
        all_receipt_ids = get_receipt_message_ids(account["processed_db"])
        new_ids = [mid for mid in all_receipt_ids if not is_tracked(mid)]
        log.info("  %d total receipts, %d new", len(all_receipt_ids), len(new_ids))
        if not new_ids:
            continue

        # Load IMAP credentials
        creds = load_imap_credentials(account["config"], account["type"])
        if not creds:
            log.warning("  No IMAP credentials for %s, skipping", account_name)
            # Mark as tracked anyway so we don't retry endlessly.
            # Fix: skip in dry-run so a dry run never mutates the tracker DB.
            if not args.dry_run:
                for mid in new_ids:
                    mark_tracked(mid)
            continue

        for mid in new_ids:
            log.info("  Fetching email: %s", mid[:40])

            # Fetch email body
            body = fetch_email_body(creds, mid)
            if not body:
                log.warning("  Could not fetch email body for %s", mid[:40])
                if not args.dry_run:
                    mark_tracked(mid)
                failed_count += 1
                continue

            # Extract via LLM
            data = extract_receipt_data(body)
            if not data:
                log.warning("  LLM extraction failed for %s", mid[:40])
                if not args.dry_run:
                    mark_tracked(mid)
                failed_count += 1
                continue

            row = {
                "date": data["date"],
                "vendor": data["vendor"],
                "amount": data["amount"],
                "currency": data["currency"],
                "order_number": data["order_number"],
                "email_account": account_name,
                "message_id": mid,
            }
            new_receipts.append(row)

            if args.dry_run:
                log.info("  [DRY-RUN] Would write: %s", row)
            else:
                append_csv(row)
                log.info("  Extracted: %s %s %s from %s",
                         data["vendor"], data["amount"], data["currency"], account_name)
                # Fix: only mark as tracked on a real run — the original
                # marked receipts during --dry-run too, so later real runs
                # silently skipped them.
                mark_tracked(mid)
            extracted_count += 1

    log.info("Done: %d extracted, %d failed", extracted_count, failed_count)

    if args.dry_run:
        if new_receipts:
            print("\n--- Extracted Receipts (dry run) ---")
            for r in new_receipts:
                print(f"  {r['date']} | {r['vendor']} | {r['amount']} {r['currency']} | {r['email_account']}")
        else:
            print("No new receipts found.")
        return

    # Send summary email if we extracted anything
    if new_receipts:
        summary_lines = [f"Extracted {extracted_count} new receipt(s):"]
        for r in new_receipts:
            summary_lines.append(f"  - {r['date']} {r['vendor']}: {r['amount']} {r['currency']}")
        if failed_count:
            summary_lines.append(f"\n{failed_count} receipt(s) failed extraction.")
        now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
        subject = f"Receipt Tracker: {extracted_count} new — {now.strftime('%b %d')}"
        send_email(subject=subject, text_body="\n".join(summary_lines))
        log.info("Summary email sent")


if __name__ == "__main__":
    main()