diff --git a/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK.py b/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK.py index 7920df86b..51c2f8523 100644 --- a/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK.py +++ b/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK.py @@ -8,50 +8,80 @@ # or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for # the specific language governing permissions and limitations under the License. -# -# Rule Name: -# SQS_ENCRYPTION_CHECK -# Description: -# Check whether SQS queue has encryption at rest enabled. -# -# Rationale: -# Regular checks to ensure that SQS queue has encryption at rest enabled. -# -# Indicative Severity: -# Medium -# -# Trigger: -# Periodic checks on AWS::SQS::Queue -# -# Reports on: -# AWS::SQS::Queue -# -# Rule Parameters: -# QueueNameStartsWith -# (Optional) Specify your SQS queue names to check for. Starting SQS queue names will suffice. For example, your SQS queue names are "processimages" and "extractdocs". -# You can specify process, extract as the value for QueueNameStartsWith -# -# Scenarios: -# Scenario: 1 -# Given: Rules parameter is provided -# And: It contains a parameter key other than QueueNameStartsWith -# Then: Return ERROR -# Scenario: 2 -# Given: Rules parameter is provided -# And: There is no value specified for QueueNameStartsWith -# Then: Checks all SQS queues. Return COMPLIANT with annotation if SQS queues have encryption at rest enabled. Otherwise, return NON_COMPLIANT. -# Scenario: 3 -# Given: Rules parameter is provided and starting SQS queue names values are provided -# And: Checks SQS queue names that match the specified starting SQS queue names in QueueNameStartsWith -# Then: Checks specific SQS queues. Return COMPLIANT with annotation if SQS queues have encryption at rest enabled. Otherwise, return NON_COMPLIANT. -# Scenario: 4 -# Given: Rules parameter is provided and (optional) starting SQS queue names values are provided -# And: No SQS queue names exist or (optional) no matching SQS queue names found. -# Then: Return NO RESULTS and print no SQS queues to check for to CloudWatch. - +""" +##################################### +## Samir Bassaly (bsslys) ## +##################################### +Rule Name: + SQS_ENCRYPTION_CHECK +Version: + 2.0 +Description: + Check whether SQS queue has encryption at rest enabled. +Trigger: + Periodic - 24 hours +Reports on: + AWS::SQS::Queue +Rule Parameters: + ExemptedQueueNames (Optional) + Comma separated list of queues names to be exempted from evaluation (NOT_APPLICABLE); Regex patterns are allowed. + The default value in case of not passing this parameter is an empty list [] + +Scenarios: + Scenario: 1 - valid rule parameter + Given: Rule parameters contain the key ExemptedQueueNames + AND: The value of key ExemptedQueueNames is string + Then: Process the parameter value and use it for evaluation exemption + Scenario: 2 - invalid rule parameter + Given: Rule parameters doesn't contain the key ExemptedQueueNames + OR: The value of key ExemptedQueueNames isn't string + Then: Use an empty list [] of names for evaluation exemption + + Scenario: 3 - exempted by name + Given: Queue "x" from the results paginator + And: Queue "x" name is in the list of ExemptedQueueNames + Then: Return NOT_APPLICABLE with annotation EXEMPTED_BY_NAME + Scenario: 4 - exempted by regex + Given: Queue "x" from the results paginator + And: Queue "x" name matches a regex in the list of ExemptedQueueNames + Then: Return NOT_APPLICABLE with annotation EXEMPTED_BY_REGEX + + Scenario: 5 - non compliant queue; no key is set + Given: Queue "x" from the results paginator + And: Queue "x" name doesn't match any name or regex in the list of ExemptedQueueNames + AND: Queue "x" doesn't have the attribute "KmsMasterKeyId" set + Then: Return NON_COMPLIANT with annotation ENCRYPTION_DISABLED + + Scenario: 6 - non compliant queue; key doesn't exist + Given: Queue "x" from the results paginator + And: Queue "x" name doesn't match any name or regex in the list of ExemptedQueueNames + AND: Queue "x" has the attribute "KmsMasterKeyId" set + AND: KmsMasterKeyId points to a non existing key + Then: Return NON_COMPLIANT with annotation KEY_DOES_NOT_EXIST + + Scenario: 7 - non compliant queue; key isn't enabled + Given: Queue "x" from the results paginator + And: Queue "x" name doesn't match any name or regex in the list of ExemptedQueueNames + AND: Queue "x" has the attribute "KmsMasterKeyId" set + AND: KmsMasterKeyId points to an existing key + AND: KmsMasterKeyId points to a key where its state isn't ENABLED + Then: Return NON_COMPLIANT with annotation KEY_IS_NOT_ENABLED + + Scenario: 8 - compliant queue + Given: Queue "x" from the results paginator + And: Queue "x" name doesn't match any name or regex in the list of ExemptedQueueNames + AND: Queue "x" has the attribute "KmsMasterKeyId" set + AND: KmsMasterKeyId points to an existing key + AND: KmsMasterKeyId points to an ENABLED key + Then: Return COMPLIANT with annotation COMPLIANT + + +""" +import datetime import json +import re import sys -import datetime + import boto3 import botocore @@ -76,51 +106,129 @@ ############# # Main Code # ############# +EXEMPTED_BY_NAME = 'Queue is explicitly exempted from evaluation by name: {}' +EXEMPTED_BY_REGEX = 'Queue is exempted from evaluation by regex: {}' +ENCRYPTION_DISABLED = 'Encryption disabled; KmsMasterKeyId is not set' +KEY_DOES_NOT_EXIST = 'Encryption key {} does not exist' +KEY_IS_NOT_ENABLED = 'Encryption key {} is not in Enabled state, its state is {}' +COMPLIANT = 'Queue is compliant. KmsMasterKeyId is set to an existing enabled key: {}' + def evaluate_compliance(event, configuration_item, valid_rule_parameters): evaluations = [] - yourqueues = [] - check = {} - sqs = get_client('sqs', event) - if valid_rule_parameters: - yourqueues = valid_rule_parameters["QueueNameStartsWith"].split(",") - for queue in yourqueues: - caseinsensitivequeue = queue.lower() - response = sqs.list_queues(QueueNamePrefix=caseinsensitivequeue.strip()) - if "QueueUrls" not in response.keys(): - print("There are no SQS queues to check for.") - return None - for qurl in response["QueueUrls"]: - check = sqs.get_queue_attributes(QueueUrl=qurl, AttributeNames=['KmsMasterKeyId'],) - if "Attributes" in check.keys(): - evaluations.append(build_evaluation(qurl, 'COMPLIANT', event, annotation='SQS Queue URL is encrypted with KMS key: '+check["Attributes"]["KmsMasterKeyId"])) - else: - evaluations.append(build_evaluation(qurl, 'NON_COMPLIANT', event, annotation='SQS Queue URL is not encrypted.')) - else: - # Checking all queues. Maximum number is 1000 queues. - response = sqs.list_queues() - if "QueueUrls" not in response.keys(): - print("There are no SQS queues to check for.") - return None - for qurl in response["QueueUrls"]: - check = sqs.get_queue_attributes(QueueUrl=qurl, AttributeNames=['KmsMasterKeyId'],) - if "Attributes" in check.keys(): - evaluations.append(build_evaluation(qurl, 'COMPLIANT', event, annotation='SQS Queue URL is encrypted with KMS key: '+check["Attributes"]["KmsMasterKeyId"])) + + # List sqs queues in the given account and region + client = get_client('sqs', event) + # Need to paginate and iterate over results + paginator = client.get_paginator('list_queues') + operation_parameters = {} + page_iterator = paginator.paginate(**operation_parameters) + + for page in page_iterator: + for queue_url in page.get('QueueUrls', []): + queue_name = queue_url.split('/')[-1] + + # Scenario: 3 - exempted by name + # Scenario: 4 - exempted by regex + results, annotation = is_queue_exempted(queue_name, valid_rule_parameters) + if results: + evaluations.append( + build_evaluation(queue_url, 'NOT_APPLICABLE', event, annotation=annotation)) + + # Evaluate queue else: - evaluations.append(build_evaluation(qurl, 'NON_COMPLIANT', event, annotation='SQS Queue URL is not encrypted.')) + queue_attributes = client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['KmsMasterKeyId']) + + kms_master_key_id = queue_attributes.get("Attributes", {}).get('KmsMasterKeyId', None) + + # Scenario: 5 - non compliant queue; no key is set + if kms_master_key_id is None: + evaluations.append( + build_evaluation(queue_url, 'NON_COMPLIANT', event, annotation=ENCRYPTION_DISABLED)) + + else: + key_metadata = get_key(kms_master_key_id) + + # Scenario: 6 - non compliant queue; key doesn't exist + if key_metadata is None: + evaluations.append( + build_evaluation(queue_url, 'NON_COMPLIANT', event, annotation=KEY_DOES_NOT_EXIST.format( + kms_master_key_id))) + + # Scenario: 7 - non compliant queue; key isn't enabled + elif key_metadata['KeyState'] != 'Enabled': + evaluations.append( + build_evaluation(queue_url, 'NON_COMPLIANT', event, annotation=KEY_IS_NOT_ENABLED.format( + kms_master_key_id, key_metadata['KeyState']))) + + # Scenario: 8 - compliant queue + else: + evaluations.append(build_evaluation(queue_url, 'COMPLIANT', event, annotation=COMPLIANT)) return evaluations -def evaluate_parameters(rule_parameters): + +def get_key(key_id): + client = boto3.client('kms') try: - if rule_parameters["QueueNameStartsWith"] != "" and isinstance(rule_parameters["QueueNameStartsWith"], str): - valid_rule_parameters = rule_parameters - else: - print("Please specify a valid starting SQS queue name or multiple queue names separated by comma(,)") - return valid_rule_parameters - except LookupError: - print("Please input QueueNameStartsWith as the key.") + results = client.describe_key(KeyId=key_id) + return results.get('KeyMetadata', None) + except Exception as ex: + print(str(ex)) + return None + + +def is_queue_exempted(queue_name, rule_parameters): + exempted_names = rule_parameters['exempted_names'] + exempted_regex = rule_parameters['exempted_regex'] + + # Scenario: 3 - exempted by name + if is_queue_exempted_by_name(queue_name, exempted_names): + return True, EXEMPTED_BY_NAME.format(queue_name) + # Scenario: 4 - exempted by regex + results, pattern = is_queue_exempted_by_regex(queue_name, exempted_regex) + if results: + return True, EXEMPTED_BY_REGEX.format(pattern) + + return False, None + + +def is_queue_exempted_by_name(queue_name, exempted_names): + return exempted_names and queue_name in exempted_names + + +def is_queue_exempted_by_regex(queue_name, exempted_regex): + if exempted_regex: + for pattern in exempted_regex: + if re.search(pattern, queue_name, re.IGNORECASE): + return True, pattern + return False, None + + +def evaluate_parameters(rule_parameters): + exempted_queue_names = rule_parameters.get("ExemptedQueueNames", None) + + # Scenario: 1 - valid rule parameter + if exempted_queue_names and isinstance(exempted_queue_names, str): + exempted_queue_names = exempted_queue_names.replace(" ", "") + exempted_queue_names_list = exempted_queue_names.split(",") + exempted_names = [] + exempted_regex = [] + + regex_characters = ['[', ']', '\\', '.', '^', '$', '*', '+', '{', '}', '|', '(', ')'] + for exempted_string in exempted_queue_names_list: + # Check if regex is passed + if any(c in exempted_string for c in regex_characters): + exempted_regex.append(exempted_string) + else: + exempted_names.append(exempted_string) + + return {"exempted_names": exempted_names, "exempted_regex": exempted_regex} + + # Scenario: 2 - invalid rule parameter + else: + return {"exempted_names": [], "exempted_regex": []} #################### @@ -134,10 +242,11 @@ def build_parameters_value_error_response(ex): Keyword arguments: ex -- Exception text """ - return build_error_response(internal_error_message="Parameter value is invalid", - internal_error_details="An ValueError was raised during the validation of the Parameter value", - customer_error_code="InvalidParameterValueException", - customer_error_message=str(ex)) + return build_error_response(internal_error_message="Parameter value is invalid", + internal_error_details="An ValueError was raised during the validation of the Parameter value", + customer_error_code="InvalidParameterValueException", + customer_error_message=str(ex)) + # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. @@ -156,7 +265,8 @@ def get_client(service, event, region=None): aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'], region_name=region - ) + ) + # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): @@ -178,6 +288,7 @@ def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_ eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc + def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on configuration change rules. @@ -195,6 +306,7 @@ def build_evaluation_from_config_item(configuration_item, compliance_type, annot eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime'] return eval_ci + #################### # Boilerplate Code # #################### @@ -214,28 +326,33 @@ def get_execution_role_arn(event): return role_arn + # Build annotation within Service constraints def build_annotation(annotation_string): if len(annotation_string) > 256: return annotation_string[:244] + " [truncated]" return annotation_string + # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference + # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' + # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'ScheduledNotification' + # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): @@ -247,6 +364,7 @@ def get_configuration(resource_type, resource_id, configuration_capture_time): configuration_item = result['configurationItems'][0] return convert_api_configuration(configuration_item) + # Convert from the API model to the original invocation model def convert_api_configuration(configuration_item): for k, v in configuration_item.items(): @@ -262,18 +380,22 @@ def convert_api_configuration(configuration_item): configuration_item['relationships'][i]['name'] = configuration_item['relationships'][i]['relationshipName'] return configuration_item + # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistiry API in getConfiguration function. def get_configuration_item(invoking_event): check_defined(invoking_event, 'invokingEvent') if is_oversized_changed_notification(invoking_event['messageType']): - configuration_item_summary = check_defined(invoking_event['configurationItemSummary'], 'configurationItemSummary') - return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], configuration_item_summary['configurationItemCaptureTime']) + configuration_item_summary = check_defined(invoking_event['configurationItemSummary'], + 'configurationItemSummary') + return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], + configuration_item_summary['configurationItemCaptureTime']) if is_scheduled_notification(invoking_event['messageType']): return None return check_defined(invoking_event['configurationItem'], 'configurationItem') + # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configuration_item, event): try: @@ -308,9 +430,9 @@ def get_assume_role_credentials(role_arn, region=None): ex.response['Error']['Code'] = "InternalError" raise ex + # This removes older evaluation (usually useful for periodic rule not reporting on AWS::::Account). def clean_up_old_evaluations(latest_evaluations, event): - cleaned_evaluations = [] old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( @@ -344,13 +466,14 @@ def clean_up_old_evaluations(latest_evaluations, event): return cleaned_evaluations + latest_evaluations + def lambda_handler(event, context): if 'liblogging' in sys.modules: liblogging.logEvent(event) global AWS_CONFIG_CLIENT - #print(event) + # print(event) check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} @@ -364,7 +487,8 @@ def lambda_handler(event, context): try: AWS_CONFIG_CLIENT = get_client('config', event) - if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', 'OversizedConfigurationItemChangeNotification']: + if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', + 'OversizedConfigurationItemChangeNotification']: configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_result = evaluate_compliance(event, configuration_item, valid_rule_parameters) @@ -375,7 +499,8 @@ def lambda_handler(event, context): except botocore.exceptions.ClientError as ex: if is_internal_error(ex): return build_internal_error_response("Unexpected error while completing API request", str(ex)) - return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], ex.response['Error']['Message']) + return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], + ex.response['Error']['Message']) except ValueError as ex: return build_internal_error_response(str(ex), str(ex)) @@ -383,13 +508,15 @@ def lambda_handler(event, context): latest_evaluations = [] if not compliance_result: - latest_evaluations.append(build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account')) + latest_evaluations.append( + build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account')) evaluations = clean_up_old_evaluations(latest_evaluations, event) elif isinstance(compliance_result, str): if configuration_item: evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result)) else: - evaluations.append(build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE)) + evaluations.append( + build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE)) elif isinstance(compliance_result, list): for evaluation in compliance_result: missing_fields = False @@ -423,20 +550,27 @@ def lambda_handler(event, context): evaluation_copy = [] evaluation_copy = evaluations[:] while evaluation_copy: - AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, TestMode=test_mode) + AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, + TestMode=test_mode) del evaluation_copy[:100] # Used solely for RDK test to be able to test Lambda function return evaluations + def is_internal_error(exception): - return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error']['Code'].startswith('5') - or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error']['Code']) + return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error'][ + 'Code'].startswith('5') + or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error'][ + 'Code']) + def build_internal_error_response(internal_error_message, internal_error_details=None): return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError') -def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, customer_error_message=None): + +def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, + customer_error_message=None): error_response = { 'internalErrorMessage': internal_error_message, 'internalErrorDetails': internal_error_details, diff --git a/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK_test.py b/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK_test.py index f19878a22..151ad6ae3 100644 --- a/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK_test.py +++ b/python/SQS_ENCRYPTION_CHECK/SQS_ENCRYPTION_CHECK_test.py @@ -1,4 +1,4 @@ -# Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# Copyright 2017-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You may # not use this file except in compliance with the License. A copy of the License is located at @@ -11,6 +11,7 @@ import sys import unittest + try: from unittest.mock import MagicMock except ImportError: @@ -30,7 +31,9 @@ CONFIG_CLIENT_MOCK = MagicMock() STS_CLIENT_MOCK = MagicMock() -SQS_CLIENT_MOCK = MagicMock() +SQS_MOCK = MagicMock() +KMS_MOCK = MagicMock() + class Boto3Mock(): @staticmethod @@ -40,86 +43,229 @@ def client(client_name, *args, **kwargs): if client_name == 'sts': return STS_CLIENT_MOCK if client_name == 'sqs': - return SQS_CLIENT_MOCK + return SQS_MOCK + if client_name == 'kms': + return KMS_MOCK raise Exception("Attempting to create an unknown client") + sys.modules['boto3'] = Boto3Mock() RULE = __import__('SQS_ENCRYPTION_CHECK') -class ComplianceTest(unittest.TestCase): - sqs_encrypted_queue = '{"Attributes":{"QueueArn": "arn:aws:sqs:us-east-1:012345678910:test","KmsMasterKeyId": "alias/aws/sqs","KmsDataKeyReusePeriodSeconds": "300"}}' +def queue_attributes_side_effect(QueueUrl, AttributeNames): + if QueueUrl in ["url/compliant-queue-01", "url/compliant-queue-02", "url/compliant-queue-03"]: + return {"Attributes": {"KmsMasterKeyId": "alias/active-key"}} + + elif QueueUrl == "url/unencrypted-queue-01": + return {"Attributes": {}} + + elif QueueUrl == "url/key-not-exist-01": + return {"Attributes": {"KmsMasterKeyId": "alias/key-not-exist"}} + elif QueueUrl == "url/key-not-enabled-01": + return {"Attributes": {"KmsMasterKeyId": "alias/key-pending-deletion"}} + + +def describe_key_side_effect(KeyId): + if KeyId == 'alias/active-key': + return {"KeyMetadata": {"KeyManager": "AWS", "KeyState": "Enabled"}} + if KeyId == 'alias/key-pending-deletion': + return {"KeyMetadata": {"KeyManager": "AWS", "KeyState": "PendingDeletion"}} + else: + raise Exception('key not exist') + + +class TestHelperMixin: + def _run_test(self, paginator_mock_data, rule_parameters, expected_response): + paginator_mock = MagicMock() + paginator_mock.paginate = MagicMock(side_effect=paginator_mock_data) + + SQS_MOCK.get_paginator = MagicMock(return_value=paginator_mock) + + SQS_MOCK.get_queue_attributes.side_effect = queue_attributes_side_effect + + KMS_MOCK.describe_key.side_effect = describe_key_side_effect + + lambda_result = RULE.lambda_handler(build_lambda_scheduled_event(rule_parameters=rule_parameters), {}) + + assert_successful_evaluation(self, lambda_result, expected_response, len(lambda_result)) + + +class ComplianceTest(unittest.TestCase, TestHelperMixin): + rule_parameters = '{"ExemptedQueueNames": ".*(dev).*, .*(demo).*, exempted-queue-1, exempted-queue-2"}' + + def setUp(self): + pass + + # Scenario: 1 - valid rule parameter + def test_scenario_1_valid_rule_parameter(self): + rule_parameters = {"ExemptedQueueNames": ".*(dev).*, name, xyz, sam.*"} + results = RULE.evaluate_parameters(rule_parameters) + self.assertEqual([".*(dev).*", "sam.*"], results["exempted_regex"]) + self.assertEqual(["name", "xyz"], results["exempted_names"]) + + # Scenario: 2 - invalid rule parameter (empty) + def test_scenario_2_empty_rule_parameter(self): + rule_parameters = {} + results = RULE.evaluate_parameters(rule_parameters) + self.assertEqual([], results["exempted_regex"]) + self.assertEqual([], results["exempted_names"]) + + # Scenario: 2 - invalid rule parameter (wrong type) + def test_scenario_2_invalid_rule_parameter_type(self): + rule_parameters = {"ExemptedQueueNames": 100} + results = RULE.evaluate_parameters(rule_parameters) + self.assertEqual([], results["exempted_regex"]) + self.assertEqual([], results["exempted_names"]) + + # Scenario: 3 - exempted by name + def test_scenario_3_exempted_by_name(self): + paginator_mock_data = [ + [ + {'QueueUrls': ['url/exempted-queue-1', 'url/exempted-queue-2']} + ] + ] + + expected_response = [ + build_expected_response('NOT_APPLICABLE', 'url/exempted-queue-1', DEFAULT_RESOURCE_TYPE, + RULE.EXEMPTED_BY_NAME.format("exempted-queue-1")), + build_expected_response('NOT_APPLICABLE', 'url/exempted-queue-2', DEFAULT_RESOURCE_TYPE, + RULE.EXEMPTED_BY_NAME.format("exempted-queue-2")) + ] + + self._run_test(paginator_mock_data, self.rule_parameters, expected_response) - sqs_unencrypted_queue = '{"Attributes":{"QueueArn": "arn:aws:sqs:us-east-1:012345678910:test"}}' + # Scenario: 4 - exempted by regex + def test_scenario_4_exempted_by_regex(self): + paginator_mock_data = [ + [ + {'QueueUrls': ['url/dev-queue', 'url/my-demo-queue']} + ] + ] - def test_sqs_queue_unencrypted(self): - SQS_CLIENT_MOCK.get_queue_attributes = MagicMock(return_value=self.sqs_unencrypted_queue) - response = RULE.lambda_handler(build_lambda_scheduled_event('{"QueueNameStartsWith":"tes"}'), {}) - resp_expected = [] - resp_expected.append(build_expected_response('NON_COMPLIANT', 'https://queue.amazonaws.com/012345678910/test', DEFAULT_RESOURCE_TYPE, 'SQS Queue URL is not encrypted')) - assert_successful_evaluation(self, response, resp_expected) + expected_response = [ + build_expected_response('NOT_APPLICABLE', 'url/dev-queue', DEFAULT_RESOURCE_TYPE, + RULE.EXEMPTED_BY_REGEX.format(".*(dev).*")), + build_expected_response('NOT_APPLICABLE', 'url/my-demo-queue', DEFAULT_RESOURCE_TYPE, + RULE.EXEMPTED_BY_REGEX.format(".*(demo).*")) + ] - def test_sqs_queue_encrypted(self): - SQS_CLIENT_MOCK.get_queue_attributes = MagicMock(return_value=self.sqs_encrypted_queue) - response = RULE.lambda_handler(build_lambda_scheduled_event('{"QueueNameStartsWith":"tes"}'), {}) - resp_expected = [] - resp_expected.append(build_expected_response('COMPLIANT', 'https://queue.amazonaws.com/012345678910/test')) - assert_successful_evaluation(self, response, resp_expected) + self._run_test(paginator_mock_data, self.rule_parameters, expected_response) + + # Scenario: 5 - non compliant queue; no key is set + def test_scenario_5_encryption_disabled(self): + paginator_mock_data = [ + [ + {'QueueUrls': ['url/unencrypted-queue-01']} + ] + ] + + expected_response = [ + build_expected_response('NON_COMPLIANT', 'url/unencrypted-queue-01', DEFAULT_RESOURCE_TYPE, + RULE.ENCRYPTION_DISABLED) + ] + + self._run_test(paginator_mock_data, self.rule_parameters, expected_response) + + # Scenario: 6 - non compliant queue; key doesn't exist + def test_scenario_6_key_not_exist(self): + paginator_mock_data = [ + [ + {'QueueUrls': ['url/key-not-exist-01']} + ] + ] + + expected_response = [ + build_expected_response('NON_COMPLIANT', 'url/key-not-exist-01', DEFAULT_RESOURCE_TYPE, + RULE.KEY_DOES_NOT_EXIST.format("alias/key-not-exist")) + ] + + self._run_test(paginator_mock_data, self.rule_parameters, expected_response) + + # Scenario: 7 - non compliant queue; key isn't enabled + def test_scenario_7_key_not_enabled(self): + paginator_mock_data = [ + [ + {'QueueUrls': ['url/key-not-enabled-01']} + ] + ] + + expected_response = [ + build_expected_response('NON_COMPLIANT', 'url/key-not-enabled-01', DEFAULT_RESOURCE_TYPE, + RULE.KEY_IS_NOT_ENABLED.format( + "alias/key-pending-deletion", "PendingDeletion")) + ] + + self._run_test(paginator_mock_data, self.rule_parameters, expected_response) + + # Scenario: 8 - compliant queue + def test_scenario_8_compliant_queues(self): + paginator_mock_data = [ + [ + {'QueueUrls': ['url/compliant-queue-01', 'url/compliant-queue-02']}, + {'QueueUrls': ['url/compliant-queue-03']} + ] + ] + + expected_response = [ + build_expected_response('COMPLIANT', 'url/compliant-queue-01', DEFAULT_RESOURCE_TYPE, RULE.COMPLIANT), + build_expected_response('COMPLIANT', 'url/compliant-queue-02', DEFAULT_RESOURCE_TYPE, RULE.COMPLIANT), + build_expected_response('COMPLIANT', 'url/compliant-queue-03', DEFAULT_RESOURCE_TYPE, RULE.COMPLIANT) + ] + + self._run_test(paginator_mock_data, self.rule_parameters, expected_response) - #def test_sample_2(self): - # RULE.ASSUME_ROLE_MODE = False - # response = RULE.lambda_handler(build_lambda_configurationchange_event(self.invoking_event_iam_role_sample, self.rule_parameters), {}) - # resp_expected = [] - # resp_expected.append(build_expected_response('NOT_APPLICABLE', 'some-resource-id', 'AWS::IAM::Role')) - # assert_successful_evaluation(self, response, resp_expected) #################### # Helper Functions # #################### - def build_lambda_configurationchange_event(invoking_event, rule_parameters=None): event_to_return = { - 'configRuleName':'myrule', - 'executionRoleArn':'roleArn', + 'configRuleName': 'myrule', + 'executionRoleArn': 'roleArn', 'eventLeftScope': False, 'invokingEvent': invoking_event, 'accountId': '123456789012', 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', - 'resultToken':'token' + 'resultToken': 'token' } if rule_parameters: event_to_return['ruleParameters'] = rule_parameters return event_to_return + def build_lambda_scheduled_event(rule_parameters=None): invoking_event = '{"messageType":"ScheduledNotification","notificationCreationTime":"2017-12-23T22:11:18.158Z"}' event_to_return = { - 'configRuleName':'myrule', - 'executionRoleArn':'roleArn', + 'configRuleName': 'myrule', + 'executionRoleArn': 'roleArn', 'eventLeftScope': False, 'invokingEvent': invoking_event, 'accountId': '123456789012', 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', - 'resultToken':'token' + 'resultToken': 'token' } if rule_parameters: event_to_return['ruleParameters'] = rule_parameters return event_to_return -def build_expected_response(compliance_type, compliance_resource_id, compliance_resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): + +def build_expected_response(compliance_type, compliance_resource_id, compliance_resource_type=DEFAULT_RESOURCE_TYPE, + annotation=None): if not annotation: return { 'ComplianceType': compliance_type, 'ComplianceResourceId': compliance_resource_id, 'ComplianceResourceType': compliance_resource_type - } + } return { 'ComplianceType': compliance_type, 'ComplianceResourceId': compliance_resource_id, 'ComplianceResourceType': compliance_resource_type, 'Annotation': annotation - } + } + def assert_successful_evaluation(test_class, response, resp_expected, evaluations_count=1): if isinstance(response, dict): @@ -139,6 +285,7 @@ def assert_successful_evaluation(test_class, response, resp_expected, evaluation if 'Annotation' in response_expected or 'Annotation' in response[i]: test_class.assertEquals(response_expected['Annotation'], response[i]['Annotation']) + def assert_customer_error_response(test_class, response, customer_error_code=None, customer_error_message=None): if customer_error_code: test_class.assertEqual(customer_error_code, response['customerErrorCode']) @@ -151,6 +298,7 @@ def assert_customer_error_response(test_class, response, customer_error_code=Non if "internalErrorDetails" in response: test_class.assertTrue(response['internalErrorDetails']) + def sts_mock(): assume_role_response = { "Credentials": { @@ -160,6 +308,7 @@ def sts_mock(): STS_CLIENT_MOCK.reset_mock(return_value=True) STS_CLIENT_MOCK.assume_role = MagicMock(return_value=assume_role_response) + ################## # Common Testing # ################## diff --git a/python/SQS_ENCRYPTION_CHECK/parameters.json b/python/SQS_ENCRYPTION_CHECK/parameters.json index 675463f79..4a8296e89 100644 --- a/python/SQS_ENCRYPTION_CHECK/parameters.json +++ b/python/SQS_ENCRYPTION_CHECK/parameters.json @@ -1,12 +1,12 @@ { - "Version": "1.0", + "Version": "2.0", "Parameters": { "RuleName": "SQS_ENCRYPTION_CHECK", "SourceRuntime": "python3.7", "CodeKey": "SQS_ENCRYPTION_CHECK.zip", - "InputParameters": "{\"QueueNameStartsWith\":\"cookin, servefo\"}", + "InputParameters": "{\"ExemptedQueueNames\":\".*(dev).*, exempted-queue\"}", "OptionalParameters": "{}", - "SourcePeriodic": "Three_Hours" + "SourcePeriodic": "TwentyFour_Hours" }, "Tags": "[]" }