Sanitized mirror from private repository - 2026-04-06 03:11:43 UTC
This commit is contained in:
420
scripts/receipt-tracker.py
Normal file
420
scripts/receipt-tracker.py
Normal file
@@ -0,0 +1,420 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Feature 15 — Receipt Tracker.
|
||||
|
||||
Extracts structured data from receipt emails using LLM and appends to CSV.
|
||||
Reads processed.db files from email organizers, fetches email content via IMAP,
|
||||
sends to Ollama for extraction, and writes results to data/expenses.csv.
|
||||
|
||||
Cron: 0 10 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/receipt-tracker.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import email
|
||||
import imaplib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import yaml
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
|
||||
from lib.notify import send_email
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# ── paths & static configuration ─────────────────────────────────────────────
REPO_ROOT = Path("/home/homelab/organized/repos/homelab")
SCRIPTS_DIR = REPO_ROOT / "scripts"
DATA_DIR = REPO_ROOT / "data"
# Append-only output of extracted expenses.
CSV_PATH = DATA_DIR / "expenses.csv"
# Local SQLite DB recording which Message-IDs were already handled.
TRACKER_DB = SCRIPTS_DIR / "receipt-tracker.db"

# Header/field order for expenses.csv; rows appended must use exactly these keys.
CSV_COLUMNS = ["date", "vendor", "amount", "currency", "order_number", "email_account", "message_id"]

# Accounts to scan. Each entry points at an email organizer's processed.db
# (the source of receipt Message-IDs) and its config.local.yaml (IMAP
# credentials). "type" selects the credential layout in load_imap_credentials.
ACCOUNTS = [
    {
        "name": "lzbellina92@gmail.com",
        "processed_db": SCRIPTS_DIR / "gmail-organizer" / "processed.db",
        "config": SCRIPTS_DIR / "gmail-organizer" / "config.local.yaml",
        "type": "gmail",
    },
    {
        "name": "your-email@example.com",
        "processed_db": SCRIPTS_DIR / "gmail-organizer-dvish" / "processed.db",
        "config": SCRIPTS_DIR / "gmail-organizer-dvish" / "config.local.yaml",
        "type": "gmail",
    },
    {
        "name": "admin@thevish.io",
        "processed_db": SCRIPTS_DIR / "proton-organizer" / "processed.db",
        "config": SCRIPTS_DIR / "proton-organizer" / "config.local.yaml",
        "type": "proton",
    },
]

# Instruction prefix sent to the LLM; the (truncated) email text is appended.
LLM_PROMPT = (
    'Extract from this receipt email: vendor, amount (number only), '
    'currency (USD/EUR/etc), date (YYYY-MM-DD), order_number. '
    'Reply ONLY as JSON: {"vendor": "...", "amount": "...", "currency": "...", '
    '"date": "...", "order_number": "..."}\n\n'
)
|
||||
|
||||
|
||||
# ── tracker DB ───────────────────────────────────────────────────────────────
|
||||
|
||||
def init_tracker_db():
    """Create the tracker SQLite database if it doesn't exist.

    The ``tracked`` table maps a Message-ID to the UTC timestamp at which
    its receipt data was extracted (or given up on).
    """
    conn = sqlite3.connect(TRACKER_DB)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tracked (
                message_id TEXT PRIMARY KEY,
                extracted_at TEXT
            )
        """)
        conn.commit()
    finally:
        # Fix: close the connection even when execute/commit raises;
        # the original leaked it on error.
        conn.close()
|
||||
|
||||
|
||||
def is_tracked(message_id: str) -> bool:
    """Return True if *message_id* has already been processed."""
    conn = sqlite3.connect(TRACKER_DB)
    try:
        row = conn.execute(
            "SELECT 1 FROM tracked WHERE message_id = ?", (message_id,)
        ).fetchone()
    finally:
        # Fix: release the connection even if the query raises.
        conn.close()
    return row is not None
|
||||
|
||||
|
||||
def mark_tracked(message_id: str):
    """Mark a message_id as tracked, recording the current UTC time.

    Idempotent: re-marking an already-tracked ID is a no-op (INSERT OR IGNORE).
    """
    conn = sqlite3.connect(TRACKER_DB)
    try:
        conn.execute(
            "INSERT OR IGNORE INTO tracked (message_id, extracted_at) VALUES (?, ?)",
            (message_id, datetime.now(tz=ZoneInfo("UTC")).isoformat()),
        )
        conn.commit()
    finally:
        # Fix: close the connection even when the insert fails;
        # the original leaked it on error.
        conn.close()
|
||||
|
||||
|
||||
# ── processed.db queries ─────────────────────────────────────────────────────
|
||||
|
||||
def get_receipt_message_ids(db_path: Path) -> list[str]:
    """Return message IDs that an organizer categorized as receipts.

    Only receipts processed within the last 30 days are returned, so the
    first run doesn't flood Ollama with the whole backlog. A missing
    processed.db yields an empty list (with a warning).
    """
    if not db_path.exists():
        log.warning("Processed DB not found: %s", db_path)
        return []

    cutoff = (datetime.now(tz=ZoneInfo("UTC")) - timedelta(days=30)).isoformat()
    conn = sqlite3.connect(db_path)
    fetched = conn.execute(
        "SELECT message_id FROM processed WHERE category = 'receipts' AND processed_at >= ?",
        (cutoff,),
    ).fetchall()
    conn.close()

    return [message_id for (message_id,) in fetched]
|
||||
|
||||
|
||||
# ── IMAP email fetching ─────────────────────────────────────────────────────
|
||||
|
||||
def load_imap_credentials(config_path: Path, account_type: str) -> dict:
    """Load IMAP credentials from config.local.yaml.

    Returns a dict with host/port/user/password/ssl keys, or {} when the
    config file is missing or *account_type* is not "gmail"/"proton".
    """
    if not config_path.exists():
        log.warning("Config file not found: %s", config_path)
        return {}

    with open(config_path) as f:
        # Fix: yaml.safe_load returns None for an empty document; fall back
        # to {} so the .get() lookups below don't raise AttributeError.
        cfg = yaml.safe_load(f) or {}

    if account_type == "gmail":
        gmail = cfg.get("gmail", {})
        return {
            "host": "imap.gmail.com",
            "port": 993,
            "user": gmail.get("email", ""),
            "password": gmail.get("app_password", ""),
            "ssl": True,
        }
    elif account_type == "proton":
        # Proton is reached through the local Bridge: plain IMAP, no SSL.
        proton = cfg.get("proton", {})
        return {
            "host": proton.get("host", "127.0.0.1"),
            "port": proton.get("port", 1143),
            "user": proton.get("email", ""),
            "password": proton.get("bridge_password", ""),
            "ssl": False,
        }
    return {}
|
||||
|
||||
|
||||
def fetch_email_body(creds: dict, message_id: str) -> str | None:
    """Connect via IMAP and fetch the email body for the given Message-ID.

    Tries a fixed list of likely mailboxes in order and returns the first
    match (headers + text, truncated by _extract_body). Returns None when
    credentials are empty, the message cannot be found, or any IMAP step
    fails — failures are logged, never raised.
    """
    if not creds:
        return None

    imap = None
    try:
        if creds.get("ssl"):
            imap = imaplib.IMAP4_SSL(creds["host"], creds["port"])
        else:
            imap = imaplib.IMAP4(creds["host"], creds["port"])

        imap.login(creds["user"], creds["password"])

        # Search likely mailboxes in priority order (Gmail and Proton layouts).
        for mailbox in ["INBOX", "[Gmail]/All Mail", "All Mail", "INBOX.All", "Folders/AutoOrg-Receipts"]:
            try:
                status, _ = imap.select(f'"{mailbox}"', readonly=True)
                if status != "OK":
                    continue
            except Exception:
                # Mailbox doesn't exist on this server; try the next one.
                continue

            # Search by Message-ID header
            search_criteria = f'(HEADER Message-ID "{message_id}")'
            status, data = imap.search(None, search_criteria)
            if status != "OK" or not data[0]:
                continue

            msg_nums = data[0].split()
            if not msg_nums:
                continue

            # Fetch the first matching message
            status, msg_data = imap.fetch(msg_nums[0], "(RFC822)")
            if status != "OK":
                continue

            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)
            return _extract_body(msg)

        log.debug("Message-ID %s not found in any mailbox", message_id)
        return None

    except Exception as e:
        log.warning("IMAP fetch failed for %s: %s", message_id, e)
        return None
    finally:
        # Fix: the original leaked the connection when an exception fired
        # after login (the except above returned without logging out).
        # Always log out if a connection was established.
        if imap is not None:
            try:
                imap.logout()
            except Exception:
                pass
|
||||
|
||||
|
||||
def _extract_body(msg: email.message.Message) -> str:
|
||||
"""Extract text body from email message, truncated to 3000 chars."""
|
||||
body_parts = []
|
||||
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
if content_type == "text/plain":
|
||||
try:
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload:
|
||||
body_parts.append(payload.decode("utf-8", errors="replace"))
|
||||
except Exception:
|
||||
pass
|
||||
elif content_type == "text/html" and not body_parts:
|
||||
try:
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload:
|
||||
body_parts.append(payload.decode("utf-8", errors="replace"))
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
payload = msg.get_payload(decode=True)
|
||||
if payload:
|
||||
body_parts.append(payload.decode("utf-8", errors="replace"))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
text = "\n".join(body_parts)
|
||||
# Include subject for context
|
||||
subject = msg.get("Subject", "")
|
||||
sender = msg.get("From", "")
|
||||
date = msg.get("Date", "")
|
||||
header = f"From: {sender}\nDate: {date}\nSubject: {subject}\n\n"
|
||||
|
||||
return (header + text)[:3000]
|
||||
|
||||
|
||||
# ── LLM extraction ──────────────────────────────────────────────────────────
|
||||
|
||||
def extract_receipt_data(email_body: str) -> dict | None:
    """Send email body to the LLM and parse structured receipt data.

    Returns a dict with vendor/amount/currency/date/order_number string
    fields, or None when the LLM is unavailable, replies with invalid
    JSON, or omits a required field.
    """
    try:
        response = ollama_generate(LLM_PROMPT + email_body, temperature=0.1, num_predict=500)
    except OllamaUnavailableError as e:
        log.warning("LLM unavailable: %s", e)
        return None

    # Models sometimes wrap the JSON in markdown code fences; strip them.
    cleaned = response.strip()
    if cleaned.startswith("```"):
        kept = [line for line in cleaned.split("\n") if not line.startswith("```")]
        cleaned = "\n".join(kept).strip()

    try:
        data = json.loads(cleaned)
        required = {"vendor", "amount", "currency", "date"}
        if not all(k in data for k in required):
            log.warning("LLM response missing required fields: %s", data)
            return None

        # Normalize every field to a stripped string; currency defaults to USD.
        defaults = (
            ("vendor", ""),
            ("amount", ""),
            ("currency", "USD"),
            ("date", ""),
            ("order_number", ""),
        )
        return {field: str(data.get(field, fallback)).strip() for field, fallback in defaults}
    except (json.JSONDecodeError, ValueError) as e:
        log.warning("Failed to parse LLM response as JSON: %s — response: %s", e, response[:200])
        return None
|
||||
|
||||
|
||||
# ── CSV output ───────────────────────────────────────────────────────────────
|
||||
|
||||
def init_csv():
    """Create the data directory and CSV with headers if they don't exist."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if CSV_PATH.exists():
        return
    # First run: write the header row so later appends line up.
    with open(CSV_PATH, "w", newline="") as handle:
        csv.DictWriter(handle, fieldnames=CSV_COLUMNS).writeheader()
    log.info("Created %s", CSV_PATH)
|
||||
|
||||
|
||||
def append_csv(row: dict):
    """Append one expense row (keyed by CSV_COLUMNS) to the expenses CSV."""
    with open(CSV_PATH, "a", newline="") as handle:
        csv.DictWriter(handle, fieldnames=CSV_COLUMNS).writerow(row)
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """Entry point: extract receipt data for every account and report.

    Flow: parse flags → require Ollama → ensure tracker DB and CSV exist →
    for each account, fetch untracked receipt emails over IMAP, extract
    fields via the LLM, append to the CSV, and mark each Message-ID tracked
    (even on failure, so it is never retried) → send a summary email unless
    running with --dry-run.
    """
    parser = argparse.ArgumentParser(description="Receipt Tracker — extract structured data from receipt emails")
    parser.add_argument("--dry-run", action="store_true", help="Extract and display but don't write CSV or send email")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    # Hard requirement: without the LLM there is nothing useful to do.
    if not ollama_available():
        log.error("Ollama is not available. Cannot extract receipt data without LLM.")
        sys.exit(1)

    init_tracker_db()
    init_csv()

    new_receipts = []       # rows extracted this run (for the summary email)
    extracted_count = 0
    failed_count = 0

    for account in ACCOUNTS:
        account_name = account["name"]
        log.info("Processing account: %s", account_name)

        # Get receipt message IDs not yet tracked
        all_receipt_ids = get_receipt_message_ids(account["processed_db"])
        new_ids = [mid for mid in all_receipt_ids if not is_tracked(mid)]
        MAX_PER_RUN = 10  # prevent Ollama overload
        if len(new_ids) > MAX_PER_RUN:
            log.info(" %d new receipts, capping at %d per run", len(new_ids), MAX_PER_RUN)
            new_ids = new_ids[:MAX_PER_RUN]
        log.info(" %d total receipts, %d to process", len(all_receipt_ids), len(new_ids))

        if not new_ids:
            continue

        # Load IMAP credentials
        creds = load_imap_credentials(account["config"], account["type"])
        if not creds:
            log.warning(" No IMAP credentials for %s, skipping", account_name)
            # Mark as tracked anyway so we don't retry endlessly
            for mid in new_ids:
                mark_tracked(mid)
            continue

        for mid in new_ids:
            log.info(" Fetching email: %s", mid[:40])

            # Fetch email body
            body = fetch_email_body(creds, mid)
            if not body:
                log.warning(" Could not fetch email body for %s", mid[:40])
                # Unfetchable messages are tracked too, so each is tried once.
                mark_tracked(mid)
                failed_count += 1
                continue

            # Extract via LLM
            data = extract_receipt_data(body)
            if not data:
                log.warning(" LLM extraction failed for %s", mid[:40])
                mark_tracked(mid)
                failed_count += 1
                continue

            # Assemble the CSV row in CSV_COLUMNS order.
            row = {
                "date": data["date"],
                "vendor": data["vendor"],
                "amount": data["amount"],
                "currency": data["currency"],
                "order_number": data["order_number"],
                "email_account": account_name,
                "message_id": mid,
            }
            new_receipts.append(row)

            if args.dry_run:
                log.info(" [DRY-RUN] Would write: %s", row)
            else:
                append_csv(row)
                log.info(" Extracted: %s %s %s from %s",
                         data["vendor"], data["amount"], data["currency"], account_name)

            # NOTE: dry-run still marks IDs as tracked — a later real run
            # will skip them; presumably intentional, but worth confirming.
            mark_tracked(mid)
            extracted_count += 1

    log.info("Done: %d extracted, %d failed", extracted_count, failed_count)

    if args.dry_run:
        if new_receipts:
            print("\n--- Extracted Receipts (dry run) ---")
            for r in new_receipts:
                print(f" {r['date']} | {r['vendor']} | {r['amount']} {r['currency']} | {r['email_account']}")
        else:
            print("No new receipts found.")
        return

    # Send summary email if we extracted anything
    if new_receipts:
        summary_lines = [f"Extracted {extracted_count} new receipt(s):"]
        for r in new_receipts:
            summary_lines.append(f" - {r['date']} {r['vendor']}: {r['amount']} {r['currency']}")
        if failed_count:
            summary_lines.append(f"\n{failed_count} receipt(s) failed extraction.")

        now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
        subject = f"Receipt Tracker: {extracted_count} new — {now.strftime('%b %d')}"
        send_email(subject=subject, text_body="\n".join(summary_lines))
        log.info("Summary email sent")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user