-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessing_emails.py
More file actions
264 lines (221 loc) · 9.95 KB
/
processing_emails.py
File metadata and controls
264 lines (221 loc) · 9.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import os
import json
import re
from datetime import datetime
# --- ACTION REQUIRED ---
# Hard-coded placeholder credentials; replace both values before running.
# NOTE: keep these defaults in sync with the placeholder check in the
# __main__ guard at the bottom of the file.
API_KEY = "INSERT_YOUR_API"
NEON_CONNECTION_STRING = "INSERT_YOUR_DATABASE_URL"
# --- Library Imports & Checks ---
# This section checks for required libraries and provides installation instructions.
try:
    # Third-party dependencies: HTTP client, Postgres driver, flexible date parser.
    import requests
    import psycopg2
    from dateutil import parser as date_parser
except ImportError:
    # Fail fast with actionable install commands instead of a raw traceback.
    print("One or more required libraries are not installed.")
    print("Please run the following commands in your terminal:")
    print("pip install requests")
    print("pip install psycopg2-binary")
    print("pip install python-dateutil")
    exit()
# --- Global Configuration ---
# Gemini REST endpoint; the API key travels as a query-string parameter.
API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?key={API_KEY}"
# --- Core Functions ---
def setup_database():
    """
    Ensure the 'emails' and 'processed_emails' tables exist in the Neon DB.

    Idempotent (CREATE TABLE IF NOT EXISTS), so it is safe to call on every
    startup.

    Returns:
        True when the tables were created/verified, False if the connection
        or DDL failed (the error is printed, not raised).
    """
    # Both DDL statements, applied in order; processed_emails references emails.
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS emails (
        id SERIAL PRIMARY KEY, sender TEXT, subject TEXT, body TEXT, date DATE
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS processed_emails (
        id SERIAL PRIMARY KEY, original_email_id INTEGER UNIQUE, category TEXT,
        summary TEXT, date DATE,
        FOREIGN KEY(original_email_id) REFERENCES emails(id) ON DELETE CASCADE
        );
        """,
    )
    try:
        connection = psycopg2.connect(NEON_CONNECTION_STRING, connect_timeout=10)
        cursor = connection.cursor()
        for ddl in ddl_statements:
            cursor.execute(ddl)
        connection.commit()
        cursor.close()
        connection.close()
        print("[✓] Database tables verified successfully.")
        return True
    except psycopg2.Error as e:
        print(f"\n[X] DATABASE SETUP ERROR: Could not connect to or set up the Neon database.")
        print(f"Please check your NEON_CONNECTION_STRING. Details: {e}")
        return False
def _sanitize_ai_response(ai_json):
    """
    Coerce the model's raw JSON dict into the fixed schema used downstream.

    Scalar fields ('summary', 'firstdate', 'finaldate') become strings; list
    fields ('actions', 'time', 'venue') become lists holding at most one
    concatenated string, mirroring how reports render them.
    """
    sanitized = {}
    # Scalar fields default to "" -- the previous code defaulted 'summary'
    # to [] (and never list-cleaned it), which rendered as "[]" in reports.
    for key in ("summary", "firstdate", "finaldate"):
        value = ai_json.get(key, "")
        if isinstance(value, list):
            # The model occasionally wraps a scalar in a list; flatten it.
            value = "".join(map(str, value)).strip()
        sanitized[key] = value if isinstance(value, str) else str(value)
    # List fields: normalize str/list input into [single_joined_string] or [].
    for key in ("time", "venue", "actions"):
        value = ai_json.get(key, [])
        clean_list = []
        if isinstance(value, str) and value:
            clean_list = [value]
        elif isinstance(value, list) and value:
            full_string = "".join(map(str, value)).strip()
            if full_string:
                clean_list = [full_string]
        sanitized[key] = clean_list
    return sanitized


def get_ai_summary(email_body, context_date_str):
    """
    Send the email content to the Gemini model and return a sanitized,
    predictable summary dict.

    Args:
        email_body: Raw text of the email to analyze.
        context_date_str: 'YYYY-MM-DD' date the model uses to resolve
            relative dates ("next Friday") in the email.

    Returns:
        dict with 'summary'/'firstdate'/'finaldate' (str) and
        'actions'/'time'/'venue' (list of at most one str). On any request
        or parse failure a placeholder error dict is returned, never raised.
    """
    # 1. Define the instructions for the AI model (Model Context Protocol)
    prompt = f"""
System Instructions:
You are an expert AI assistant for information extraction. Return a structured JSON object.
Rules:
- Adhere Strictly to the JSON Output Format.
- If a field is not found, return an empty string "" for date fields or an empty list [] for other fields.
- Standardize all dates to YYYY-MM-DD format.
User Task:
Based on the context date of {context_date_str}, analyze the following email. Extract these fields:
"summary", "firstdate", "finaldate", "actions", "time", and "venue".
Email Content:
---
{email_body}
---
"""
    # 2. Prepare the request payload; ask the API for JSON output directly.
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {"response_mime_type": "application/json"}
    }
    # 3. Call the AI and handle potential errors.
    try:
        response = requests.post(API_URL, json=payload, timeout=30)
        response.raise_for_status()
        # 4. Extract and clean the AI's response.
        response_data = response.json()
        response_text = response_data['candidates'][0]['content']['parts'][0]['text']
        ai_json = json.loads(response_text)
        return _sanitize_ai_response(ai_json)
    # IndexError/TypeError added: an empty 'candidates' list or a non-dict
    # JSON body previously escaped this handler and crashed the caller.
    except (requests.exceptions.RequestException, KeyError, IndexError,
            TypeError, json.JSONDecodeError) as e:
        print(f"\n--- AI Processing Error ---")
        print(f"An error occurred while contacting the AI or parsing its response: {e}")
        return {
            "summary": "Error: Could not process email.", "firstdate": "", "finaldate": "",
            "actions": [], "time": [], "venue": []
        }
def analyze_and_save_emails(start_date):
    """
    Summarize all unprocessed emails dated on/after *start_date* and store
    the structured summaries in the 'processed_emails' table.

    Args:
        start_date: Inclusive lower bound for the emails' date column, as a
            'YYYY-MM-DD' string.
    """
    try:
        conn = psycopg2.connect(NEON_CONNECTION_STRING, connect_timeout=10)
    except psycopg2.Error as e:
        print(f"\n[X] DATABASE ERROR: Could not connect. Details: {e}")
        return
    # try/finally added: the original leaked the open connection whenever a
    # query or the AI call raised after connecting.
    try:
        cur = conn.cursor()
        # 1. Fetch emails not yet referenced by processed_emails.
        query = """
        SELECT id, body, date FROM emails
        WHERE date >= %s AND id NOT IN (SELECT original_email_id FROM processed_emails WHERE original_email_id IS NOT NULL);
        """
        cur.execute(query, (start_date,))
        emails_to_process = cur.fetchall()
        if not emails_to_process:
            print("\nNo new emails to process for the specified date range.")
            return
        # 2. Loop through each email, get the AI summary, and prepare it for storage.
        processed_data = []
        total = len(emails_to_process)
        print(f"\nFound {total} emails to process. Contacting AI model...")
        for i, (email_id, body, date) in enumerate(emails_to_process, start=1):
            print(f"Processing email {i} of {total}...")
            date_str = date_parser.parse(str(date)).strftime('%Y-%m-%d')
            ai_analysis = get_ai_summary(body, date_str)
            # Format the clean AI analysis into a readable summary string.
            summary_parts = [f"Summary: {ai_analysis.get('summary', '')}"]
            if ai_analysis.get('firstdate'):
                summary_parts.append(f"Start Date: {ai_analysis['firstdate']}")
            if ai_analysis.get('finaldate'):
                summary_parts.append(f"End Date/Deadline: {ai_analysis['finaldate']}")
            if ai_analysis.get('time'):
                summary_parts.append(f"Time: {ai_analysis['time'][0]}")
            if ai_analysis.get('venue'):
                summary_parts.append(f"Venue: {ai_analysis['venue'][0]}")
            if ai_analysis.get('actions'):
                summary_parts.append(f"Action Items: {'; '.join(ai_analysis['actions'])}")
            processed_data.append((email_id, "General", "\n".join(summary_parts), date))
        # 3. Save all the new summaries; ON CONFLICT guards double-processing.
        insert_query = """
        INSERT INTO processed_emails (original_email_id, category, summary, date)
        VALUES (%s, %s, %s, %s) ON CONFLICT (original_email_id) DO NOTHING;
        """
        cur.executemany(insert_query, processed_data)
        conn.commit()
        print(f"\nSuccessfully processed and categorized {len(processed_data)} emails.")
    finally:
        # Closing the connection also closes its cursors.
        conn.close()
def _format_report(rows):
    """
    Build the plain-text report from (sender, subject, summary, date) rows.
    """
    report_lines = ["--- Processed Emails Report ---"]
    for sender, subject, summary, date in rows:
        report_lines.append(f"\n--- {subject} ---")
        report_lines.append(f"From: {sender} | Date: {date}")
        report_lines.append(summary)
    return "\n".join(report_lines)


def save_report_to_file():
    """
    Fetch all processed emails, format them as a report, and write it to
    'email_summary_report.txt' in the current directory. Errors are printed,
    never raised.
    """
    try:
        conn = psycopg2.connect(NEON_CONNECTION_STRING, connect_timeout=10)
        cur = conn.cursor()
    except psycopg2.Error as e:
        print(f"\n[X] DATABASE ERROR: Could not connect to generate report. Details: {e}")
        return
    # try/finally added: the original leaked the connection if the query raised.
    try:
        query = """
        SELECT e.sender, e.subject, p.summary, p.date
        FROM processed_emails p
        JOIN emails e ON p.original_email_id = e.id
        ORDER BY p.date DESC;
        """
        cur.execute(query)
        results = cur.fetchall()
    finally:
        conn.close()
    if not results:
        print("No processed emails found in the database to report.")
        return
    report_content = _format_report(results)
    file_name = "email_summary_report.txt"
    try:
        # Explicit UTF-8: summaries may contain non-ASCII text, and the
        # platform default encoding (e.g. cp1252 on Windows) could raise.
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(report_content)
        print(f"\n[✓] Success! Report saved to '{file_name}'")
    except IOError as e:
        print(f"\n[X] FILE ERROR: Could not write report to file. Details: {e}")
# --- Main Execution ---
if __name__ == "__main__":
    # 1. Pre-flight check: refuse to run with placeholder credentials.
    #    Bug fix: the original compared API_KEY to "YOUR_API_KEY_HERE", which
    #    never matched the actual "INSERT_YOUR_API" placeholder defined at the
    #    top of the file, so the guard never fired.
    if ("INSERT_YOUR" in API_KEY
            or "INSERT_YOUR" in NEON_CONNECTION_STRING
            or "user:password" in NEON_CONNECTION_STRING):
        print("[X] ERROR: API Key or Neon Database URL not found.")
        print("Please open the script and paste your credentials at the top.")
        exit()
    # 2. Set up the database (creates tables if they don't exist).
    if not setup_database():
        exit()
    # 3. Prompt until the user supplies a valid YYYY-MM-DD start date.
    while True:
        start_date_input = input("Enter the date from which to start categorization (YYYY-MM-DD), e.g., 2025-09-01: ")
        try:
            datetime.strptime(start_date_input, '%Y-%m-%d')
            break
        except ValueError:
            print("Invalid date format. Please use YYYY-MM-DD.")
    # 4. Summarize and store all new emails since the start date.
    analyze_and_save_emails(start_date_input)
    # 5. Write the consolidated report to a text file.
    save_report_to_file()