-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalytics_processor.py
More file actions
executable file
·167 lines (135 loc) · 5.61 KB
/
analytics_processor.py
File metadata and controls
executable file
·167 lines (135 loc) · 5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/env python3
"""
Analytics Processor Script
This script processes a list of emails, makes curl requests for each email to fetch analytics data,
and sums up the linesSuggested and linesAccepted for each email.
"""
import json
import subprocess
import argparse
from datetime import datetime
import sys
from typing import List, Dict, Any
def make_curl_request(email: str, start_date: str, end_date: str, service_key: str) -> Dict[str, Any]:
    """
    Fetch Cascade analytics data for a single email by shelling out to `curl`.

    Args:
        email: The email address to fetch data for.
        start_date: Start date in ISO format (YYYY-MM-DDT00:00:00Z).
        end_date: End date in ISO format (YYYY-MM-DDT00:00:00Z).
        service_key: Service key for API authentication.

    Returns:
        The decoded JSON response as a dictionary, or {"queryResults": []}
        on any request/parse failure so callers can proceed with zero totals.
    """
    curl_data = {
        "service_key": service_key,
        "start_timestamp": start_date,
        "end_timestamp": end_date,
        "emails": [email],
        "ide_types": ["editor"],
        "query_requests": [
            {
                "cascade_lines": {}
            }
        ]
    }
    curl_command = [
        "curl", "-s", "-X", "POST",
        "--header", "Content-Type: application/json",
        "--data", json.dumps(curl_data),
        "https://server.codeium.com/api/v1/CascadeAnalytics"
    ]
    try:
        # timeout prevents the script from hanging indefinitely on a stalled
        # server; check=True turns non-zero curl exit codes into exceptions.
        result = subprocess.run(
            curl_command, capture_output=True, text=True, check=True, timeout=60
        )
        return json.loads(result.stdout)
    except FileNotFoundError:
        # `curl` binary is not installed / not on PATH.
        print(f"Error: curl executable not found while processing {email}", file=sys.stderr)
        return {"queryResults": []}
    except subprocess.TimeoutExpired:
        print(f"Request timed out for {email}", file=sys.stderr)
        return {"queryResults": []}
    except subprocess.CalledProcessError as e:
        print(f"Error making curl request for {email}: {e}", file=sys.stderr)
        print(f"Error output: {e.stderr}", file=sys.stderr)
        return {"queryResults": []}
    except json.JSONDecodeError:
        print(f"Error parsing JSON response for {email}", file=sys.stderr)
        return {"queryResults": []}
def _safe_int(value: Any) -> int:
    """Coerce an API value (string or number) to int; malformed/None values become 0."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def process_analytics_data(response_data: Dict[str, Any]) -> Dict[str, int]:
    """
    Sum linesSuggested and linesAccepted across every day in the API response.

    Args:
        response_data: Decoded JSON response from the CascadeAnalytics API.

    Returns:
        {"linesSuggested": total, "linesAccepted": total}; both zero when the
        response has no query results or only malformed numeric fields.
    """
    total_suggested = 0
    total_accepted = 0
    # Each query result carries a nested per-day list under
    # queryResults[i].cascadeLines.cascadeLines; missing keys yield empty
    # defaults, so an empty/absent response naturally sums to zero.
    for query_result in response_data.get("queryResults", []):
        cascade_lines_data = query_result.get("cascadeLines", {}).get("cascadeLines", [])
        for day_data in cascade_lines_data:
            # The API serializes counts as strings; _safe_int also tolerates
            # plain ints and malformed values instead of raising ValueError.
            total_suggested += _safe_int(day_data.get("linesSuggested", "0"))
            total_accepted += _safe_int(day_data.get("linesAccepted", "0"))
    return {
        "linesSuggested": total_suggested,
        "linesAccepted": total_accepted
    }
def process_emails(emails: List[str], start_date: str, end_date: str, service_key: str) -> Dict[str, Dict[str, int]]:
    """
    Fetch and aggregate analytics for each email address, one request apiece.

    Args:
        emails: Email addresses to query.
        start_date: Start date in ISO format.
        end_date: End date in ISO format.
        service_key: Service key for API authentication.

    Returns:
        Mapping of email -> {"linesSuggested": int, "linesAccepted": int}.
    """
    summary: Dict[str, Dict[str, int]] = {}
    for address in emails:
        print(f"Processing email: {address}")
        raw_response = make_curl_request(address, start_date, end_date, service_key)
        summary[address] = process_analytics_data(raw_response)
    return summary
def main():
    """Parse CLI arguments, run the analytics aggregation, and emit JSON results."""
    parser = argparse.ArgumentParser(description="Process analytics data for a list of emails")
    parser.add_argument("--emails", "-e", required=True, help="Comma-separated list of emails or path to a file with one email per line")
    parser.add_argument("--start-date", "-s", required=True, help="Start date in ISO format (YYYY-MM-DDT00:00:00Z)")
    parser.add_argument("--end-date", "-d", required=True, help="End date in ISO format (YYYY-MM-DDT00:00:00Z)")
    parser.add_argument("--service-key", "-k", required=True, help="Service key for API authentication")
    parser.add_argument("--output", "-o", help="Output file path (JSON format)")
    args = parser.parse_args()
    # Resolve --emails, which may be a comma-separated list, a path to a
    # file with one email per line, or a single bare address.
    if "," in args.emails:
        # Filter out empty entries so inputs like "a@x.com,,b@x.com," don't
        # produce blank emails (and wasted API calls).
        email_list = [email.strip() for email in args.emails.split(",") if email.strip()]
    else:
        try:
            with open(args.emails, "r") as f:
                email_list = [line.strip() for line in f if line.strip()]
        except OSError:
            # Not a readable file (missing, a directory, permission denied):
            # fall back to treating the argument as a single email address.
            email_list = [args.emails.strip()]
    # Process the emails
    results = process_emails(email_list, args.start_date, args.end_date, args.service_key)
    # Wrap the per-email totals with a generation timestamp and the queried period.
    formatted_results = {
        "timestamp": datetime.now().isoformat(),
        "period": {
            "start": args.start_date,
            "end": args.end_date
        },
        "results": results
    }
    # Write to --output when given, otherwise pretty-print to stdout.
    if args.output:
        with open(args.output, "w") as f:
            json.dump(formatted_results, f, indent=2)
        print(f"Results written to {args.output}")
    else:
        print(json.dumps(formatted_results, indent=2))


if __name__ == "__main__":
    main()