-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
80 lines (63 loc) · 2.51 KB
/
main.py
File metadata and controls
80 lines (63 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import json
import boto3
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from botocore.exceptions import ClientError
class Config:
    """Resolve database settings from AWS Secrets Manager, the environment, or defaults."""

    def __init__(self):
        # No explicit region or credentials are passed to the client here.
        self.secrets_manager = boto3.client('secretsmanager')

    def get_secret_from_aws(self, secret_name):
        """Fetch a secret and decode its ``SecretString`` payload as JSON.

        Args:
            secret_name: Identifier passed as ``SecretId`` to ``get_secret_value``.

        Returns:
            The decoded JSON value, or ``None`` when the response has no
            ``SecretString`` entry or the request raises ``ClientError``.
        """
        # Only the remote call is guarded; a lookup failure is treated as
        # "no secret available" so callers can fall back to other sources.
        try:
            response = self.secrets_manager.get_secret_value(SecretId=secret_name)
        except ClientError:
            return None
        if 'SecretString' in response:
            return json.loads(response['SecretString'])
        return None

    def get_config_value(self, key, default_value, secret_name=None):
        """Look up one setting: AWS secret first, then environment, then default.

        Args:
            key: Name of the setting; used both as the key inside the decoded
                secret and as the environment variable name.
            default_value: Value returned when neither source provides ``key``.
            secret_name: Optional secret to consult before the environment.

        Returns:
            The value stored under ``key`` in the secret, else the environment
            variable ``key``, else ``default_value``.
        """
        if secret_name:
            aws_secret = self.get_secret_from_aws(secret_name)
            # Return the entry for `key`, not the whole decoded payload.
            # The dict check keeps a non-object JSON payload (string, list)
            # from matching via substring/element membership.
            if isinstance(aws_secret, dict) and key in aws_secret:
                return aws_secret[key]
        return os.environ.get(key, default_value)

    def get_db_config(self):
        """Return the database settings as a dict with name/user/password/host/port keys."""
        return {
            "name": self.get_config_value("DB_NAME", "postgres", "python_interview_dbname"),
            "user": self.get_config_value("DB_USER", "postgres", "python_interview_user"),
            "password": self.get_config_value("DB_PASSWORD", "password", "python_interview_password"),
            "host": self.get_config_value("DB_HOST", "localhost", "python_interview_host"),
            "port": self.get_config_value("DB_PORT", "5432", "python_interview_port"),
        }

    def get_conn_string(self):
        """Build a ``postgresql://`` connection URL from the resolved settings."""
        db_config = self.get_db_config()
        # NOTE(review): user and password are interpolated without URL-escaping;
        # characters such as '@' or '/' in them would change how the URL is
        # parsed -- confirm whether stored values are already escaped.
        return "postgresql://{user}:{password}@{host}:{port}/{name}".format(
            name=db_config['name'],
            user=db_config['user'],
            password=db_config['password'],
            host=db_config['host'],
            port=db_config['port'],
        )
class DatabaseConnection:
    """Pair a SQLAlchemy engine with a session factory bound to it."""

    def __init__(self, conn_string):
        engine = create_engine(conn_string)
        self.engine = engine
        # Despite its name, this attribute holds the sessionmaker factory,
        # not an individual session.
        self.session = sessionmaker(bind=engine)

    def get_session(self):
        """Create a new session from the stored factory and return it."""
        return self.session()
def dedupe_data(session, table_name):
    """Load every row of ``table_name`` through ``session`` and print each one.

    ``table_name`` is interpolated directly into the SQL text, so it must be a
    trusted identifier. Nothing is returned; the two lists below are unused
    placeholders for detection logic that has not been written yet.
    """
    query = text(f"SELECT * FROM {table_name}")
    rows = session.execute(query).fetchall()
    duplicates, outliers = [], []
    for row in rows:
        print(row)
        # Anomaly detection logic belongs here.
if __name__ == "__main__":
    # Script entry point: resolve the connection string, open a session, and
    # run the scan over the crypto_transactions table.
    config = Config()
    conn_string = config.get_conn_string()
    db = DatabaseConnection(conn_string)
    session = db.get_session()
    try:
        dedupe_data(session, "crypto_transactions")
    finally:
        # Always close the session so its connection is released, even when
        # the scan raises.
        session.close()