-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
50 lines (44 loc) · 1.95 KB
/
consumer.py
File metadata and controls
50 lines (44 loc) · 1.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python
import pika, sys, os, json, csv, os.path
from db_connect import connect
def main():
    """Consume lead messages from the 'leads_pipe' queue and route them.

    Blocks forever in start_consuming(); exit with CTRL+C (handled by the
    __main__ guard). Each message body is a JSON-encoded lead dict that is
    handed to psql_insert() for routing.
    """
    # Connect to a RabbitMQ broker on localhost with default credentials.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    # Idempotent: declares the queue only if it does not already exist.
    channel.queue_declare(queue='leads_pipe')
    def callback(ch, method, properties, body):
        message = json.loads(body)
        psql_insert(message)
        # Ack only after psql_insert succeeds. The original used
        # auto_ack=True, which acknowledged on delivery and silently lost
        # the message whenever processing raised (e.g. DB unavailable);
        # with manual ack the broker re-queues unprocessed messages.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    # Begin consuming on the channel (manual acknowledgement mode).
    channel.basic_consume(queue='leads_pipe', on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
def psql_insert(message):
    """Route one lead to a database table or to the CSV overflow file.

    Routing rules (first match wins):
      * country == 'United States'      -> 'leads' table via db_connect.connect
      * cc field non-empty (has a card) -> 'high_priority' table
      * everything else                 -> appended to leads_dump.csv

    message: dict with (at least) the keys listed in csv_columns.
    """
    csv_columns = ['registration_dttm', 'id', 'first_name',
                   'last_name', 'email', 'gender', 'ip_address',
                   'cc', 'country', 'birthdate', 'salary', 'title',
                   'comments']  # column order for the CSV dump
    if message['country'] == 'United States':  # US leads go to the main table
        connect(message, 'leads')
    elif message['cc'] != '':  # lead HAS a credit-card number -> high priority
        # (original comment said "if no cc number", which was backwards)
        connect(message, 'high_priority')
    else:
        try:
            file_exists = os.path.isfile('leads_dump.csv')  # header needed only on first write
            # newline='' is required by the csv module; without it the
            # writer emits blank interleaved rows on Windows.
            with open('leads_dump.csv', 'a', newline='') as csvfile:
                # extrasaction='ignore': an unexpected key in message should
                # not crash the consumer mid-stream (default raises ValueError).
                writer = csv.DictWriter(csvfile, fieldnames=csv_columns,
                                        extrasaction='ignore')
                if not file_exists:
                    writer.writeheader()  # file doesn't exist yet, write a header
                writer.writerow(message)  # append this lead as a new row
        except IOError as err:
            # Include the actual error instead of a bare "I/O error".
            print("I/O error: %s" % err)
if __name__ == '__main__':
    try:
        main()  # blocks in start_consuming() until interrupted
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            # Attempt a clean interpreter shutdown first.
            sys.exit(0)
        except SystemExit:
            # If something (e.g. a broker thread still running) swallows or
            # survives SystemExit, force-terminate the process without
            # running cleanup handlers.
            os._exit(0)