"""HTTP bridge that turns alert webhooks into ntfy notifications.

Endpoints:
    POST /alert   - forwards every entry of the payload's ``alerts`` list
                    to ntfy (the payload shape looks like a Prometheus
                    Alertmanager webhook - NOTE(review): confirm sender).
    GET  /health  - liveness probe.
    POST /test    - publishes a single test notification.
"""

import os

import requests
from flask import Flask, jsonify, request

app = Flask(__name__)

NTFY_URL = os.environ.get('NTFY_URL', 'http://NTFY:80')
NTFY_TOPIC = os.environ.get('NTFY_TOPIC', 'homelab-alerts')
# Seconds to wait for ntfy before giving up. Without a timeout
# ``requests`` waits forever and a stalled ntfy server would hang the
# request handler. An unparsable value fails fast at import time.
NTFY_TIMEOUT = float(os.environ.get('NTFY_TIMEOUT', '10'))


def get_status_icon(severity: str, status: str) -> str:
    """Return the tag name sent in the ``Tags`` header for an alert.

    Resolved alerts always get ``white_check_mark``; otherwise critical
    alerts get ``rotating_light`` and everything else ``warning``.
    """
    if status == 'resolved':
        return 'white_check_mark'
    if severity == 'critical':
        return 'rotating_light'
    return 'warning'


def get_priority(severity: str, status: str) -> str:
    """Return the value sent in the ``Priority`` header for an alert.

    Resolved alerts map to ``'3'``, firing critical alerts to ``'5'`` and
    any other firing alert to ``'4'``.
    """
    if status == 'resolved':
        return '3'
    if severity == 'critical':
        return '5'
    return '4'


def format_alert(alert: dict) -> tuple:
    """Build the notification fields for one alert dictionary.

    Args:
        alert: Mapping with optional ``status``, ``labels`` and
            ``annotations`` keys. Missing keys fall back to defaults.

    Returns:
        A ``(title, body, severity, status)`` tuple of strings.
    """
    status = alert.get('status', 'firing')
    # ``or {}`` also covers an explicit JSON ``null``, which a plain
    # ``.get(key, {})`` default would let through as ``None``.
    labels = alert.get('labels') or {}
    annotations = alert.get('annotations') or {}

    alertname = labels.get('alertname', 'Unknown Alert')
    severity = labels.get('severity', 'warning')
    instance = labels.get('instance', 'unknown')

    status_text = 'RESOLVED' if status == 'resolved' else 'FIRING'
    title = f"{alertname} [{status_text}]"

    summary = annotations.get('summary', '')
    description = annotations.get('description', '')

    body_parts = []
    if summary:
        body_parts.append(summary)
    # Skip the description when it merely repeats the summary.
    if description and description != summary:
        body_parts.append(description)
    if instance and instance != 'unknown':
        body_parts.append(f"Host: {instance}")

    if body_parts:
        body = '\n'.join(body_parts)
    else:
        body = f"Alert {status_text.lower()} on {instance}"

    return title, body, severity, status


def _header_value(text) -> bytes:
    """Return *text* as single-line UTF-8 bytes usable as a header value.

    Line breaks are not allowed inside an HTTP header value, so they are
    flattened to spaces. Encoding to bytes up front avoids the Latin-1
    encoding applied to ``str`` header values, which fails on characters
    outside that range.
    """
    return ' '.join(str(text).splitlines()).encode('utf-8')


def _publish(title, message, priority: str, tags: str) -> bool:
    """POST one notification to the configured ntfy topic.

    Returns:
        ``True`` when ntfy answered 200 or 201, ``False`` otherwise (the
        failure is logged).

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.post(
        # Tolerate a trailing slash in NTFY_URL instead of emitting "//topic".
        f"{NTFY_URL.rstrip('/')}/{NTFY_TOPIC}",
        # Explicit UTF-8 so the wire encoding does not depend on how the
        # HTTP stack treats ``str`` bodies.
        data=str(message).encode('utf-8'),
        headers={
            'Title': _header_value(title),
            'Priority': priority,
            'Tags': tags,
        },
        timeout=NTFY_TIMEOUT,
    )
    if response.status_code not in (200, 201):
        app.logger.error(
            "Failed to send to ntfy: %s - %s",
            response.status_code,
            response.text,
        )
        return False
    return True


@app.route('/alert', methods=['POST'])
def handle_alert():
    """Forward every alert in the posted JSON payload to ntfy.

    Responds with 400 when the body is not a JSON object, 500 when
    forwarding raises (e.g. ntfy unreachable), and otherwise 200 with the
    number of alerts processed plus how many ntfy rejected.
    """
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        # A malformed payload is the client's fault: answer 4xx rather
        # than tripping over ``None.get`` and reporting a server error.
        return jsonify({
            'status': 'error',
            'message': 'Request body must be a JSON object',
        }), 400

    alerts = data.get('alerts') or []
    failed = 0
    try:
        for alert in alerts:
            title, body, severity, status = format_alert(alert)
            delivered = _publish(
                title,
                body,
                get_priority(severity, status),
                get_status_icon(severity, status),
            )
            if not delivered:
                failed += 1
    except Exception as e:
        # Top-level boundary: log with traceback and report the failure.
        app.logger.exception("Error while forwarding alerts")
        return jsonify({'status': 'error', 'message': str(e)}), 500

    return jsonify({'status': 'sent', 'count': len(alerts), 'failed': failed})


@app.route('/health', methods=['GET'])
def health():
    """Liveness probe; always reports healthy."""
    return jsonify({'status': 'healthy'})


@app.route('/test', methods=['POST'])
def test():
    """Publish a test notification, optionally with a custom ``message``.

    The JSON body is optional. Responds with 502 when ntfy does not
    accept the notification and 500 when the request to ntfy raises.
    """
    # ``silent=True`` makes a missing or non-JSON body fall back to the
    # default message instead of raising before the fallback is reached.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    message = data.get('message', 'Test notification from ntfy-bridge')

    try:
        delivered = _publish('Test Alert', message, '4', 'test_tube')
    except Exception as e:
        app.logger.exception("Error while sending test notification")
        return jsonify({'status': 'error', 'message': str(e)}), 500

    if not delivered:
        return jsonify({
            'status': 'error',
            'message': 'ntfy did not accept the notification',
        }), 502
    return jsonify({'status': 'sent'})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)