Files
homelab-optimized/scripts/receipt-tracker.py
Gitea Mirror Bot af2cf711f4
Some checks failed
Documentation / Build Docusaurus (push) Failing after 5m0s
Documentation / Deploy to GitHub Pages (push) Has been skipped
Sanitized mirror from private repository - 2026-04-18 11:03:33 UTC
2026-04-18 11:03:33 +00:00

442 lines
16 KiB
Python

#!/usr/bin/env python3
"""Feature 15 — Receipt Tracker.
Extracts structured data from receipt emails using LLM and appends to CSV.
Reads processed.db files from email organizers, fetches email content via IMAP,
sends to Ollama for extraction, and writes results to data/expenses.csv.
Cron: 0 10 * * * cd /home/homelab/organized/repos/homelab && python3 scripts/receipt-tracker.py
"""
import argparse
import csv
import email
import imaplib
import json
import logging
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
import yaml
sys.path.insert(0, str(Path(__file__).parent))
from lib.ollama import ollama_generate, ollama_available, OllamaUnavailableError
from lib.notify import send_email
log = logging.getLogger(__name__)
REPO_ROOT = Path("/home/homelab/organized/repos/homelab")
SCRIPTS_DIR = REPO_ROOT / "scripts"
DATA_DIR = REPO_ROOT / "data"
CSV_PATH = DATA_DIR / "expenses.csv"
TRACKER_DB = SCRIPTS_DIR / "receipt-tracker.db"
CSV_COLUMNS = ["date", "vendor", "amount", "currency", "order_number", "email_account", "message_id"]
ACCOUNTS = [
{
"name": "lzbellina92@gmail.com",
"processed_db": SCRIPTS_DIR / "gmail-organizer" / "processed.db",
"config": SCRIPTS_DIR / "gmail-organizer" / "config.local.yaml",
"type": "gmail",
},
{
"name": "your-email@example.com",
"processed_db": SCRIPTS_DIR / "gmail-organizer-dvish" / "processed.db",
"config": SCRIPTS_DIR / "gmail-organizer-dvish" / "config.local.yaml",
"type": "gmail",
},
{
"name": "admin@thevish.io",
"processed_db": SCRIPTS_DIR / "proton-organizer" / "processed.db",
"config": SCRIPTS_DIR / "proton-organizer" / "config.local.yaml",
"type": "proton",
},
]
LLM_PROMPT = (
'Extract from this receipt email: vendor, amount (number only), '
'currency (USD/EUR/etc), date (YYYY-MM-DD), order_number. '
'Reply ONLY as JSON: {"vendor": "...", "amount": "...", "currency": "...", '
'"date": "...", "order_number": "..."}\n\n'
)
# ── tracker DB ───────────────────────────────────────────────────────────────
def init_tracker_db():
    """Create the tracker SQLite database and its table if they don't exist."""
    db = sqlite3.connect(TRACKER_DB)
    db.execute(
        "CREATE TABLE IF NOT EXISTS tracked ("
        "    message_id TEXT PRIMARY KEY,"
        "    extracted_at TEXT"
        ")"
    )
    db.commit()
    db.close()
def is_tracked(message_id: str) -> bool:
    """Return True when *message_id* was already handled in a previous run."""
    db = sqlite3.connect(TRACKER_DB)
    hit = db.execute(
        "SELECT 1 FROM tracked WHERE message_id = ?", (message_id,)
    ).fetchone()
    db.close()
    return hit is not None
def mark_tracked(message_id: str):
    """Record *message_id* as processed (idempotent via INSERT OR IGNORE)."""
    stamp = datetime.now(tz=ZoneInfo("UTC")).isoformat()
    db = sqlite3.connect(TRACKER_DB)
    db.execute(
        "INSERT OR IGNORE INTO tracked (message_id, extracted_at) VALUES (?, ?)",
        (message_id, stamp),
    )
    db.commit()
    db.close()
# ── processed.db queries ─────────────────────────────────────────────────────
def get_receipt_message_ids(db_path: Path) -> list[str]:
    """Return message IDs categorized as 'receipts' in *db_path*.

    Only rows processed within the last 30 days are returned, so the very
    first run doesn't flood the LLM with the whole backlog.
    """
    if not db_path.exists():
        log.warning("Processed DB not found: %s", db_path)
        return []
    cutoff = (datetime.now(tz=ZoneInfo("UTC")) - timedelta(days=30)).isoformat()
    db = sqlite3.connect(db_path)
    rows = db.execute(
        "SELECT message_id FROM processed WHERE category = 'receipts' AND processed_at >= ?",
        (cutoff,),
    ).fetchall()
    db.close()
    return [mid for (mid,) in rows]
# ── IMAP email fetching ─────────────────────────────────────────────────────
def load_imap_credentials(config_path: Path, account_type: str) -> dict:
    """Load IMAP connection settings from a config.local.yaml file.

    Args:
        config_path: Path to the organizer's ``config.local.yaml``.
        account_type: ``"gmail"`` or ``"proton"``.

    Returns:
        A dict with host/port/user/password/ssl keys, or ``{}`` when the
        config file is missing, empty, or the account type is unknown.
    """
    if not config_path.exists():
        log.warning("Config file not found: %s", config_path)
        return {}
    with open(config_path) as f:
        # safe_load returns None for an empty file; normalize to {} so the
        # .get() lookups below don't raise AttributeError.
        cfg = yaml.safe_load(f) or {}
    if account_type == "gmail":
        gmail = cfg.get("gmail", {})
        return {
            "host": "imap.gmail.com",
            "port": 993,
            "user": gmail.get("email", ""),
            "password": gmail.get("app_password", ""),
            "ssl": True,
        }
    if account_type == "proton":
        # Proton is reached through the local Bridge, which speaks plain
        # IMAP on localhost — hence ssl=False and the 127.0.0.1 default.
        proton = cfg.get("proton", {})
        return {
            "host": proton.get("host", "127.0.0.1"),
            "port": proton.get("port", 1143),
            "user": proton.get("email", ""),
            "password": proton.get("bridge_password", ""),
            "ssl": False,
        }
    return {}
def fetch_email_body(creds: dict, message_id: str) -> str | None:
    """Fetch the text body of the email with the given Message-ID via IMAP.

    Tries a fixed list of likely mailboxes and returns the (truncated)
    body of the first match, or None when the message cannot be found or
    any IMAP step fails.
    """
    if not creds:
        return None
    try:
        if creds.get("ssl"):
            imap = imaplib.IMAP4_SSL(creds["host"], creds["port"])
        else:
            imap = imaplib.IMAP4(creds["host"], creds["port"])
        imap.login(creds["user"], creds["password"])
    except Exception as e:
        log.warning("IMAP fetch failed for %s: %s", message_id, e)
        return None
    try:
        # Search across all likely mailboxes for both Gmail and Proton layouts.
        for mailbox in ["INBOX", "[Gmail]/All Mail", "All Mail", "INBOX.All", "Folders/AutoOrg-Receipts"]:
            try:
                status, _ = imap.select(f'"{mailbox}"', readonly=True)
                if status != "OK":
                    continue
            except Exception:
                continue
            # Search by Message-ID header
            status, data = imap.search(None, f'(HEADER Message-ID "{message_id}")')
            if status != "OK" or not data[0]:
                continue
            msg_nums = data[0].split()
            if not msg_nums:
                continue
            # Fetch the first matching message
            status, msg_data = imap.fetch(msg_nums[0], "(RFC822)")
            if status != "OK":
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            return _extract_body(msg)
        log.debug("Message-ID %s not found in any mailbox", message_id)
        return None
    except Exception as e:
        log.warning("IMAP fetch failed for %s: %s", message_id, e)
        return None
    finally:
        # The original leaked the connection on error paths; always log out.
        try:
            imap.logout()
        except Exception:
            pass
def _extract_body(msg: email.message.Message) -> str:
"""Extract text body from email message, truncated to 3000 chars."""
body_parts = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
try:
payload = part.get_payload(decode=True)
if payload:
body_parts.append(payload.decode("utf-8", errors="replace"))
except Exception:
pass
elif content_type == "text/html" and not body_parts:
try:
payload = part.get_payload(decode=True)
if payload:
body_parts.append(payload.decode("utf-8", errors="replace"))
except Exception:
pass
else:
try:
payload = msg.get_payload(decode=True)
if payload:
body_parts.append(payload.decode("utf-8", errors="replace"))
except Exception:
pass
text = "\n".join(body_parts)
# Include subject for context
subject = msg.get("Subject", "")
sender = msg.get("From", "")
date = msg.get("Date", "")
header = f"From: {sender}\nDate: {date}\nSubject: {subject}\n\n"
return (header + text)[:3000]
# ── LLM extraction ──────────────────────────────────────────────────────────
def extract_receipt_data(email_body: str) -> dict | None:
    """Send an email body to the LLM and parse structured receipt data.

    Returns:
        A dict with vendor/amount/currency/date/order_number (all str),
        or None when the LLM is unavailable or the reply cannot be parsed
        into the expected JSON object.
    """
    prompt = LLM_PROMPT + email_body
    try:
        response = ollama_generate(prompt, temperature=0.1, num_predict=500)
    except OllamaUnavailableError as e:
        log.warning("LLM unavailable: %s", e)
        return None
    try:
        # Models often wrap JSON in markdown code fences; strip them first.
        cleaned = response.strip()
        if cleaned.startswith("```"):
            cleaned = "\n".join(
                line for line in cleaned.split("\n") if not line.startswith("```")
            ).strip()
        data = json.loads(cleaned)
        # Guard against the model returning a JSON array or bare scalar:
        # `k in data` and `.get` below assume an object.
        if not isinstance(data, dict):
            log.warning("LLM response is not a JSON object: %s", response[:200])
            return None
        # Validate required fields (order_number is allowed to be missing).
        required = {"vendor", "amount", "currency", "date"}
        if not all(k in data for k in required):
            log.warning("LLM response missing required fields: %s", data)
            return None
        return {
            "vendor": str(data.get("vendor", "")).strip(),
            "amount": str(data.get("amount", "")).strip(),
            "currency": str(data.get("currency", "USD")).strip(),
            "date": str(data.get("date", "")).strip(),
            "order_number": str(data.get("order_number", "")).strip(),
        }
    except (json.JSONDecodeError, ValueError) as e:
        log.warning("Failed to parse LLM response as JSON: %s — response: %s", e, response[:200])
        return None
# ── CSV output ───────────────────────────────────────────────────────────────
def init_csv():
    """Create the data directory and write the CSV header if the file is new."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if CSV_PATH.exists():
        return
    with open(CSV_PATH, "w", newline="") as f:
        csv.DictWriter(f, fieldnames=CSV_COLUMNS).writeheader()
    log.info("Created %s", CSV_PATH)
def is_duplicate(row: dict) -> bool:
    """Return True if the CSV already holds a row with this date+vendor+amount."""
    if not CSV_PATH.exists():
        return False

    def _fingerprint(r: dict) -> str:
        return f"{r.get('date', '')}|{r.get('vendor', '')}|{r.get('amount', '')}"

    target = _fingerprint(row)
    with open(CSV_PATH, newline="") as f:
        return any(_fingerprint(existing) == target for existing in csv.DictReader(f))
def append_csv(row: dict) -> bool:
    """Append a row to the expenses CSV, skipping duplicates and $0 amounts.

    Returns:
        True if the row was written, False if it was skipped.
    """
    raw_amount = str(row.get("amount", 0) or 0)
    # LLM output sometimes includes a currency symbol or thousands separator
    # ("$1,234.56") despite the prompt; clean it up instead of letting
    # float() raise ValueError and crash the whole run.
    try:
        amount = float(raw_amount.replace("$", "").replace(",", "").strip() or 0)
    except ValueError:
        log.warning(" Skipping unparseable amount %r for %s", row.get("amount"), row.get("vendor", "?"))
        return False
    if amount == 0:
        log.info(" Skipping $0 entry: %s", row.get("vendor", "?"))
        return False
    if is_duplicate(row):
        log.info(" Skipping duplicate: %s $%s on %s", row.get("vendor", "?"), row.get("amount", "?"), row.get("date", "?"))
        return False
    with open(CSV_PATH, "a", newline="") as f:
        csv.DictWriter(f, fieldnames=CSV_COLUMNS).writerow(row)
    return True
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Entry point: extract receipt data from new receipt emails into the CSV.

    Flow: for each configured account, collect untracked receipt message IDs
    from its processed.db, fetch each email over IMAP, extract fields via the
    LLM, append rows to the expenses CSV, then email a run summary.
    With --dry-run nothing is written to the CSV and no email is sent.
    """
    parser = argparse.ArgumentParser(description="Receipt Tracker — extract structured data from receipt emails")
    parser.add_argument("--dry-run", action="store_true", help="Extract and display but don't write CSV or send email")
    parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    # The LLM is mandatory — bail out before touching the tracker DB so
    # nothing gets marked as processed without an extraction attempt.
    if not ollama_available():
        log.error("Ollama is not available. Cannot extract receipt data without LLM.")
        sys.exit(1)
    init_tracker_db()
    init_csv()
    new_receipts = []          # rows extracted this run (used for summary/dry-run output)
    extracted_count = 0
    failed_count = 0
    for account in ACCOUNTS:
        account_name = account["name"]
        log.info("Processing account: %s", account_name)
        # Get receipt message IDs not yet tracked
        all_receipt_ids = get_receipt_message_ids(account["processed_db"])
        new_ids = [mid for mid in all_receipt_ids if not is_tracked(mid)]
        MAX_PER_RUN = 10  # prevent Ollama overload
        if len(new_ids) > MAX_PER_RUN:
            log.info(" %d new receipts, capping at %d per run", len(new_ids), MAX_PER_RUN)
            new_ids = new_ids[:MAX_PER_RUN]
        log.info(" %d total receipts, %d to process", len(all_receipt_ids), len(new_ids))
        if not new_ids:
            continue
        # Load IMAP credentials
        creds = load_imap_credentials(account["config"], account["type"])
        if not creds:
            log.warning(" No IMAP credentials for %s, skipping", account_name)
            # Mark as tracked anyway so we don't retry endlessly
            for mid in new_ids:
                mark_tracked(mid)
            continue
        for mid in new_ids:
            log.info(" Fetching email: %s", mid[:40])
            # Fetch email body
            body = fetch_email_body(creds, mid)
            if not body:
                log.warning(" Could not fetch email body for %s", mid[:40])
                mark_tracked(mid)  # don't retry unfetchable messages forever
                failed_count += 1
                continue
            # Extract via LLM
            data = extract_receipt_data(body)
            if not data:
                log.warning(" LLM extraction failed for %s", mid[:40])
                mark_tracked(mid)
                failed_count += 1
                continue
            row = {
                "date": data["date"],
                "vendor": data["vendor"],
                "amount": data["amount"],
                "currency": data["currency"],
                "order_number": data["order_number"],
                "email_account": account_name,
                "message_id": mid,
            }
            new_receipts.append(row)
            if args.dry_run:
                log.info(" [DRY-RUN] Would write: %s", row)
            else:
                append_csv(row)
                log.info(" Extracted: %s %s %s from %s",
                         data["vendor"], data["amount"], data["currency"], account_name)
            # NOTE(review): marked tracked even under --dry-run — only the CSV
            # write and summary email are suppressed; confirm this is intended.
            mark_tracked(mid)
            extracted_count += 1
    log.info("Done: %d extracted, %d failed", extracted_count, failed_count)
    if args.dry_run:
        if new_receipts:
            print("\n--- Extracted Receipts (dry run) ---")
            for r in new_receipts:
                print(f" {r['date']} | {r['vendor']} | {r['amount']} {r['currency']} | {r['email_account']}")
        else:
            print("No new receipts found.")
        return
    # Send summary email if we extracted anything
    if new_receipts:
        summary_lines = [f"Extracted {extracted_count} new receipt(s):"]
        for r in new_receipts:
            summary_lines.append(f" - {r['date']} {r['vendor']}: {r['amount']} {r['currency']}")
        if failed_count:
            summary_lines.append(f"\n{failed_count} receipt(s) failed extraction.")
        now = datetime.now(tz=ZoneInfo("America/Los_Angeles"))
        subject = f"Receipt Tracker: {extracted_count} new — {now.strftime('%b %d')}"
        send_email(subject=subject, text_body="\n".join(summary_lines))
        log.info("Summary email sent")
if __name__ == "__main__":
main()