Skip to content

Commit 8ee88f3

Browse files
committed
updates to custom migration path
1 parent 245ed43 commit 8ee88f3

32 files changed

+859
-129
lines changed

.gitignore

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ package-lock.json
2020
**/.DS_Store
2121
**/.chalice/deployments/
2222
**/.chalice/deployed/
23-
target-tables/
24-
relational-migration/syncs3.sh
25-
.chalice/config.json
26-
syncs3.sh
23+
workshops/relational-migration/target-tables/
24+
workshops/relational-migration/syncs3.sh
25+
workshops/relational-migration/.chalice/config.json
26+
workshops/relational-migration/setenv.sh
27+
workshops/relational-migration/setenvd.sh
28+
workshops/relational-migration/source-tables/app_db.*
2729

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"Version": "2012-10-17",
3+
"Statement": [
4+
{
5+
"Sid": "DynamoDBIndexAccess",
6+
"Effect": "Allow",
7+
"Action": [
8+
"dynamodb:Scan",
9+
"dynamodb:Query"
10+
],
11+
"Resource": [
12+
"arn:aws:dynamodb:*:*:table/*/index/*"
13+
]
14+
},
15+
{
16+
"Sid": "DynamoDBTableAccess",
17+
"Effect": "Allow",
18+
"Action": [
19+
"dynamodb:BatchGetItem",
20+
"dynamodb:BatchWriteItem",
21+
"dynamodb:ConditionCheckItem",
22+
"dynamodb:PutItem",
23+
"dynamodb:DescribeTable",
24+
"dynamodb:DeleteItem",
25+
"dynamodb:GetItem",
26+
"dynamodb:Scan",
27+
"dynamodb:Query",
28+
"dynamodb:UpdateItem"
29+
],
30+
"Resource": "arn:aws:dynamodb:*:*:table/*"
31+
},
32+
{
33+
"Sid": "DynamoDBAccount",
34+
"Effect": "Allow",
35+
"Action": [
36+
"dynamodb:ListTables"
37+
],
38+
"Resource": "*"
39+
},
40+
{
41+
"Sid": "LambdaIAMLogs",
42+
"Effect": "Allow",
43+
"Action": [
44+
"logs:CreateLogGroup",
45+
"logs:CreateLogStream",
46+
"logs:PutLogEvents",
47+
"logs:DescribeLogStreams"
48+
],
49+
"Resource": ["arn:aws:logs:*:*:*"]
50+
}
51+
]
52+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Relational Database to DynamoDB Migration: Workshop Artifacts
22

33
Here you will find scripts and a sample database application
4-
to accompany the Relational Database Migration Workshop hands-on instructions
4+
to accompany the Relational Database Migration Workshop hands-on instructions
55
published at [catalog.workshops.aws](https://catalog.workshops.aws/)

workshops/relational-migration/app.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,48 @@
1+
from chalice import Chalice
2+
import boto3
3+
from botocore.exceptions import ClientError
4+
import logging
15
import os
2-
# import time, datetime
36
import json
4-
from chalice import Chalice
5-
from chalicelib import mysql_calls as db
6-
# from chalicelib import dynamodb_calls as db
7-
87
import mysql.connector
9-
import logging
108

11-
import boto3
12-
from botocore.exceptions import ClientError
9+
10+
migration_stage = 'dynamodb'
11+
# 'relational', 'dual-write', 'dynamodb']
12+
13+
if "MIGRATION_STAGE" in os.environ:
14+
migration_stage = os.environ['MIGRATION_STAGE']
15+
16+
if migration_stage == 'relational' or migration_stage == 'dual-write':
17+
from chalicelib import mysql_calls as db
18+
else:
19+
from chalicelib import dynamodb_calls as db
1320

1421
app = Chalice(app_name='migration')
1522

1623
region = "us-east-2"
17-
sql = "select * from Boston"
1824

1925
if "AWS_DEFAULT_REGION" in os.environ:
2026
region = os.environ['AWS_DEFAULT_REGION']
2127

2228
@app.route('/', methods=['GET'], cors=True)
2329
def ping():
24-
return {'engine': db.engine()}
30+
request = app.current_request
31+
context = request.to_dict()['context']
32+
return_status = {'engine': db.engine()}
33+
34+
if 'stage' in context:
35+
return_status['stage'] = context['stage']
36+
37+
return return_status
2538

2639

2740
@app.route('/list_tables', methods=['GET'], cors=True)
2841
def list_tables():
2942
request = app.current_request
30-
# print(request.to_dict())
43+
# print('*****')
44+
# print(json.dumps(request.to_dict(), indent=2))
45+
3146
return db.list_tables()
3247

3348

Lines changed: 195 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,200 @@
11
# Implementation of application's DynamoDB database calls
2+
import boto3
3+
from boto3.dynamodb.types import TypeSerializer, TypeDeserializer
4+
from botocore.config import Config
5+
from botocore.exceptions import ClientError
6+
7+
import os
8+
import time, datetime
9+
import json
10+
11+
deserializer = boto3.dynamodb.types.TypeDeserializer()
12+
serializer = boto3.dynamodb.types.TypeSerializer()
13+
14+
return_limit = 20
15+
region = 'us-east-2'
16+
17+
ddb = boto3.client('dynamodb', region_name=region)
18+
ddbr = boto3.resource('dynamodb', region_name=region)
19+
# resource vs client: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-client-resource
20+
21+
serializer = TypeSerializer()
22+
deserializer = TypeDeserializer()
23+
table_metadata = {}
24+
25+
26+
def engine():
27+
return "DynamoDB"
228

329

430
def list_tables():
31+
response = None
32+
table_names = []
33+
try:
34+
response = ddb.list_tables()
35+
# print('ddb list_tables response')
36+
base_tables = response['TableNames']
37+
for table in base_tables:
38+
table_names.append(table)
39+
40+
response_dt = ddb.describe_table(TableName=table)
41+
table_metadata = json.loads(json.dumps(response_dt['Table'], default=serialize_datetime))
42+
43+
if 'GlobalSecondaryIndexes' in table_metadata:
44+
gsis = table_metadata['GlobalSecondaryIndexes']
45+
for gsi in gsis:
46+
table_names.append(table + '.' + gsi['IndexName'])
47+
48+
except Exception as err:
49+
return {'Error': str(err)}
50+
51+
return table_names
52+
53+
54+
def desc_table(table):
55+
try:
56+
response = ddb.describe_table(TableName=table)
57+
response_str = json.dumps(response['Table'], default=serialize_datetime)
58+
59+
except Exception as err:
60+
return {'Error': str(err)}
61+
62+
# table_metadata[table] = json.loads(response_str)
63+
64+
return response_str
65+
66+
67+
def scan_table(table):
68+
response = None
69+
key_list = get_keys(table)
70+
71+
try:
72+
response = ddb.scan(TableName=table)
73+
except Exception as err:
74+
return {'Error': str(err)}
75+
76+
items = []
77+
78+
ddb_items = response['Items']
79+
80+
for item in ddb_items:
81+
new_item = deserialize_ddb(item) # convert DynamoDB JSON to plain JSON
82+
items.append(new_item)
83+
84+
return items
85+
86+
87+
def query(table, request):
88+
89+
keyList = list(request['queryRequest']['queryConditions'].keys())
90+
sql_condition = keyList[0] + ' = %s'
91+
92+
if len(keyList) > 1:
93+
sql_condition += ' AND ' + keyList[1] + ' = %s'
94+
95+
key_vals = list(request['queryRequest']['queryConditions'].values())
96+
97+
query_stmt = 'SELECT * FROM ' + table + ' WHERE ' + sql_condition
98+
99+
100+
# mysql_cur.execute(query_stmt, key_vals)
101+
# result = mysql_cur.fetchall()
102+
# dataset = format_sql_dataset(result)
103+
104+
return dataset
105+
106+
107+
def get_record(table, request):
108+
get_request = {'TableName': table}
109+
keyList = list(request['Key'].keys())
110+
pk_name = keyList[0]
111+
pk_value = request['Key'][keyList[0]]
112+
sk_name = None
113+
sk_value = None
114+
if len(keyList) > 1:
115+
sk_name = keyList[1]
116+
sk_value = request['Key'][keyList[1]]
117+
118+
response = None
119+
key_list = get_keys(table)
120+
get_request['Key'] = request['Key']
121+
122+
try:
123+
table = ddbr.Table(table)
124+
response = table.get_item(**get_request)
125+
# print('response: ')
126+
# print(response['Item'])
127+
128+
return response['Item']
129+
except Exception as err:
130+
return {'Error': str(err)}
131+
132+
items = []
133+
134+
# ddb_items = response['Items']
135+
# print(ddb_items)
136+
137+
return response['Item']
138+
139+
140+
def new_record(table, record):
141+
142+
return({"status":1})
143+
144+
145+
def update_record(table, request):
146+
147+
return({"status": 1})
148+
149+
150+
def delete_record(table, recordKey):
151+
152+
return({"status":1})
153+
154+
155+
def serialize_datetime(obj):
156+
if isinstance(obj, datetime.datetime):
157+
formatted = obj.isoformat()
158+
return formatted[:19].replace('T', ' ')
159+
raise TypeError("Type not serializable")
160+
161+
def deserialize_ddb(dynamodb_json_string):
162+
return deserializer.deserialize({'M': dynamodb_json_string})
163+
164+
def get_keys(table):
165+
key_list = []
166+
if table not in table_metadata:
167+
try:
168+
response = ddb.describe_table(TableName=table)
169+
response_str = json.dumps(response['Table'], default=serialize_datetime)
170+
171+
except Exception as err:
172+
return {'Error': str(err)}
173+
174+
table_metadata[table] = json.loads(response_str)
175+
176+
key_schema = table_metadata[table]['KeySchema']
177+
178+
hash_key = [key for key in key_schema if key['KeyType'] == 'HASH']
179+
range_key = [key for key in key_schema if key['KeyType'] == 'RANGE']
180+
181+
key_list.append(hash_key[0]['AttributeName'])
182+
if range_key:
183+
key_list.append(range_key[0]['AttributeName'])
184+
185+
return key_list
186+
187+
188+
def dynamo_to_python(dynamo_object: dict) -> dict:
189+
deserializer = TypeDeserializer()
190+
return {
191+
k: deserializer.deserialize(v)
192+
for k, v in dynamo_object.items()
193+
}
5194

6-
return ['Table1','Table2','Table3']
195+
def python_to_dynamo(python_object: dict) -> dict:
196+
serializer = TypeSerializer()
197+
return {
198+
k: serializer.serialize(v)
199+
for k, v in python_object.items()
200+
}

workshops/relational-migration/chalicelib/mysql_calls.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ def engine():
4343

4444

4545
def list_tables():
46-
request = "SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = '" + mysql_db + "'"
46+
request = "SELECT TABLE_NAME FROM information_schema.tables "
47+
request += "WHERE table_schema = '" + mysql_db + "' AND table_type = 'BASE TABLE'"
4748
mysql_cur.execute(request)
4849
result = mysql_cur.fetchall()
49-
print(result)
50+
# print(result)
51+
5052
tables = []
5153
for t in result:
5254
tables.append(t['TABLE_NAME'])
@@ -97,7 +99,6 @@ def scan_table(table):
9799
request += "FROM " + mysql_db + "." + table + " "
98100
request += "LIMIT " + str(limit)
99101

100-
101102
mysql_cur.execute(request)
102103
result = mysql_cur.fetchall()
103104
dataset = format_sql_dataset(result)
@@ -115,9 +116,9 @@ def query(table, request):
115116

116117
key_vals = list(request['queryRequest']['queryConditions'].values())
117118

118-
get_stmt = 'SELECT * FROM ' + table + ' WHERE ' + sql_condition
119+
query_stmt = 'SELECT * FROM ' + table + ' WHERE ' + sql_condition
119120

120-
mysql_cur.execute(get_stmt, key_vals)
121+
mysql_cur.execute(query_stmt, key_vals)
121122
result = mysql_cur.fetchall()
122123
dataset = format_sql_dataset(result)
123124

@@ -126,13 +127,13 @@ def query(table, request):
126127

127128
def get_record(table, request):
128129

129-
keyList = list(request['recordKey'].keys())
130+
keyList = list(request['Key'].keys())
130131
sql_condition = keyList[0] + ' = %s'
131132

132133
if len(keyList) > 1:
133134
sql_condition += ' AND ' + keyList[1] + ' = %s'
134135

135-
key_vals = list(request['recordKey'].values())
136+
key_vals = list(request['Key'].values())
136137

137138
get_stmt = 'SELECT * FROM ' + table + ' WHERE ' + sql_condition
138139

@@ -207,6 +208,7 @@ def delete_record(table, recordKey):
207208
return({"status":mysql_cur.rowcount})
208209

209210
def format_sql_dataset(dataset):
211+
210212
formatted_dataset = []
211213
for index, row in enumerate(dataset):
212214
formatted_row = {}

0 commit comments

Comments
 (0)