diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py
index 5cd5aed..19bc1d1 100644
--- a/dataduct/etl/etl_pipeline.py
+++ b/dataduct/etl/etl_pipeline.py
@@ -60,6 +60,11 @@ class ETLPipeline(object):
     and has functionality to add steps to the pipeline
     """
 
+
+    # Defined at class level so it does not pollute the global namespace;
+    # this also makes it easier to mock in tests.
+    DEFAULT_TOPIC_ARN = config.etl.get('DEFAULT_TOPIC_ARN', const.NONE)
+
     def __init__(self, name, frequency='one-time', ec2_resource_config=None,
                  time_delta=None, emr_cluster_config=None, load_time=None,
                  topic_arn=None, max_retries=MAX_RETRIES, teardown=None,
@@ -93,7 +98,13 @@ def __init__(self, name, frequency='one-time', ec2_resource_config=None,
         self.time_delta = time_delta
         self.description = description
         self.max_retries = max_retries
-        self.topic_arn = topic_arn
+
+        if topic_arn is not None:
+            self.topic_arn = topic_arn
+        elif self.DEFAULT_TOPIC_ARN:
+            self.topic_arn = self.DEFAULT_TOPIC_ARN
+        else:
+            self.topic_arn = None
 
         if bootstrap is not None:
             self.bootstrap_definitions = bootstrap
diff --git a/dataduct/etl/tests/test_etl_pipeline.py b/dataduct/etl/tests/test_etl_pipeline.py
index a40c16c..28e135e 100644
--- a/dataduct/etl/tests/test_etl_pipeline.py
+++ b/dataduct/etl/tests/test_etl_pipeline.py
@@ -1,14 +1,15 @@
 """Tests for the ETL Pipeline object
 """
 import unittest
+import mock
 from nose.tools import raises
 from nose.tools import eq_
+from nose.tools import assert_not_equal
 from datetime import timedelta
 
 from ..etl_pipeline import ETLPipeline
 from ...utils.exceptions import ETLInputError
 
-
 class EtlPipelineTests(unittest.TestCase):
     """Tests for the ETL Pipeline object
     """
@@ -59,3 +60,16 @@ def test_bad_data_type_throws(self):
         _s3_uri is bad
         """
         self.default_pipeline._s3_uri('TEST_DATA_TYPE')
+
+    @staticmethod
+    def test_default_arn_loaded_if_not_in_etl_yaml():
+        with mock.patch('dataduct.etl.etl_pipeline.ETLPipeline.DEFAULT_TOPIC_ARN', 'blah'):
+            result = ETLPipeline('test_pipeline')
+            eq_(result.topic_arn, 'blah')
+
+    @staticmethod
+    def test_arn_loads_if_provided_in_etl_yaml():
+        with mock.patch('dataduct.etl.etl_pipeline.ETLPipeline.DEFAULT_TOPIC_ARN', 'blah'):
+            result = ETLPipeline('test_pipeline', topic_arn="not_blah")
+            eq_(result.topic_arn, "not_blah")
+            assert_not_equal(result.topic_arn, ETLPipeline.DEFAULT_TOPIC_ARN)
diff --git a/docs/config.rst b/docs/config.rst
index 3f7401a..3cb957f 100644
--- a/docs/config.rst
+++ b/docs/config.rst
@@ -181,6 +181,7 @@ ETL
     ROLE: FILL_ME_IN
     S3_BASE_PATH: dev
     S3_ETL_BUCKET: FILL_ME_IN
+    DEFAULT_TOPIC_ARN: 'arn:aws:sns:example_arn'
     SNS_TOPIC_ARN_FAILURE: null
     SNS_TOPIC_ARN_WARNING: null
     FREQUENCY_OVERRIDE: one-time
@@ -222,6 +223,8 @@ level. The parameters are explained below:
   or across production and dev
 - ``S3_ETL_BUCKET``: S3 bucket to use for DP data, logs, source code
   etc.
+- ``DEFAULT_TOPIC_ARN``: default SNS topic ARN to use for pipelines;
+  overridden by a ``topic_arn`` set in the pipeline YAML definition.
 - ``SNS_TOPIC_ARN_FAILURE``: SNS to trigger for failed steps or pipelines
 - ``SNS_TOPIC_ARN_WARNING``: SNS to trigger for failed QA checks
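
For reviewers, a minimal sketch of the resolution order this patch implements (illustrative only, not part of the diff; the pipeline name and ARN values are hypothetical, with the default mirroring the placeholder added to docs/config.rst):

    # Precedence in ETLPipeline.__init__ after this change:
    #   1. topic_arn passed in the pipeline definition
    #   2. DEFAULT_TOPIC_ARN from the ETL config (etl.yaml)
    #   3. None
    from dataduct.etl.etl_pipeline import ETLPipeline

    # topic_arn omitted: falls back to the config default, assuming
    # etl.yaml contains DEFAULT_TOPIC_ARN: 'arn:aws:sns:example_arn'
    pipeline = ETLPipeline('example_pipeline')
    assert pipeline.topic_arn == 'arn:aws:sns:example_arn'

    # An explicit topic_arn always wins over the config default
    pipeline = ETLPipeline('example_pipeline', topic_arn='arn:aws:sns:other')
    assert pipeline.topic_arn == 'arn:aws:sns:other'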