Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 160 additions & 63 deletions modules/service_key_finder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys
import boto3
import logging


def find_kms_key_usage(session, key_region, input_key_arn, key_resources):

Expand All @@ -16,117 +18,145 @@ def find_kms_key_usage(session, key_region, input_key_arn, key_resources):
if key_description['KeyMetadata']['KeyManager'] == 'AWS':
#AWS Managed Key should only have 1 alias.
alias = find_key_aliases(session, key_region, input_key_arn)

service = alias[0][10:] #Remove alias/aws/
print(service)

try:
func = "find_" + service + "_key_usage(session, key_region, input_key_arn, key_resources)"
exec(func)
except:
print("Error with Managed Key and " + service + " Module.")
if str(alias[0]).startswith('alias/aws/'):
service = alias[0][10:] # Remove alias/aws/

try:
func = f"find_{service}_key_usage(session, key_region, input_key_arn, key_resources)"
exec(func)
except Exception as e:
logging.error(
f"A managed key check for the {service} has not been defined yet.")
else:
logging.error(
f"A conflict between KeyMetadata and the alias has occurred in {alias[0]} with KeyManager: {key_description['KeyMetadata']['KeyManager']}")

else:
#Non Managed Key can be used across multiple services.
try:
find_glue_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Glue Module.")
except Exception as e:
logging.error(f"Error with Glue Module: {e}")

try:
find_s3_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with S3 Module")
except Exception as e:
logging.error(f"Error with S3 Module: {e}")

try:
find_ebs_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with EBS Module")
except Exception as e:
logging.error(f"Error with EBS Module: {e}")

try:
find_qldb_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with QLDB Module")
except Exception as e:
logging.error(f"Error with QLDB Module: {e}")

try:
find_keyspaces_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Keyspaces Module")
except Exception as e:
logging.error(f"Error with Keyspaces Module: {e}")

try:
find_timestream_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Timestream Module")
except Exception as e:
logging.error(f"Error with Timestream Module: {e}")

try:
find_dynamodb_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with DynamoDB Module")
except Exception as e:
logging.error(f"Error with DynamoDB Module: {e}")

try:
find_neptune_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Neptune Module")
except Exception as e:
logging.error(f"Error with Neptune Module: {e}")

try:
find_secretsmanager_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Secrets Manager Module")
except Exception as e:
logging.error(f"Error with Secrets Manager Module: {e}")

try:
find_ssm_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with SSM Module")
except Exception as e:
logging.error(f"Error with SSM Module: {e}")

try:
find_redshift_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Redshift Module")
except Exception as e:
logging.error(f"Error with Redshift Module: {e}")

try:
find_redshiftserverless_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Redshift Serverless Module")
except Exception as e:
logging.error(f"Error with Redshift Serverless Module: {e}")

try:
find_elasticfilesystem_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with EFS Module")
except Exception as e:
logging.error(f"Error with EFS Module: {e}")

try:
find_elasticache_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with Elasticache Module")
except Exception as e:
logging.error(f"Error with Elasticache Module: {e}")

try:
find_docdb_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with DocumentDB Module")
except Exception as e:
logging.error(f"Error with DocumentDB Module: {e}")

try:
find_rds_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with RDS Module")
except Exception as e:
logging.error(f"Error with RDS Module: {e}")

try:
find_sqs_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with SQS Module")
except Exception as e:
logging.error(f"Error with SQS Module: {e}")

try:
find_sns_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with SNS Module")
except Exception as e:
logging.error(f"Error with SNS Module: {e}")

try:
find_fsx_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with FSx Module")
except Exception as e:
logging.error(f"Error with FSx Module: {e}")

try:
find_mq_key_usage(session, key_region, input_key_arn, key_resources)
except:
print("Error with MQ Module")
except Exception as e:
logging.error(f"Error with MQ Module: {e}")

try:
find_acm_key_usage(session, key_region,
input_key_arn, key_resources)
except Exception as e:
logging.error(f"Error with ACM Module: {e}")

try:
find_dms_key_usage(session, key_region,
input_key_arn, key_resources)
except Exception as e:
logging.error(f"Error with DMS Module: {e}")

try:
find_lambda_key_usage(session, key_region,
input_key_arn, key_resources)
except Exception as e:
logging.error(f"Error with Lambda Module: {e}")

try:
find_es_key_usage(session, key_region,
input_key_arn, key_resources)
except Exception as e:
logging.error(f"Error with Elasticsearch Module: {e}")


def key_resources_append(service, resource, arn, context, key_resources):
Expand Down Expand Up @@ -156,6 +186,73 @@ def find_key_aliases(session, key_region, input_key_arn):

return key_aliases


def find_acm_key_usage(session, key_region, input_key_arn, key_resources):
acm_pca_client = session.client('acm-pca', region_name=key_region)

ca_list_response = acm_pca_client.list_certificate_authorities()
for ca_summary in ca_list_response.get('CertificateAuthorities', []):
ca_arn = ca_summary['Arn']

ca_details = acm_pca_client.describe_certificate_authority(CertificateAuthorityArn=ca_arn)
kms_key_id = ca_details['CertificateAuthority'].get('KeyId')
if kms_key_id == input_key_arn:
key_resources_append('ACM Private CA', 'Certificate Authority', ca_arn, 'CA Key Material Encryption', key_resources)

def find_dms_key_usage(session, key_region, input_key_arn, key_resources):
dms_client = session.client('dms', region_name=key_region)

replication_instances = dms_client.describe_replication_instances()
for instance in replication_instances.get('ReplicationInstances', []):
kms_key_id = instance.get('KmsKeyId')
if kms_key_id == input_key_arn:
key_resources_append('DMS', 'Replication Instance',
instance['ReplicationInstanceIdentifier'], 'Encryption At Rest', key_resources)

endpoints = dms_client.describe_endpoints()
for endpoint in endpoints.get('Endpoints', []):
kms_key_id = endpoint.get('KmsKeyId')
if kms_key_id == input_key_arn:
key_resources_append(
'DMS', 'Endpoint', endpoint['EndpointIdentifier'], 'Encrypted Connection Info', key_resources)


def find_lambda_key_usage(session, key_region, input_key_arn, key_resources):
lambda_client = session.client('lambda', region_name=key_region)

paginator = lambda_client.get_paginator('list_functions')
for page in paginator.paginate():
for function in page['Functions']:
function_name = function['FunctionName']

# Get full config to check KMS key
config = lambda_client.get_function_configuration(
FunctionName=function_name)

kms_key_arn = config.get('KMSKeyArn')
if kms_key_arn == input_key_arn:
key_resources_append('Lambda', 'Function', function_name,
'Environment Variable Encryption', key_resources)


def find_es_key_usage(session, key_region, input_key_arn, key_resources):
es_client = session.client('es', region_name=key_region)

domains_response = es_client.list_domain_names()
for domain_info in domains_response.get('DomainNames', []):
domain_name = domain_info['DomainName']

domain_details = es_client.describe_elasticsearch_domain(
DomainName=domain_name)
encryption_config = domain_details['DomainStatus'].get(
'EncryptionAtRestOptions', {})

kms_key_id = encryption_config.get('KmsKeyId')
if kms_key_id == input_key_arn:
key_resources_append('Elasticsearch', 'Domain',
domain_name, 'Encryption At Rest', key_resources)


def find_ebs_key_usage(session, key_region, input_key_arn, key_resources):
#EBS Volumes
#TODO: EBS Default Encryption Setting
Expand Down Expand Up @@ -463,7 +560,7 @@ def find_elasticache_key_usage(session, key_region, input_key_arn, key_resources

for replication_group in elasticache_replication_group_results['ReplicationGroups']:
if replication_group['AtRestEncryptionEnabled']:
if replication_group['KmsKeyId'] == input_key_arn:
if replication_group.get('KmsKeyId', 'DISABLED') == input_key_arn:
key_resources_append('Elasticache', 'Replication Group', replication_group['ARN'], 'Encryption At Rest', key_resources)

def find_docdb_key_usage(session, key_region, input_key_arn, key_resources):
Expand Down Expand Up @@ -655,7 +752,7 @@ def find_sqs_key_usage(session, key_region, input_key_arn, key_resources):

key_descriptors.append(key_id)

for queue in sqs_queue_results['QueueUrls']:
for queue in sqs_queue_results.get('QueueUrls', []):
queue_attributes = sqs_client.get_queue_attributes(
QueueUrl = queue,
AttributeNames = [
Expand Down