-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdataRemover.py
More file actions
executable file
·350 lines (286 loc) · 12.2 KB
/
dataRemover.py
File metadata and controls
executable file
·350 lines (286 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python3
"""
Data Remover Script for MySQL Database
Connects to MySQL, retrieves account data, parses DG1 for user information,
and provides interactive deletion of users from account and sod tables
"""
import argparse
import os
import sys
from binascii import unhexlify
from datetime import datetime
import mysql.connector
from mysql.connector import Error
# Import DG1 parsing logic
from binascii import unhexlify
# Resolve the TD3 MRZ parser under the single name ``TD3Code``.
# Three import locations are tried in order; the first that succeeds wins.
try:
    # First choice: the checker class, aliased to TD3Code.
    from mrz.checker.td3 import TD3CodeChecker as TD3Code
except ImportError:
    try:
        # Second choice: a class literally named TD3Code in the same module.
        from mrz.checker.td3 import TD3Code
    except ImportError:
        try:
            # Last choice: a top-level TD3 symbol, rebound to TD3Code.
            from mrz import TD3
            TD3Code = TD3
        except ImportError:
            # None of the import paths worked: report and stop the script,
            # since DG1 parsing cannot work without the MRZ library.
            print("Error: Could not import MRZ library. Please install it with: pip3 install mrz")
            sys.exit(1)
def parse_arguments():
    """Define the command-line interface and parse the process arguments.

    Returns:
        The ``argparse.Namespace`` with ``host``, ``port``, ``username``,
        ``password``, ``database`` and ``uid_filter`` attributes. argparse
        itself terminates the process when a required option is missing.
    """
    cli = argparse.ArgumentParser(description='Remove account data from MySQL database')

    # (flag, add_argument keyword arguments) in the order they appear in --help.
    option_table = (
        ('--host', {'required': True, 'help': 'MySQL server host'}),
        ('--port', {'type': int, 'default': 3306, 'help': 'MySQL server port (default: 3306)'}),
        ('--username', {'required': True, 'help': 'MySQL username'}),
        ('--password', {'required': True, 'help': 'MySQL password'}),
        ('--database', {'required': True, 'help': 'MySQL database name'}),
        ('--uid-filter', {
            'required': True,
            'help': 'Comma-separated list of UIDs to delete. Use quotes for UIDs with special characters (e.g., "1,2,3" or \'user@domain.com,admin.test\'). Supports both numeric and string UIDs.',
        }),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args()
def connect_to_mysql(host, port, username, password, database):
    """Open a MySQL connection, or terminate the script if that fails.

    Args:
        host: MySQL server host.
        port: MySQL server port.
        username: Account used to log in.
        password: Password for ``username``.
        database: Database (schema) to select.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
        This function never returns ``None``: every failure path exits
        the process with status 1.
    """
    try:
        print(f"Connecting to MySQL database: {database} on {host}:{port} as {username}")
        connection = mysql.connector.connect(
            host=host,
            port=port,
            user=username,
            password=password,
            database=database,
        )
        if connection.is_connected():
            print(f"Successfully connected to MySQL database: {database}")
            return connection
    except Error as e:
        print(f"Error connecting to MySQL: {e}")
        sys.exit(1)

    # connect() returned without raising but the link is not usable.
    # Without this guard the function would implicitly return None and the
    # caller would fail later with an unrelated AttributeError.
    print(f"Error connecting to MySQL: connection to {database} on {host}:{port} is not active")
    sys.exit(1)
def _parse_uid_filter(uid_filter):
    """Split a comma-separated UID string into a list of non-empty UIDs."""
    uids = []
    for raw_uid in (uid_filter or '').split(','):
        uid = raw_uid.strip()
        # Drop one pair of matching outer quotes (single or double) if present.
        for quote in ('"', "'"):
            if uid.startswith(quote) and uid.endswith(quote):
                uid = uid[1:-1]
                break
        if uid:  # Ignore empty entries such as "1,,2" or a trailing comma
            uids.append(uid)
    return uids


def fetch_user_data(connection, uid_filter):
    """Fetch rows from the ``account`` table for the requested UIDs.

    Args:
        connection: Open database connection providing ``cursor()``.
        uid_filter: Comma-separated UIDs; each entry may be wrapped in
            single or double quotes.

    Returns:
        A ``(records, columns)`` tuple: the fetched rows and the column
        names taken from the cursor description. ``([], [])`` is returned
        when the filter contains no usable UID or when the query fails,
        so callers can always unpack two values.
    """
    # Validate the filter before touching the database so an empty filter
    # neither opens a cursor nor breaks the two-value return contract.
    uid_list = _parse_uid_filter(uid_filter)
    if not uid_list:
        print("Error: No valid UIDs provided in filter")
        return [], []

    # One %s placeholder per UID; the values themselves are bound by the driver.
    placeholders = ','.join(['%s'] * len(uid_list))
    query = f"""
        SELECT uid, country, sodId, expires, aaPublicKey, aaSigAlgo, aaCount,
               aaLastAuthn, dg1, dg2
        FROM account
        WHERE uid IN ({placeholders})
    """

    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(query, uid_list)
        print(f"Filtering by UIDs: {', '.join(uid_list)}")
        records = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        print(f"Retrieved {len(records)} records from account table")
        return records, columns
    except Error as e:
        print(f"Error fetching data: {e}")
        return [], []
    finally:
        # Release the cursor on every path, including query failures.
        if cursor is not None:
            try:
                cursor.close()
            except Error:
                # Best-effort cleanup: a failed close must not mask the result.
                pass
def mrz_date_to_readable(mrz_date: str, pivot_year: int = 50) -> str:
    """Convert an MRZ ``YYMMDD`` date to ``YYYY-MM-DD``.

    MRZ dates carry only a two-digit year, so the century is chosen with a
    fixed pivot: years ``00``..``pivot_year`` map to 20xx, later years to 19xx.
    The same rule is applied regardless of whether the value is a birth or
    an expiry date; callers that know the kind of date can pass a more
    suitable ``pivot_year``.

    Args:
        mrz_date: Date string whose first six characters are ``YYMMDD``.
        pivot_year: Highest two-digit year still treated as 20xx
            (default 50, i.e. 00-50 -> 2000s, 51-99 -> 1900s).

    Returns:
        The ISO-formatted date, or ``"Invalid"`` when the input is empty,
        is the literal ``"Invalid"``, does not start with six digits, or
        does not denote a real calendar date.
    """
    if not mrz_date or mrz_date == "Invalid":
        return "Invalid"

    # Require six real digits up front: int() would otherwise accept
    # truncated or padded pieces such as "1", " 9" or "-1" and produce a
    # plausible-looking but wrong date.
    digits = mrz_date[:6]
    if len(digits) != 6 or not digits.isdigit():
        return "Invalid"

    try:
        yy = int(digits[:2])
        mm = int(digits[2:4])
        dd = int(digits[4:6])
        century = 2000 if yy <= pivot_year else 1900
        # datetime() validates month/day ranges (e.g. rejects month 13).
        return datetime(century + yy, mm, dd).strftime("%Y-%m-%d")
    except ValueError:
        return "Invalid"
def _dg1_to_bytes(dg1_hex):
    """Normalize a raw DG1 value to ``bytes``; return ``None`` if unusable."""
    if isinstance(dg1_hex, (bytes, bytearray, memoryview)):
        # Already binary. bytearray/memoryview are accepted as well as bytes.
        # NOTE(review): some driver configurations may hand back bytearray
        # for binary columns - confirm for this deployment.
        return bytes(dg1_hex)

    if not isinstance(dg1_hex, str):
        print(f"Unsupported DG1 data type: {type(dg1_hex)}")
        return None

    # Keep only hex digits so separators/whitespace in the text are ignored.
    hex_digits = ''.join(
        c for c in dg1_hex.strip() if c in '0123456789ABCDEFabcdef'
    )
    if not hex_digits:
        print("No valid hex data found in DG1")
        return None
    # unhexlify needs an even number of digits; pad on the left if odd.
    if len(hex_digits) % 2 != 0:
        hex_digits = '0' + hex_digits
    return unhexlify(hex_digits)


def parse_dg1(dg1_hex):
    """Parse DG1 data and extract the MRZ fields it contains.

    Args:
        dg1_hex: DG1 content as a hex string or as binary data
            (``bytes``, ``bytearray`` or ``memoryview``).

    Returns:
        A dict with the keys ``document_type``, ``issuing_state``,
        ``surname``, ``given_names``, ``document_number``, ``nationality``,
        ``date_of_birth``, ``sex`` and ``date_of_expiry``. Fields that could
        not be read keep their defaults (``'N/A'`` / ``'Invalid'``).
        An empty dict is returned when the input is empty, of an
        unsupported type, contains no MRZ text, or cannot be parsed.
    """
    if not dg1_hex:
        return {}
    try:
        dg1_bytes = _dg1_to_bytes(dg1_hex)
        if dg1_bytes is None:
            return {}

        # Skip the first 5 bytes, assumed to be the TLV/ASN.1 header in
        # front of the MRZ text - TODO confirm for non-TD3 documents.
        mrz_bytes = dg1_bytes[5:]
        # Keep printable ASCII only (32-126)
        mrz_text = "".join(chr(b) for b in mrz_bytes if 32 <= b <= 126)

        # TD3 layout: 2 lines of 44 characters; only the first two are used.
        line_length = 44
        mrz_lines = [
            mrz_text[i:i + line_length]
            for i in range(0, len(mrz_text), line_length)
        ]
        mrz_text_clean = "\n".join(mrz_lines[:2])
        if not mrz_text_clean.strip():
            return {}

        # Parse MRZ using the mrz library
        mrz = TD3Code(mrz_text_clean)

        # Defaults used for every field the library does not provide.
        parsed_data = {
            "document_type": 'N/A',
            "issuing_state": 'N/A',
            "surname": 'N/A',
            "given_names": 'N/A',
            "document_number": 'N/A',
            "nationality": 'N/A',
            "date_of_birth": 'Invalid',
            "sex": 'N/A',
            "date_of_expiry": 'Invalid'
        }

        if hasattr(mrz, 'fields'):
            try:
                # ``fields`` may be exposed as a method or as a plain attribute.
                fields_data = mrz.fields() if callable(mrz.fields) else mrz.fields
                parsed_data.update({
                    "document_type": getattr(fields_data, 'document_type', 'N/A'),
                    "issuing_state": getattr(fields_data, 'country', 'N/A'),
                    "surname": getattr(fields_data, 'surname', 'N/A'),
                    "given_names": getattr(fields_data, 'name', getattr(fields_data, 'given_names', 'N/A')),
                    "document_number": getattr(fields_data, 'document_number', 'N/A'),
                    "nationality": getattr(fields_data, 'nationality', 'N/A'),
                    "date_of_birth": mrz_date_to_readable(str(getattr(fields_data, 'birth_date', ''))),
                    "sex": getattr(fields_data, 'sex', 'N/A'),
                    "date_of_expiry": mrz_date_to_readable(str(getattr(fields_data, 'expiry_date', '')))
                })
            except Exception as e:
                # Still best-effort (defaults are returned), but say so instead
                # of silently showing 'N/A' for a user about to be deleted.
                print(f"Warning: could not read MRZ fields from DG1: {e}")
        return parsed_data
    except Exception as e:
        print(f"Error parsing DG1: {e}")
        return {}
def delete_user_from_database(connection, uid, sod_id):
    """Delete one user from the ``account`` table and, if set, its ``sod`` row.

    Both deletes run on the same cursor and are committed together; on a
    database error the transaction is rolled back.

    Args:
        connection: Open database connection.
        uid: Value matched against ``account.uid``.
        sod_id: Value matched against ``sod.id``; the ``sod`` delete is
            skipped when this is falsy (e.g. ``None``).

    Returns:
        ``(account_deleted, sod_deleted)`` row counts, or ``(0, 0)`` when
        a database error occurred.
    """
    cursor = None
    try:
        cursor = connection.cursor()

        # Delete from account table
        cursor.execute("DELETE FROM account WHERE uid = %s", (uid,))
        account_deleted = cursor.rowcount

        # Delete from sod table if sod_id exists
        sod_deleted = 0
        if sod_id:
            cursor.execute("DELETE FROM sod WHERE id = %s", (sod_id,))
            sod_deleted = cursor.rowcount

        # Commit both deletes as one transaction
        connection.commit()
        return account_deleted, sod_deleted
    except Error as e:
        print(f"Error deleting user {uid}: {e}")
        connection.rollback()
        return 0, 0
    finally:
        # Close the cursor on the error path too, not only after success.
        if cursor is not None:
            try:
                cursor.close()
            except Error:
                # Best-effort cleanup: never let close() replace the result.
                pass
def _confirm_deletion():
    """Prompt until the operator answers yes or no; return True for yes."""
    while True:
        response = input("Do you want to delete this user? (yes/no): ").strip().lower()
        if response in ('yes', 'y'):
            return True
        if response in ('no', 'n'):
            return False
        print("Please enter 'yes' or 'no'")


def process_users_for_deletion(records, columns, connection):
    """Show each fetched user and delete it after interactive confirmation.

    For every record the DG1 column is parsed to display name and passport
    expiry, the operator is asked to confirm, and confirmed users are removed
    via ``delete_user_from_database``. A summary is printed at the end.

    Args:
        records: Rows fetched from the account table.
        columns: Column names, in the same order as the values in each row.
        connection: Open database connection used for the deletes.

    Returns:
        None. Results are reported on stdout only.
    """
    deleted_count = 0
    skipped_count = 0
    failed_count = 0

    print(f"\nProcessing {len(records)} users for potential deletion...")
    print("=" * 80)

    for index, record in enumerate(records):
        # Convert record to dictionary keyed by column name
        row_data = dict(zip(columns, record))

        # Parse DG1 for user information
        dg1_data = parse_dg1(row_data.get('dg1', ''))

        # Extract user information
        uid = row_data.get('uid', 'unknown')
        country = row_data.get('country', 'N/A')
        sod_id = row_data.get('sodId', None)
        surname = dg1_data.get('surname', 'N/A')
        given_names = dg1_data.get('given_names', 'N/A')
        expiry_date = dg1_data.get('date_of_expiry', 'N/A')

        # Display user information
        print("\nUser Information:")
        print(f" UID: {uid}")
        print(f" Name: {surname}, {given_names}")
        print(f" Country: {country}")
        print(f" Passport Expiry Date: {expiry_date}")
        print(f" SOD ID: {sod_id}")
        print("-" * 50)

        # Ask for confirmation
        try:
            confirmed = _confirm_deletion()
        except EOFError:
            # stdin was closed (e.g. piped input ran out): nothing can be
            # confirmed any more, so leave this and all later users untouched.
            remaining = len(records) - index
            print(f"\nNo more input available; skipping remaining {remaining} user(s)")
            skipped_count += remaining
            break

        if not confirmed:
            print(f"⏭️ Skipped user {uid}")
            skipped_count += 1
            continue

        # Delete user
        account_deleted, sod_deleted = delete_user_from_database(connection, uid, sod_id)
        if account_deleted > 0:
            print(f"✅ Successfully deleted user {uid}")
            if sod_deleted > 0:
                print(f"✅ Also deleted SOD record {sod_id}")
            deleted_count += 1
        else:
            print(f"❌ Failed to delete user {uid}")
            # Count failures so deleted + skipped + failed equals the total.
            failed_count += 1

    # Summary
    print("\n" + "=" * 80)
    print("DELETION SUMMARY:")
    print(f" Total users processed: {len(records)}")
    print(f" Users deleted: {deleted_count}")
    print(f" Users skipped: {skipped_count}")
    print(f" Users failed: {failed_count}")
    print("=" * 80)
def main():
    """Run the tool: parse arguments, connect, fetch users, prompt for deletion.

    Exits with status 1 when a required parameter is empty or when no
    database connection is available. The connection is closed on the way
    out if it is still open.
    """
    # Parse command line arguments
    args = parse_arguments()

    # argparse enforces presence; this additionally rejects empty values
    # such as --host "".
    required_values = (args.host, args.username, args.password, args.database, args.uid_filter)
    if not all(required_values):
        print("Error: Missing required parameters")
        print("Required: --host, --username, --password, --database, --uid-filter")
        sys.exit(1)

    # Connect to MySQL
    connection = connect_to_mysql(
        args.host, args.port, args.username,
        args.password, args.database
    )
    # Guard against a missing connection object so the failure is reported
    # here rather than as an AttributeError in the fetch and cleanup code.
    if connection is None:
        print("Error: Could not establish MySQL connection")
        sys.exit(1)

    try:
        # Fetch user data from account table
        records, columns = fetch_user_data(connection, args.uid_filter)
        if not records:
            print("NO RECORDS FOUND FOR THE SPECIFIED UIDs")
            return

        # Process users for deletion
        process_users_for_deletion(records, columns, connection)
    finally:
        # Close connection
        if connection.is_connected():
            connection.close()
            print("\nMySQL connection closed")


if __name__ == "__main__":
    main()