Sanitized mirror from private repository - 2026-04-05 05:50:13 UTC
This commit is contained in:
185
scripts/gmail-backup.py
Normal file
185
scripts/gmail-backup.py
Normal file
@@ -0,0 +1,185 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Download all Gmail emails as .eml files organized by label/folder."""
|
||||
|
||||
import email
|
||||
import email.header
|
||||
import imaplib
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def decode_header(raw):
    """Decode a raw (possibly RFC 2047 encoded) header value into text.

    Args:
        raw: The header value as returned by ``Message.get``; falsy values
            (``None``, ``""``) yield an empty string.

    Returns:
        The decoded header as a ``str``. Bytes that cannot be decoded with
        the declared charset are decoded as UTF-8 with replacement
        characters instead of raising.
    """
    if not raw:
        return ""

    pieces = []
    prev_encoded = False
    for data, charset in email.header.decode_header(raw):
        if isinstance(data, bytes):
            try:
                text = data.decode(charset or "utf-8", errors="replace")
            except (LookupError, UnicodeDecodeError):
                # Unknown or bogus charset label: fall back to UTF-8.
                text = data.decode("utf-8", errors="replace")
        else:
            text = data
        if not text:
            continue

        encoded = charset is not None
        # email.header.decode_header keeps the whitespace that belongs to
        # unencoded runs, so blindly joining with " " doubles spaces. Only
        # insert a separator where an encoded word meets unencoded text and
        # neither side already carries whitespace (e.g. across a folded
        # line). Adjacent encoded words are concatenated directly.
        if pieces and encoded != prev_encoded:
            if not (pieces[-1][-1:].isspace() or text[:1].isspace()):
                pieces.append(" ")
        pieces.append(text)
        prev_encoded = encoded

    return "".join(pieces)
|
||||
|
||||
|
||||
def sanitize_filename(name, max_len=100, max_bytes=200):
    """Turn arbitrary text into a string that is safe to use in a filename.

    Characters that are reserved on common filesystems (``<>:"/\\|?*`` and
    ASCII control characters) are replaced with ``_``; leading and trailing
    dots and spaces are stripped.

    Args:
        name: The text to sanitize.
        max_len: Maximum number of characters to keep.
        max_bytes: Maximum UTF-8 encoded size of the result. Filesystems
            commonly limit a name by bytes rather than characters, so a
            character cap alone can still produce a name that is too long
            when the text is mostly multi-byte.

    Returns:
        The sanitized, truncated name, or ``"no_subject"`` when nothing is
        left after sanitizing.
    """
    name = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', name)
    name = name.strip('. ')
    if not name:
        return "no_subject"

    name = name[:max_len]
    encoded = name.encode("utf-8", errors="surrogatepass")
    if len(encoded) > max_bytes:
        # Cut on the byte budget; errors="ignore" drops a trailing character
        # that was split in the middle by the cut.
        name = encoded[:max_bytes].decode("utf-8", errors="ignore")
    return name
|
||||
|
||||
|
||||
def _open_imap(email_addr, app_password, host, port, starttls):
    """Open an IMAP connection (implicit TLS or STARTTLS) and log in."""
    if starttls:
        import ssl

        # NOTE(review): certificate and hostname verification are disabled
        # on the STARTTLS path. The only STARTTLS account configured in this
        # script points at 127.0.0.1; do not use this path for a remote host
        # without re-enabling verification.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        imap = imaplib.IMAP4(host, port)
    else:
        ctx = None
        imap = imaplib.IMAP4_SSL(host, port)
    try:
        if ctx is not None:
            imap.starttls(ssl_context=ctx)
        imap.login(email_addr, app_password)
    except Exception:
        # Do not leak the socket when the handshake or login fails.
        imap.shutdown()
        raise
    return imap


def _logout_quietly(imap):
    """Log out, ignoring failures (the connection may already be dead)."""
    try:
        imap.logout()
    except Exception:
        # Best effort: there is nothing useful to do if logout fails.
        pass


def _list_folders(imap):
    """Return the folder names parsed from the server's LIST response."""
    status, folders = imap.list()
    if status != "OK":
        return []
    names = []
    for entry in folders or []:
        # imaplib yields None for an empty listing and tuples for literals;
        # only plain byte lines match the pattern below.
        if not isinstance(entry, bytes):
            continue
        match = re.search(r'"/" "(.*)"$|"/" (.*)$', entry.decode(errors="replace"))
        if match:
            name = match.group(1) or match.group(2)
            names.append(name.strip('"'))
    return names


def _fetch_raw_message(imap, uid):
    """Fetch one message by UID; return its raw bytes or None if absent."""
    status, msg_data = imap.uid("FETCH", uid, "(RFC822)")
    if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
        return None
    return msg_data[0][1]


def _message_filename(raw_email, uid):
    """Build the ``<message-id>_<subject>.eml`` filename for a message."""
    msg = email.message_from_bytes(raw_email)
    subject = sanitize_filename(decode_header(msg.get("Subject", "no_subject")))
    # str(): Message.get may return a Header object rather than a str.
    msg_id = str(msg.get("Message-ID", f"uid_{uid.decode()}"))
    safe_id = sanitize_filename(re.sub(r'[<>@.]', '_', msg_id), 40)
    return f"{safe_id}_{subject}.eml"


def backup_account(email_addr, app_password, output_dir, host="imap.gmail.com", port=993, starttls=False):
    """Download every message of one IMAP account as ``.eml`` files.

    Each folder reported by the server becomes a sub-directory of
    ``output_dir``; each message is written as
    ``<message-id>_<subject>.eml``. Files that already exist are left
    untouched and counted as skipped, so the backup can be re-run.

    Args:
        email_addr: Login name of the account.
        app_password: Password used for the IMAP login.
        output_dir: Directory that receives one sub-directory per folder.
        host: IMAP server host name.
        port: IMAP server port.
        starttls: If true, connect in plain text and upgrade with STARTTLS
            (certificate verification disabled); otherwise use implicit TLS.

    Returns:
        The number of messages newly written to disk.

    Raises:
        imaplib.IMAP4.error, OSError: If the initial connection or a
            reconnection attempt fails.
    """
    print(f"\n{'='*60}")
    print(f"Backing up: {email_addr}")
    print(f"Output: {output_dir}")
    print(f"{'='*60}")

    retryable = (imaplib.IMAP4.abort, imaplib.IMAP4.error, ConnectionError, OSError)
    total_downloaded = 0
    total_skipped = 0

    imap = _open_imap(email_addr, app_password, host, port, starttls)
    try:
        folder_names = _list_folders(imap)
        print(f"Found {len(folder_names)} folders")

        for folder in folder_names:
            try:
                status, data = imap.select(f'"{folder}"', readonly=True)
                if status != "OK":
                    continue
                msg_count = int(data[0])
                if msg_count == 0:
                    continue
            except Exception as e:
                print(f" Skipping {folder}: {e}")
                continue

            # Create folder directory
            safe_folder = folder.replace("/", "_").replace("[Gmail]_", "gmail_")
            folder_dir = Path(output_dir) / safe_folder
            folder_dir.mkdir(parents=True, exist_ok=True)

            print(f"\n {folder}: {msg_count} messages")

            # Use real UIDs: unlike sequence numbers they stay valid across
            # the reconnects performed below.
            status, data = imap.uid("SEARCH", None, "ALL")
            if status != "OK":
                continue
            uids = data[0].split() if data and data[0] else []

            for i, uid in enumerate(uids, 1):
                raw_email = None
                # Try twice so a dropped connection does not silently lose
                # the message that was in flight.
                for _attempt in range(2):
                    try:
                        raw_email = _fetch_raw_message(imap, uid)
                        break
                    except retryable as e:
                        print(f" Connection lost at {i}/{len(uids)}: {e}")
                        # Reconnect and re-select folder
                        _logout_quietly(imap)
                        time.sleep(2)
                        imap = _open_imap(email_addr, app_password, host, port, starttls)
                        imap.select(f'"{folder}"', readonly=True)
                        print(" Reconnected, continuing...")
                else:
                    print(f" Giving up on message {i}/{len(uids)} after retry")

                if raw_email is not None:
                    filename = _message_filename(raw_email, uid)
                    filepath = folder_dir / filename
                    try:
                        if filepath.exists():
                            total_skipped += 1
                        else:
                            filepath.write_bytes(raw_email)
                            total_downloaded += 1
                    except OSError as e:
                        # A local disk problem is not a connection problem:
                        # report it and move on without reconnecting.
                        print(f" Could not save {filename}: {e}")
                        try:
                            # Remove a partial file so a later run retries it.
                            filepath.unlink()
                        except OSError:
                            pass

                if i % 50 == 0 or i == len(uids):
                    print(f" {i}/{len(uids)} processed")
    finally:
        _logout_quietly(imap)

    print(f"\n Done: {total_downloaded} downloaded, {total_skipped} skipped (already exist)")
    return total_downloaded
|
||||
|
||||
|
||||
def main():
    """Back up every configured account below the chosen base directory.

    The base directory is the first command-line argument, defaulting to
    ``/tmp/gmail_backup``. Passwords are never stored in this file: each
    account names the environment variable that holds its password, and the
    script exits before contacting any server if one of them is unset.
    """
    accounts = [
        {
            "email": "your-email@example.com",
            "password_env": "EMAIL_BACKUP_PASSWORD_DVISH92",
            "dir": "dvish92",
        },
        {
            "email": "lzbellina92@gmail.com",
            "password_env": "EMAIL_BACKUP_PASSWORD_LZBELLINA92",
            "dir": "lzbellina92",
        },
        {
            "email": "admin@thevish.io",
            "password_env": "EMAIL_BACKUP_PASSWORD_PROTON_ADMIN",
            "dir": "proton_admin",
            "host": "127.0.0.1",
            "port": 1143,
            "starttls": True,
        },
    ]

    # Fail fast with one clear message instead of a login error mid-run.
    missing = [a["password_env"] for a in accounts if not os.environ.get(a["password_env"])]
    if missing:
        sys.exit("Missing password environment variable(s): " + ", ".join(missing))

    base_dir = sys.argv[1] if len(sys.argv) > 1 else "/tmp/gmail_backup"

    print(f"Email Backup — downloading all emails to {base_dir}")
    total = 0
    for acct in accounts:
        output = os.path.join(base_dir, acct["dir"])
        os.makedirs(output, exist_ok=True)
        total += backup_account(
            acct["email"], os.environ[acct["password_env"]], output,
            host=acct.get("host", "imap.gmail.com"),
            port=acct.get("port", 993),
            starttls=acct.get("starttls", False),
        )

    print(f"\n{'='*60}")
    print(f"BACKUP COMPLETE: {total} emails downloaded to {base_dir}")
    print(f"{'='*60}")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user